Posted to commits@flink.apache.org by fh...@apache.org on 2015/11/24 18:17:58 UTC
[15/16] flink git commit: [FLINK-2901] Remove Record API dependencies from flink-tests #1
http://git-wip-us.apache.org/repos/asf/flink/blob/8abae2c2/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationTerminationWithTwoTails.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationTerminationWithTwoTails.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationTerminationWithTwoTails.java
deleted file mode 100644
index 3ce021b..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationTerminationWithTwoTails.java
+++ /dev/null
@@ -1,134 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.iterative;
-
-import java.io.Serializable;
-import java.util.Iterator;
-
-import org.apache.flink.api.common.Plan;
-import org.apache.flink.api.java.record.functions.MapFunction;
-import org.apache.flink.api.java.record.functions.ReduceFunction;
-import org.apache.flink.api.java.record.io.CsvOutputFormat;
-import org.apache.flink.api.java.record.io.TextInputFormat;
-import org.apache.flink.api.java.record.operators.BulkIteration;
-import org.apache.flink.api.java.record.operators.FileDataSink;
-import org.apache.flink.api.java.record.operators.FileDataSource;
-import org.apache.flink.api.java.record.operators.MapOperator;
-import org.apache.flink.api.java.record.operators.ReduceOperator;
-import org.apache.flink.test.util.RecordAPITestBase;
-import org.apache.flink.types.Record;
-import org.apache.flink.types.StringValue;
-import org.apache.flink.util.Collector;
-import org.junit.Assert;
-
-@SuppressWarnings("deprecation")
-public class IterationTerminationWithTwoTails extends RecordAPITestBase {
-
- private static final String INPUT = "1\n" + "2\n" + "3\n" + "4\n" + "5\n";
- private static final String EXPECTED = "22\n";
-
- protected String dataPath;
- protected String resultPath;
-
- public IterationTerminationWithTwoTails(){
- setTaskManagerNumSlots(parallelism);
- }
-
- @Override
- protected void preSubmit() throws Exception {
- dataPath = createTempFile("datapoints.txt", INPUT);
- resultPath = getTempFilePath("result");
- }
-
- @Override
- protected void postSubmit() throws Exception {
- compareResultsByLinesInMemory(EXPECTED, resultPath);
- }
-
- @Override
- protected Plan getTestJob() {
- return getTestPlanPlan(parallelism, dataPath, resultPath);
- }
-
- private static Plan getTestPlanPlan(int numSubTasks, String input, String output) {
-
- FileDataSource initialInput = new FileDataSource(TextInputFormat.class, input, "input");
-
- BulkIteration iteration = new BulkIteration("Loop");
- iteration.setInput(initialInput);
- iteration.setMaximumNumberOfIterations(5);
- Assert.assertTrue(iteration.getMaximumNumberOfIterations() > 1);
-
- ReduceOperator sumReduce = ReduceOperator.builder(new SumReducer())
- .input(iteration.getPartialSolution())
- .name("Compute sum (Reduce)")
- .build();
-
- iteration.setNextPartialSolution(sumReduce);
-
- MapOperator terminationMapper = MapOperator.builder(new TerminationMapper())
- .input(iteration.getPartialSolution())
- .name("Compute termination criterion (Map)")
- .build();
-
- iteration.setTerminationCriterion(terminationMapper);
-
- FileDataSink finalResult = new FileDataSink(CsvOutputFormat.class, output, iteration, "Output");
- CsvOutputFormat.configureRecordFormat(finalResult)
- .recordDelimiter('\n')
- .fieldDelimiter(' ')
- .field(StringValue.class, 0);
-
- Plan plan = new Plan(finalResult, "Iteration with AllReducer (keyless Reducer)");
- plan.setDefaultParallelism(4);
- Assert.assertTrue(plan.getDefaultParallelism() > 1);
- return plan;
- }
-
- static final class SumReducer extends ReduceFunction implements Serializable {
-
- private static final long serialVersionUID = 1L;
-
- @Override
- public void reduce(Iterator<Record> it, Collector<Record> out) {
- // Compute the sum
- int sum = 0;
-
- while (it.hasNext()) {
- sum += Integer.parseInt(it.next().getField(0, StringValue.class).getValue()) + 1;
- }
-
- out.collect(new Record(new StringValue(Integer.toString(sum))));
- }
- }
-
- public static class TerminationMapper extends MapFunction implements Serializable {
- private static final long serialVersionUID = 1L;
-
- @Override
- public void map(Record record, Collector<Record> collector) {
-
- int currentSum = Integer.parseInt(record.getField(0, StringValue.class).getValue());
-
- if(currentSum < 21)
- collector.collect(record);
- }
- }
-
-}
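
For context, the pattern the deleted test above exercised (a bulk iteration with two tails: one feeds the next partial solution, the other a termination criterion) carries over to the DataSet API through the two-argument closeWith(result, terminationCriterion), which ends the loop once the criterion set becomes empty. A minimal self-contained sketch; the class name and values are illustrative, not part of this commit:

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.IterativeDataSet;

public class TwoTailsIterationSketch {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSet<Integer> input = env.fromElements(1, 2, 3, 4, 5);
        IterativeDataSet<Integer> iteration = input.iterate(5); // cap at 5 supersteps

        // Tail 1: the next partial solution; add 1 to each element, then sum.
        DataSet<Integer> nextPartialSolution = iteration
                .map(new MapFunction<Integer, Integer>() {
                    @Override
                    public Integer map(Integer v) {
                        return v + 1;
                    }
                })
                .reduce(new ReduceFunction<Integer>() {
                    @Override
                    public Integer reduce(Integer a, Integer b) {
                        return a + b;
                    }
                });

        // Tail 2: the termination criterion; the loop stops once this set is empty.
        DataSet<Integer> criterion = iteration.filter(new FilterFunction<Integer>() {
            @Override
            public boolean filter(Integer v) {
                return v < 21;
            }
        });

        // 1..5 -> 20 -> 21 -> 22; the criterion set is empty once the partial
        // solution reaches 21, so the final result is 22.
        iteration.closeWith(nextPartialSolution, criterion).print();
    }
}
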
http://git-wip-us.apache.org/repos/asf/flink/blob/8abae2c2/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationWithAllReducerITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationWithAllReducerITCase.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationWithAllReducerITCase.java
index cb16c15..ab66f31 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationWithAllReducerITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationWithAllReducerITCase.java
@@ -18,91 +18,34 @@
package org.apache.flink.test.iterative;
-import java.io.Serializable;
-import java.util.Iterator;
-
-import org.apache.flink.api.common.Plan;
-import org.apache.flink.api.java.record.functions.ReduceFunction;
-import org.apache.flink.api.java.record.io.CsvOutputFormat;
-import org.apache.flink.api.java.record.io.TextInputFormat;
-import org.apache.flink.api.java.record.operators.BulkIteration;
-import org.apache.flink.api.java.record.operators.FileDataSink;
-import org.apache.flink.api.java.record.operators.FileDataSource;
-import org.apache.flink.api.java.record.operators.ReduceOperator;
-import org.apache.flink.test.util.RecordAPITestBase;
-import org.apache.flink.types.Record;
-import org.apache.flink.types.StringValue;
-import org.apache.flink.util.Collector;
-import org.junit.Assert;
-
-@SuppressWarnings("deprecation")
-public class IterationWithAllReducerITCase extends RecordAPITestBase {
-
- private static final String INPUT = "1\n" + "1\n" + "1\n" + "1\n" + "1\n" + "1\n" + "1\n" + "1\n";
+import java.util.List;
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.operators.IterativeDataSet;
+import org.apache.flink.test.util.JavaProgramTestBase;
+
+public class IterationWithAllReducerITCase extends JavaProgramTestBase {
private static final String EXPECTED = "1\n";
- protected String dataPath;
- protected String resultPath;
-
- public IterationWithAllReducerITCase(){
- setTaskManagerNumSlots(4);
- }
-
@Override
- protected void preSubmit() throws Exception {
- dataPath = createTempFile("datapoints.txt", INPUT);
- resultPath = getTempFilePath("result");
- }
-
- @Override
- protected void postSubmit() throws Exception {
- compareResultsByLinesInMemory(EXPECTED, resultPath);
- }
-
- @Override
- protected Plan getTestJob() {
- Plan plan = getTestPlanPlan(parallelism, dataPath, resultPath);
- return plan;
- }
+ protected void testProgram() throws Exception {
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(4);
-
- private static Plan getTestPlanPlan(int numSubTasks, String input, String output) {
+ DataSet<String> initialInput = env.fromElements("1", "1", "1", "1", "1", "1", "1", "1");
- FileDataSource initialInput = new FileDataSource(TextInputFormat.class, input, "input");
-
- BulkIteration iteration = new BulkIteration("Loop");
- iteration.setInput(initialInput);
- iteration.setMaximumNumberOfIterations(5);
-
- Assert.assertTrue(iteration.getMaximumNumberOfIterations() > 1);
+ IterativeDataSet<String> iteration = initialInput.iterate(5).name("Loop");
- ReduceOperator sumReduce = ReduceOperator.builder(new PickOneReducer())
- .input(iteration.getPartialSolution())
- .name("Compute sum (Reduce)")
- .build();
-
- iteration.setNextPartialSolution(sumReduce);
+ DataSet<String> sumReduce = iteration.reduce(new ReduceFunction<String>(){
+ @Override
+ public String reduce(String value1, String value2) throws Exception {
+ return value1;
+ }
+ }).name("Compute sum (Reduce)");
- FileDataSink finalResult = new FileDataSink(CsvOutputFormat.class, output, iteration, "Output");
- CsvOutputFormat.configureRecordFormat(finalResult)
- .recordDelimiter('\n')
- .fieldDelimiter(' ')
- .field(StringValue.class, 0);
+ List<String> result = iteration.closeWith(sumReduce).collect();
- Plan plan = new Plan(finalResult, "Iteration with AllReducer (keyless Reducer)");
-
- plan.setDefaultParallelism(numSubTasks);
- Assert.assertTrue(plan.getDefaultParallelism() > 1);
-
- return plan;
- }
-
- public static final class PickOneReducer extends ReduceFunction implements Serializable {
- private static final long serialVersionUID = 1L;
-
- @Override
- public void reduce(Iterator<Record> it, Collector<Record> out) {
- out.collect(it.next());
- }
+ compareResultAsText(result, EXPECTED);
}
}
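
A side note on the rewritten test: reduce() without a preceding groupBy() is a keyless "all" reduce, which folds every element into a single value (ending in a single-parallelism reduce step). A tiny standalone sketch of the same construct; the data and function are illustrative:

import java.util.List;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;

public class AllReduceSketch {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<Integer> values = env.fromElements(3, 1, 4, 1, 5);

        // Keyless reduce: no groupBy, so all elements fold into one result.
        List<Integer> result = values.reduce(new ReduceFunction<Integer>() {
            @Override
            public Integer reduce(Integer a, Integer b) {
                return Math.max(a, b);
            }
        }).collect();

        System.out.println(result); // [5]
    }
}
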
http://git-wip-us.apache.org/repos/asf/flink/blob/8abae2c2/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationWithChainingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationWithChainingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationWithChainingITCase.java
index c11c9ea..c283df1 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationWithChainingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationWithChainingITCase.java
@@ -18,43 +18,25 @@
package org.apache.flink.test.iterative;
-import java.io.Serializable;
-import java.util.Collection;
-import java.util.Iterator;
-
-import org.apache.flink.api.common.Plan;
-import org.apache.flink.api.java.record.functions.MapFunction;
-import org.apache.flink.api.java.record.functions.ReduceFunction;
-import org.apache.flink.api.java.record.operators.BulkIteration;
-import org.apache.flink.api.java.record.operators.FileDataSink;
-import org.apache.flink.api.java.record.operators.FileDataSource;
-import org.apache.flink.api.java.record.operators.MapOperator;
-import org.apache.flink.api.java.record.operators.ReduceOperator;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.test.recordJobs.kmeans.udfs.PointInFormat;
-import org.apache.flink.test.recordJobs.kmeans.udfs.PointOutFormat;
-import org.apache.flink.test.util.RecordAPITestBase;
-import org.apache.flink.types.IntValue;
-import org.apache.flink.types.Record;
+import org.apache.flink.api.common.functions.GroupReduceFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.operators.IterativeDataSet;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.test.util.PointFormatter;
+import org.apache.flink.test.util.PointInFormat;
+import org.apache.flink.test.util.CoordVector;
+import org.apache.flink.test.util.JavaProgramTestBase;
import org.apache.flink.util.Collector;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-import org.junit.runners.Parameterized.Parameters;
-@SuppressWarnings("deprecation")
-@RunWith(Parameterized.class)
-public class IterationWithChainingITCase extends RecordAPITestBase {
+public class IterationWithChainingITCase extends JavaProgramTestBase {
private static final String DATA_POINTS = "0|50.90|16.20|72.08|\n" + "1|73.65|61.76|62.89|\n" + "2|61.73|49.95|92.74|\n";
private String dataPath;
private String resultPath;
- public IterationWithChainingITCase(Configuration config) {
- super(config);
- setTaskManagerNumSlots(parallelism);
- }
-
@Override
protected void preSubmit() throws Exception {
dataPath = createTempFile("data_points.txt", DATA_POINTS);
@@ -62,63 +44,35 @@ public class IterationWithChainingITCase extends RecordAPITestBase {
}
@Override
- protected void postSubmit() throws Exception {
- compareResultsByLinesInMemory(DATA_POINTS, resultPath);
- }
-
- @Override
- protected Plan getTestJob() {
- return getTestPlan(config.getInteger("ChainedMapperITCase#NoSubtasks", 1), dataPath, resultPath);
- }
+ protected void testProgram() throws Exception {
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(4);
- @Parameters
- public static Collection<Object[]> getConfigurations() {
- Configuration config1 = new Configuration();
- config1.setInteger("ChainedMapperITCase#NoSubtasks", parallelism);
- return toParameterList(config1);
- }
+ DataSet<Tuple2<Integer, CoordVector>> initialInput
+ = env.readFile(new PointInFormat(), dataPath).setParallelism(1).name("Input");
- public static final class IdentityMapper extends MapFunction implements Serializable {
+ IterativeDataSet<Tuple2<Integer, CoordVector>> iteration = initialInput.iterate(2).name("Loop");
- private static final long serialVersionUID = 1L;
+ DataSet<Tuple2<Integer, CoordVector>> identity
+ = iteration.groupBy(0).reduceGroup(new GroupReduceFunction<Tuple2<Integer, CoordVector>, Tuple2<Integer, CoordVector>>() {
+ @Override
+ public void reduce(Iterable<Tuple2<Integer, CoordVector>> values, Collector<Tuple2<Integer, CoordVector>> out) throws Exception {
+ for (Tuple2<Integer, CoordVector> value : values) {
+ out.collect(value);
+ }
+ }
+ }).map(new MapFunction<Tuple2<Integer, CoordVector>, Tuple2<Integer, CoordVector>>() {
+ @Override
+ public Tuple2<Integer, CoordVector> map(Tuple2<Integer, CoordVector> value) throws Exception {
+ return value;
+ }
- @Override
- public void map(Record rec, Collector<Record> out) {
- out.collect(rec);
- }
- }
+ });
- public static final class DummyReducer extends ReduceFunction implements Serializable {
+ iteration.closeWith(identity).writeAsFormattedText(resultPath, new PointFormatter());
- private static final long serialVersionUID = 1L;
+ env.execute("Iteration with chained map test");
- @Override
- public void reduce(Iterator<Record> it, Collector<Record> out) {
- while (it.hasNext()) {
- out.collect(it.next());
- }
- }
- }
-
- static Plan getTestPlan(int numSubTasks, String input, String output) {
-
- FileDataSource initialInput = new FileDataSource(new PointInFormat(), input, "Input");
- initialInput.setParallelism(1);
-
- BulkIteration iteration = new BulkIteration("Loop");
- iteration.setInput(initialInput);
- iteration.setMaximumNumberOfIterations(2);
-
- ReduceOperator dummyReduce = ReduceOperator.builder(new DummyReducer(), IntValue.class, 0).input(iteration.getPartialSolution())
- .name("Reduce something").build();
-
- MapOperator dummyMap = MapOperator.builder(new IdentityMapper()).input(dummyReduce).build();
- iteration.setNextPartialSolution(dummyMap);
-
- FileDataSink finalResult = new FileDataSink(new PointOutFormat(), output, iteration, "Output");
-
- Plan plan = new Plan(finalResult, "Iteration with chained map test");
- plan.setDefaultParallelism(numSubTasks);
- return plan;
+ compareResultsByLinesInMemory(DATA_POINTS, resultPath);
}
}
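
One detail of the rewrite worth calling out: env.readFile(...) takes an explicit FileInputFormat instance, and setParallelism(1) on the source pins the input to a single split, replacing the old FileDataSource plus initialInput.setParallelism(1) combination. A sketch with a stock format; TextInputFormat stands in for the PointInFormat test utility:

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.TextInputFormat;
import org.apache.flink.core.fs.Path;

public class ReadFileSketch {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        String path = args[0]; // any text file

        // Explicit input format plus a parallelism-1 source.
        DataSet<String> lines = env
                .readFile(new TextInputFormat(new Path(path)), path)
                .setParallelism(1)
                .name("Input");

        System.out.println(lines.count()); // number of lines in the file
    }
}
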
http://git-wip-us.apache.org/repos/asf/flink/blob/8abae2c2/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationWithUnionITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationWithUnionITCase.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationWithUnionITCase.java
index 2a4a4b7..8756429 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationWithUnionITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationWithUnionITCase.java
@@ -25,10 +25,11 @@ import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.IterativeDataSet;
-import org.apache.flink.test.recordJobs.kmeans.udfs.PointInFormat;
-import org.apache.flink.test.recordJobs.kmeans.udfs.PointOutFormat;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.test.util.CoordVector;
+import org.apache.flink.test.util.PointFormatter;
+import org.apache.flink.test.util.PointInFormat;
import org.apache.flink.test.util.JavaProgramTestBase;
-import org.apache.flink.types.Record;
import org.apache.flink.util.Collector;
public class IterationWithUnionITCase extends JavaProgramTestBase {
@@ -54,32 +55,32 @@ public class IterationWithUnionITCase extends JavaProgramTestBase {
protected void testProgram() throws Exception {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- DataSet<Record> initialInput = env.readFile(new PointInFormat(), this.dataPath).setParallelism(1);
+ DataSet<Tuple2<Integer, CoordVector>> initialInput = env.readFile(new PointInFormat(), this.dataPath).setParallelism(1);
- IterativeDataSet<Record> iteration = initialInput.iterate(2);
+ IterativeDataSet<Tuple2<Integer, CoordVector>> iteration = initialInput.iterate(2);
- DataSet<Record> result = iteration.union(iteration).map(new IdentityMapper());
+ DataSet<Tuple2<Integer, CoordVector>> result = iteration.union(iteration).map(new IdentityMapper());
- iteration.closeWith(result).write(new PointOutFormat(), this.resultPath);
+ iteration.closeWith(result).writeAsFormattedText(this.resultPath, new PointFormatter());
env.execute();
}
- static final class IdentityMapper implements MapFunction<Record, Record>, Serializable {
+ static final class IdentityMapper implements MapFunction<Tuple2<Integer, CoordVector>, Tuple2<Integer, CoordVector>>, Serializable {
private static final long serialVersionUID = 1L;
@Override
- public Record map(Record rec) {
+ public Tuple2<Integer, CoordVector> map(Tuple2<Integer, CoordVector> rec) {
return rec;
}
}
- static class DummyReducer implements GroupReduceFunction<Record, Record>, Serializable {
+ static class DummyReducer implements GroupReduceFunction<Tuple2<Integer, CoordVector>, Tuple2<Integer, CoordVector>>, Serializable {
private static final long serialVersionUID = 1L;
@Override
- public void reduce(Iterable<Record> it, Collector<Record> out) {
- for (Record r : it) {
+ public void reduce(Iterable<Tuple2<Integer, CoordVector>> it, Collector<Tuple2<Integer, CoordVector>> out) {
+ for (Tuple2<Integer, CoordVector> r : it) {
out.collect(r);
}
}
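
The union in this test feeds the partial solution back into itself, so the element count doubles every superstep. A condensed sketch of that behavior; the values are illustrative:

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.IterativeDataSet;

public class IterationUnionSketch {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSet<Long> input = env.fromElements(1L, 2L, 3L);
        IterativeDataSet<Long> iteration = input.iterate(2);

        // Union the partial solution with itself; the identity map mirrors
        // the IdentityMapper in the test above.
        DataSet<Long> step = iteration.union(iteration)
                .map(new MapFunction<Long, Long>() {
                    @Override
                    public Long map(Long v) {
                        return v;
                    }
                });

        // 3 elements double twice: 3 -> 6 -> 12.
        System.out.println(iteration.closeWith(step).count()); // 12
    }
}
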
http://git-wip-us.apache.org/repos/asf/flink/blob/8abae2c2/flink-tests/src/test/java/org/apache/flink/test/iterative/IterativeKMeansITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/IterativeKMeansITCase.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/IterativeKMeansITCase.java
deleted file mode 100644
index ac3659a..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/IterativeKMeansITCase.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.test.iterative;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.flink.api.common.Plan;
-import org.apache.flink.test.recordJobs.kmeans.KMeansBroadcast;
-import org.apache.flink.test.testdata.KMeansData;
-import org.apache.flink.test.util.RecordAPITestBase;
-
-
-public class IterativeKMeansITCase extends RecordAPITestBase {
-
- protected String dataPath;
- protected String clusterPath;
- protected String resultPath;
-
- public IterativeKMeansITCase(){
- setTaskManagerNumSlots(parallelism);
- }
-
- @Override
- protected void preSubmit() throws Exception {
- dataPath = createTempFile("datapoints.txt", KMeansData.DATAPOINTS);
- clusterPath = createTempFile("initial_centers.txt", KMeansData.INITIAL_CENTERS);
- resultPath = getTempDirPath("result");
- }
-
- @Override
- protected Plan getTestJob() {
- KMeansBroadcast kmi = new KMeansBroadcast();
- return kmi.getPlan(String.valueOf(parallelism), dataPath, clusterPath, resultPath, "20");
- }
-
-
- @Override
- protected void postSubmit() throws Exception {
- List<String> resultLines = new ArrayList<String>();
- readAllResultLines(resultLines, resultPath);
-
- KMeansData.checkResultsWithDelta(KMeansData.CENTERS_AFTER_20_ITERATIONS_SINGLE_DIGIT, resultLines, 0.1);
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/8abae2c2/flink-tests/src/test/java/org/apache/flink/test/iterative/KMeansITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/KMeansITCase.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/KMeansITCase.java
deleted file mode 100644
index fcf43df..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/KMeansITCase.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.test.iterative;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.flink.api.common.Plan;
-import org.apache.flink.test.recordJobs.kmeans.KMeansBroadcast;
-import org.apache.flink.test.testdata.KMeansData;
-import org.apache.flink.test.util.RecordAPITestBase;
-
-
-public class KMeansITCase extends RecordAPITestBase {
-
- protected String dataPath;
- protected String clusterPath;
- protected String resultPath;
-
- public KMeansITCase(){
- setTaskManagerNumSlots(parallelism);
- }
-
- @Override
- protected void preSubmit() throws Exception {
- dataPath = createTempFile("datapoints.txt", KMeansData.DATAPOINTS);
- clusterPath = createTempFile("initial_centers.txt", KMeansData.INITIAL_CENTERS);
- resultPath = getTempDirPath("result");
- }
-
- @Override
- protected Plan getTestJob() {
- KMeansBroadcast kmi = new KMeansBroadcast();
- return kmi.getPlan(String.valueOf(parallelism), dataPath, clusterPath, resultPath, "20");
- }
-
-
- @Override
- protected void postSubmit() throws Exception {
- List<String> resultLines = new ArrayList<String>();
- readAllResultLines(resultLines, resultPath);
-
- KMeansData.checkResultsWithDelta(KMeansData.CENTERS_AFTER_20_ITERATIONS_SINGLE_DIGIT, resultLines, 0.1);
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/8abae2c2/flink-tests/src/test/java/org/apache/flink/test/optimizer/examples/KMeansSingleStepTest.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/optimizer/examples/KMeansSingleStepTest.java b/flink-tests/src/test/java/org/apache/flink/test/optimizer/examples/KMeansSingleStepTest.java
index ab8ff45..959a17a 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/optimizer/examples/KMeansSingleStepTest.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/optimizer/examples/KMeansSingleStepTest.java
@@ -21,11 +21,24 @@ package org.apache.flink.test.optimizer.examples;
import static org.junit.Assert.*;
import java.util.Arrays;
+import java.util.Collection;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.Plan;
+import org.apache.flink.api.common.functions.GroupReduceFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.RichGroupReduceFunction;
+import org.apache.flink.api.common.functions.RichGroupReduceFunction.Combinable;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.common.operators.GenericDataSourceBase;
import org.apache.flink.api.common.operators.util.FieldList;
-import org.apache.flink.api.java.record.operators.FileDataSource;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.client.program.OptimizerPlanEnvironment;
+import org.apache.flink.client.program.PreviewPlanEnvironment;
+import org.apache.flink.configuration.Configuration;
import org.apache.flink.optimizer.plan.OptimizedPlan;
import org.apache.flink.optimizer.plan.SingleInputPlanNode;
import org.apache.flink.optimizer.plan.SinkPlanNode;
@@ -34,11 +47,10 @@ import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
import org.apache.flink.runtime.operators.util.LocalStrategy;
import org.apache.flink.optimizer.util.CompilerTestBase;
import org.apache.flink.optimizer.util.OperatorResolver;
-import org.apache.flink.test.recordJobs.kmeans.KMeansSingleStep;
+import org.apache.flink.util.Collector;
import org.junit.Assert;
import org.junit.Test;
-@SuppressWarnings("deprecation")
public class KMeansSingleStepTest extends CompilerTestBase {
private static final String DATAPOINTS = "Data Points";
@@ -54,16 +66,15 @@ public class KMeansSingleStepTest extends CompilerTestBase {
@Test
public void testCompileKMeansSingleStepWithStats() {
-
- KMeansSingleStep kmi = new KMeansSingleStep();
- Plan p = kmi.getPlan(String.valueOf(DEFAULT_PARALLELISM), IN_FILE, IN_FILE, OUT_FILE, String.valueOf(20));
+
+ Plan p = getKMeansPlan();
p.setExecutionConfig(new ExecutionConfig());
// set the statistics
OperatorResolver cr = getContractResolver(p);
- FileDataSource pointsSource = cr.getNode(DATAPOINTS);
- FileDataSource centersSource = cr.getNode(CENTERS);
- setSourceStatistics(pointsSource, 100l*1024*1024*1024, 32f);
- setSourceStatistics(centersSource, 1024*1024, 32f);
+ GenericDataSourceBase pointsSource = cr.getNode(DATAPOINTS);
+ GenericDataSourceBase centersSource = cr.getNode(CENTERS);
+ setSourceStatistics(pointsSource, 100l * 1024 * 1024 * 1024, 32f);
+ setSourceStatistics(centersSource, 1024 * 1024, 32f);
OptimizedPlan plan = compileWithStats(p);
checkPlan(plan);
@@ -71,9 +82,8 @@ public class KMeansSingleStepTest extends CompilerTestBase {
@Test
public void testCompileKMeansSingleStepWithOutStats() {
-
- KMeansSingleStep kmi = new KMeansSingleStep();
- Plan p = kmi.getPlan(String.valueOf(DEFAULT_PARALLELISM), IN_FILE, IN_FILE, OUT_FILE, String.valueOf(20));
+
+ Plan p = getKMeansPlan();
p.setExecutionConfig(new ExecutionConfig());
OptimizedPlan plan = compileNoStats(p);
checkPlan(plan);
@@ -97,7 +107,7 @@ public class KMeansSingleStepTest extends CompilerTestBase {
assertEquals(LocalStrategy.NONE, mapper.getInput().getLocalStrategy());
assertEquals(LocalStrategy.NONE, mapper.getBroadcastInputs().get(0).getLocalStrategy());
- assertEquals(DriverStrategy.COLLECTOR_MAP, mapper.getDriverStrategy());
+ assertEquals(DriverStrategy.MAP, mapper.getDriverStrategy());
assertNull(mapper.getInput().getLocalStrategyKeys());
assertNull(mapper.getInput().getLocalStrategySortOrder());
@@ -127,4 +137,145 @@ public class KMeansSingleStepTest extends CompilerTestBase {
assertEquals(ShipStrategyType.FORWARD, sink.getInput().getShipStrategy());
assertEquals(LocalStrategy.NONE, sink.getInput().getLocalStrategy());
}
+
+ public static Plan getKMeansPlan() {
+ // prepare the test environment
+ PreviewPlanEnvironment env = new PreviewPlanEnvironment();
+ env.setAsContext();
+ try {
+ KMeans(new String[]{IN_FILE, IN_FILE, OUT_FILE, "20"});
+ } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
+ // all good.
+ } catch (Exception e) {
+ e.printStackTrace();
+ Assert.fail("KMeans failed with an exception");
+ }
+ return env.getPlan();
+ }
+
+ public static void KMeans(String[] args) throws Exception {
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ DataSet<Point> points = env.readCsvFile(args[0])
+ .fieldDelimiter(" ")
+ .includeFields(true, true)
+ .types(Double.class, Double.class)
+ .name(DATAPOINTS)
+ .map(new MapFunction<Tuple2<Double, Double>, Point>() {
+ @Override
+ public Point map(Tuple2<Double, Double> value) throws Exception {
+ return new Point(value.f0, value.f1);
+ }
+ });
+
+ DataSet<Centroid> centroids = env.readCsvFile(args[1])
+ .fieldDelimiter(" ")
+ .includeFields(true, true, true)
+ .types(Integer.class, Double.class, Double.class)
+ .name(CENTERS)
+ .map(new MapFunction<Tuple3<Integer, Double, Double>, Centroid>() {
+ @Override
+ public Centroid map(Tuple3<Integer, Double, Double> value) throws Exception {
+ return new Centroid(value.f0, value.f1, value.f2);
+ }
+ });
+
+ DataSet<Tuple3<Integer, Point, Integer>> newCentroids = points
+ .map(new SelectNearestCenter()).name(MAPPER_NAME).withBroadcastSet(centroids, "centroids");
+
+ DataSet<Tuple3<Integer, Point, Integer>> recomputeClusterCenter
+ = newCentroids.groupBy(0).reduceGroup(new RecomputeClusterCenter()).name(REDUCER_NAME);
+
+ recomputeClusterCenter.project(0, 1).writeAsCsv(args[2], "\n", " ").name(SINK);
+
+ env.execute("KMeans Example");
+ }
+
+ public static class Point extends Tuple2<Double, Double> {
+ public Point(double x, double y) {
+ this.f0 = x;
+ this.f1 = y;
+ }
+
+ public Point add(Point other) {
+ f0 += other.f0;
+ f1 += other.f1;
+ return this;
+ }
+
+ public Point div(long val) {
+ f0 /= val;
+ f1 /= val;
+ return this;
+ }
+
+ public double euclideanDistance(Point other) {
+ return Math.sqrt((f0 - other.f0) * (f0 - other.f0) + (f1 - other.f1) * (f1 - other.f1));
+ }
+
+ public double euclideanDistance(Centroid other) {
+ return Math.sqrt((f0 - other.f1.f0) * (f0 - other.f1.f0) + (f1 - other.f1.f1) * (f1 - other.f1.f1));
+ }
+ }
+
+ public static class Centroid extends Tuple2<Integer, Point> {
+ public Centroid(int id, double x, double y) {
+ this.f0 = id;
+ this.f1 = new Point(x, y);
+ }
+
+ public Centroid(int id, Point p) {
+ this.f0 = id;
+ this.f1 = p;
+ }
+ }
+
+ /**
+ * Determines the closest cluster center for a data point.
+ */
+ public static final class SelectNearestCenter extends RichMapFunction<Point, Tuple3<Integer, Point, Integer>> {
+ private Collection<Centroid> centroids;
+
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ this.centroids = getRuntimeContext().getBroadcastVariable("centroids");
+ }
+
+ @Override
+ public Tuple3<Integer, Point, Integer> map(Point p) throws Exception {
+ double minDistance = Double.MAX_VALUE;
+ int closestCentroidId = -1;
+ for (Centroid centroid : centroids) {
+ double distance = p.euclideanDistance(centroid);
+ if (distance < minDistance) {
+ minDistance = distance;
+ closestCentroidId = centroid.f0;
+ }
+ }
+ return new Tuple3<>(closestCentroidId, p, 1);
+ }
+ }
+
+ @Combinable
+ public static final class RecomputeClusterCenter extends RichGroupReduceFunction<Tuple3<Integer, Point, Integer>, Tuple3<Integer, Point, Integer>> {
+ @Override
+ public void reduce(Iterable<Tuple3<Integer, Point, Integer>> values, Collector<Tuple3<Integer, Point, Integer>> out) throws Exception {
+ int id = -1;
+ double x = 0;
+ double y = 0;
+ int count = 0;
+ for (Tuple3<Integer, Point, Integer> value : values) {
+ id = value.f0;
+ x += value.f1.f0;
+ y += value.f1.f1;
+ count += value.f2;
+ }
+ out.collect(new Tuple3<>(id, new Point(x, y), count));
+ }
+
+ @Override
+ public void combine(Iterable<Tuple3<Integer, Point, Integer>> values, Collector<Tuple3<Integer, Point, Integer>> out) throws Exception {
+ reduce(values, out);
+ }
+ }
}
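
The new KMeans program distributes the current centroids to every mapper as a broadcast set: withBroadcastSet(...) attaches the small DataSet, and getBroadcastVariable(...) reads it in open(). The pattern in isolation, with illustrative names:

import java.util.List;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.configuration.Configuration;

public class BroadcastSketch {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<Integer> data = env.fromElements(1, 2, 3);
        DataSet<Integer> offsets = env.fromElements(100);

        data.map(new RichMapFunction<Integer, Integer>() {
            private int offset;

            @Override
            public void open(Configuration parameters) {
                // Read the broadcast set registered under the name "offset".
                List<Integer> bc = getRuntimeContext().getBroadcastVariable("offset");
                offset = bc.get(0);
            }

            @Override
            public Integer map(Integer value) {
                return value + offset;
            }
        }).withBroadcastSet(offsets, "offset").print(); // 101, 102, 103
    }
}
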
http://git-wip-us.apache.org/repos/asf/flink/blob/8abae2c2/flink-tests/src/test/java/org/apache/flink/test/optimizer/examples/RelationalQueryCompilerTest.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/optimizer/examples/RelationalQueryCompilerTest.java b/flink-tests/src/test/java/org/apache/flink/test/optimizer/examples/RelationalQueryCompilerTest.java
index f4efb8a..e929913 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/optimizer/examples/RelationalQueryCompilerTest.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/optimizer/examples/RelationalQueryCompilerTest.java
@@ -22,10 +22,23 @@ import java.util.Arrays;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.Plan;
+import org.apache.flink.api.common.functions.FlatJoinFunction;
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.functions.RichGroupReduceFunction;
+import org.apache.flink.api.common.functions.RichGroupReduceFunction.Combinable;
+import org.apache.flink.api.common.operators.DualInputOperator;
+import org.apache.flink.api.common.operators.GenericDataSourceBase;
+import org.apache.flink.api.common.operators.SingleInputOperator;
import org.apache.flink.api.common.operators.util.FieldList;
-import org.apache.flink.api.java.record.operators.FileDataSource;
-import org.apache.flink.api.java.record.operators.JoinOperator;
-import org.apache.flink.api.java.record.operators.MapOperator;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields;
+import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsFirst;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.tuple.Tuple5;
+import org.apache.flink.client.program.OptimizerPlanEnvironment;
+import org.apache.flink.client.program.PreviewPlanEnvironment;
import org.apache.flink.optimizer.plan.DualInputPlanNode;
import org.apache.flink.optimizer.plan.OptimizedPlan;
import org.apache.flink.optimizer.plan.SingleInputPlanNode;
@@ -35,23 +48,25 @@ import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
import org.apache.flink.runtime.operators.util.LocalStrategy;
import org.apache.flink.optimizer.util.CompilerTestBase;
import org.apache.flink.optimizer.util.OperatorResolver;
-import org.apache.flink.test.recordJobs.relational.TPCHQuery3;
+import org.apache.flink.util.Collector;
import org.junit.Assert;
import org.junit.Test;
/**
* Tests TPCH Q3 (simplified) under various input conditions.
*/
-@SuppressWarnings("deprecation")
+@SuppressWarnings("serial")
public class RelationalQueryCompilerTest extends CompilerTestBase {
private static final String ORDERS = "Orders";
private static final String LINEITEM = "LineItems";
private static final String MAPPER_NAME = "FilterO";
private static final String JOIN_NAME = "JoinLiO";
+ private static final String REDUCE_NAME = "AggLiO";
+ private static final String SINK = "Output";
private final FieldList set0 = new FieldList(0);
- private final FieldList set01 = new FieldList(new int[] {0,1});
+ private final FieldList set01 = new FieldList(0,1);
private final ExecutionConfig defaultExecutionConfig = new ExecutionConfig();
// ------------------------------------------------------------------------
@@ -63,8 +78,7 @@ public class RelationalQueryCompilerTest extends CompilerTestBase {
@Test
public void testQueryNoStatistics() {
try {
- TPCHQuery3 query = new TPCHQuery3();
- Plan p = query.getPlan(DEFAULT_PARALLELISM_STRING, IN_FILE, IN_FILE, OUT_FILE);
+ Plan p = getTPCH3Plan();
p.setExecutionConfig(defaultExecutionConfig);
// compile
final OptimizedPlan plan = compileNoStats(p);
@@ -72,12 +86,12 @@ public class RelationalQueryCompilerTest extends CompilerTestBase {
final OptimizerPlanNodeResolver or = getOptimizerPlanNodeResolver(plan);
// get the nodes from the final plan
- final SinkPlanNode sink = or.getNode("Output");
- final SingleInputPlanNode reducer = or.getNode("AggLio");
+ final SinkPlanNode sink = or.getNode(SINK);
+ final SingleInputPlanNode reducer = or.getNode(REDUCE_NAME);
final SingleInputPlanNode combiner = reducer.getPredecessor() instanceof SingleInputPlanNode ?
(SingleInputPlanNode) reducer.getPredecessor() : null;
- final DualInputPlanNode join = or.getNode("JoinLiO");
- final SingleInputPlanNode filteringMapper = or.getNode("FilterO");
+ final DualInputPlanNode join = or.getNode(JOIN_NAME);
+ final SingleInputPlanNode filteringMapper = or.getNode(MAPPER_NAME);
// verify the optimizer choices
checkStandardStrategies(filteringMapper, join, combiner, reducer, sink);
@@ -95,7 +109,7 @@ public class RelationalQueryCompilerTest extends CompilerTestBase {
*/
@Test
public void testQueryAnyValidPlan() {
- testQueryGeneric(1024*1024*1024L, 8*1024*1024*1024L, 0.05f, true, true, true, false, true);
+ testQueryGeneric(1024*1024*1024L, 8*1024*1024*1024L, 0.05f, 0.05f, true, true, true, false, true);
}
/**
@@ -103,7 +117,7 @@ public class RelationalQueryCompilerTest extends CompilerTestBase {
*/
@Test
public void testQueryWithSizeZeroInputs() {
- testQueryGeneric(0, 0, 0.5f, true, true, true, false, true);
+ testQueryGeneric(0, 0, 0.1f, 0.5f, true, true, true, false, true);
}
/**
@@ -111,7 +125,7 @@ public class RelationalQueryCompilerTest extends CompilerTestBase {
*/
@Test
public void testQueryWithStatsForBroadcastHash() {
- testQueryGeneric(1024l*1024*1024*1024, 1024l*1024*1024*1024, 0.05f, true, false, true, false, false);
+ testQueryGeneric(1024l*1024*1024*1024, 1024l*1024*1024*1024, 0.01f, 0.05f, true, false, true, false, false);
}
/**
@@ -119,7 +133,7 @@ public class RelationalQueryCompilerTest extends CompilerTestBase {
*/
@Test
public void testQueryWithStatsForRepartitionAny() {
- testQueryGeneric(100l*1024*1024*1024*1024, 100l*1024*1024*1024*1024, 0.5f, false, true, true, true, true);
+ testQueryGeneric(100l*1024*1024*1024*1024, 100l*1024*1024*1024*1024, 0.1f, 0.5f, false, true, true, true, true);
}
/**
@@ -128,34 +142,23 @@ public class RelationalQueryCompilerTest extends CompilerTestBase {
*/
@Test
public void testQueryWithStatsForRepartitionMerge() {
- TPCHQuery3 query = new TPCHQuery3();
- Plan p = query.getPlan(DEFAULT_PARALLELISM_STRING, IN_FILE, IN_FILE, OUT_FILE);
+ Plan p = getTPCH3Plan();
p.setExecutionConfig(defaultExecutionConfig);
// set compiler hints
OperatorResolver cr = getContractResolver(p);
- JoinOperator match = cr.getNode("JoinLiO");
+ DualInputOperator<?,?,?,?> match = cr.getNode(JOIN_NAME);
match.getCompilerHints().setFilterFactor(100f);
- testQueryGeneric(100l*1024*1024*1024*1024, 100l*1024*1024*1024*1024, 0.05f, 100f, false, true, false, false, true);
+ testQueryGeneric(100l*1024*1024*1024*1024, 100l*1024*1024*1024*1024, 0.01f, 100f, false, true, false, false, true);
}
// ------------------------------------------------------------------------
-
- private void testQueryGeneric(long orderSize, long lineItemSize,
- float ordersFilterFactor,
- boolean broadcastOkay, boolean partitionedOkay,
- boolean hashJoinFirstOkay, boolean hashJoinSecondOkay, boolean mergeJoinOkay)
- {
- testQueryGeneric(orderSize, lineItemSize, ordersFilterFactor, ordersFilterFactor, broadcastOkay, partitionedOkay, hashJoinFirstOkay, hashJoinSecondOkay, mergeJoinOkay);
- }
-
private void testQueryGeneric(long orderSize, long lineItemSize,
float ordersFilterFactor, float joinFilterFactor,
boolean broadcastOkay, boolean partitionedOkay,
boolean hashJoinFirstOkay, boolean hashJoinSecondOkay, boolean mergeJoinOkay)
{
- TPCHQuery3 query = new TPCHQuery3();
- Plan p = query.getPlan(DEFAULT_PARALLELISM_STRING, IN_FILE, IN_FILE, OUT_FILE);
+ Plan p = getTPCH3Plan();
p.setExecutionConfig(defaultExecutionConfig);
testQueryGeneric(p, orderSize, lineItemSize, ordersFilterFactor, joinFilterFactor, broadcastOkay, partitionedOkay, hashJoinFirstOkay, hashJoinSecondOkay, mergeJoinOkay);
}
@@ -168,10 +171,10 @@ public class RelationalQueryCompilerTest extends CompilerTestBase {
try {
// set statistics
OperatorResolver cr = getContractResolver(p);
- FileDataSource ordersSource = cr.getNode(ORDERS);
- FileDataSource lineItemSource = cr.getNode(LINEITEM);
- MapOperator mapper = cr.getNode(MAPPER_NAME);
- JoinOperator joiner = cr.getNode(JOIN_NAME);
+ GenericDataSourceBase<?,?> ordersSource = cr.getNode(ORDERS);
+ GenericDataSourceBase<?,?> lineItemSource = cr.getNode(LINEITEM);
+ SingleInputOperator<?,?,?> mapper = cr.getNode(MAPPER_NAME);
+ DualInputOperator<?,?,?,?> joiner = cr.getNode(JOIN_NAME);
setSourceStatistics(ordersSource, orderSize, 100f);
setSourceStatistics(lineItemSource, lineitemSize, 140f);
mapper.getCompilerHints().setAvgOutputRecordSize(16f);
@@ -183,12 +186,12 @@ public class RelationalQueryCompilerTest extends CompilerTestBase {
final OptimizerPlanNodeResolver or = getOptimizerPlanNodeResolver(plan);
// get the nodes from the final plan
- final SinkPlanNode sink = or.getNode("Output");
- final SingleInputPlanNode reducer = or.getNode("AggLio");
+ final SinkPlanNode sink = or.getNode(SINK);
+ final SingleInputPlanNode reducer = or.getNode(REDUCE_NAME);
final SingleInputPlanNode combiner = reducer.getPredecessor() instanceof SingleInputPlanNode ?
(SingleInputPlanNode) reducer.getPredecessor() : null;
- final DualInputPlanNode join = or.getNode("JoinLiO");
- final SingleInputPlanNode filteringMapper = or.getNode("FilterO");
+ final DualInputPlanNode join = or.getNode(JOIN_NAME);
+ final SingleInputPlanNode filteringMapper = or.getNode(MAPPER_NAME);
checkStandardStrategies(filteringMapper, join, combiner, reducer, sink);
@@ -230,7 +233,7 @@ public class RelationalQueryCompilerTest extends CompilerTestBase {
// ------------------------------------------------------------------------
// Checks for special conditions
// ------------------------------------------------------------------------
-
+
private void checkStandardStrategies(SingleInputPlanNode map, DualInputPlanNode join, SingleInputPlanNode combiner,
SingleInputPlanNode reducer, SinkPlanNode sink)
{
@@ -239,7 +242,7 @@ public class RelationalQueryCompilerTest extends CompilerTestBase {
Assert.assertEquals(ShipStrategyType.FORWARD, sink.getInput().getShipStrategy());
// check the driver strategies that are always fix
- Assert.assertEquals(DriverStrategy.COLLECTOR_MAP, map.getDriverStrategy());
+ Assert.assertEquals(DriverStrategy.FLAT_MAP, map.getDriverStrategy());
Assert.assertEquals(DriverStrategy.SORTED_GROUP_REDUCE, reducer.getDriverStrategy());
Assert.assertEquals(DriverStrategy.NONE, sink.getDriverStrategy());
if (combiner != null) {
@@ -348,4 +351,73 @@ public class RelationalQueryCompilerTest extends CompilerTestBase {
return false;
}
}
+
+ public static Plan getTPCH3Plan() {
+ // prepare the test environment
+ PreviewPlanEnvironment env = new PreviewPlanEnvironment();
+ env.setAsContext();
+ try {
+ TCPH3(new String[]{DEFAULT_PARALLELISM_STRING, IN_FILE, IN_FILE, OUT_FILE});
+ } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
+ // all good.
+ } catch (Exception e) {
+ e.printStackTrace();
+ Assert.fail("TCPH3 failed with an exception");
+ }
+ return env.getPlan();
+ }
+
+ public static void TCPH3(String[] args) throws Exception {
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(Integer.parseInt(args[0]));
+
+ //order id, order status, order data, order prio, ship prio
+ DataSet<Tuple5<Long, String, String, String, Integer>> orders
+ = env.readCsvFile(args[1])
+ .fieldDelimiter("|").lineDelimiter("\n")
+ .includeFields("101011001").types(Long.class, String.class, String.class, String.class, Integer.class)
+ .name(ORDERS);
+
+ //order id, extended price
+ DataSet<Tuple2<Long, Double>> lineItems
+ = env.readCsvFile(args[2])
+ .fieldDelimiter("|").lineDelimiter("\n")
+ .includeFields("100001").types(Long.class, Double.class)
+ .name(LINEITEM);
+
+ DataSet<Tuple2<Long, Integer>> filterO = orders.flatMap(new FilterO()).name(MAPPER_NAME);
+
+ DataSet<Tuple3<Long, Integer, Double>> joinLiO = filterO.join(lineItems).where(0).equalTo(0).with(new JoinLiO()).name(JOIN_NAME);
+
+ DataSet<Tuple3<Long, Integer, Double>> aggLiO = joinLiO.groupBy(0, 1).reduceGroup(new AggLiO()).name(REDUCE_NAME);
+
+ aggLiO.writeAsCsv(args[3], "\n", "|").name(SINK);
+
+ env.execute();
+ }
+
+ @ForwardedFields("f0; f4->f1")
+ public static class FilterO implements FlatMapFunction<Tuple5<Long, String, String, String, Integer>, Tuple2<Long, Integer>> {
+ @Override
+ public void flatMap(Tuple5<Long, String, String, String, Integer> value, Collector<Tuple2<Long, Integer>> out) throws Exception {
+ // not going to be executed
+ }
+ }
+
+ @ForwardedFieldsFirst("f0; f1")
+ public static class JoinLiO implements FlatJoinFunction<Tuple2<Long, Integer>, Tuple2<Long, Double>, Tuple3<Long, Integer, Double>> {
+ @Override
+ public void join(Tuple2<Long, Integer> first, Tuple2<Long, Double> second, Collector<Tuple3<Long, Integer, Double>> out) throws Exception {
+ // not going to be executed
+ }
+ }
+
+ @ForwardedFields("f0; f1")
+ @Combinable
+ public static class AggLiO extends RichGroupReduceFunction<Tuple3<Long, Integer, Double>, Tuple3<Long, Integer, Double>> {
+ @Override
+ public void reduce(Iterable<Tuple3<Long, Integer, Double>> values, Collector<Tuple3<Long, Integer, Double>> out) throws Exception {
+ // not going to be executed
+ }
+ }
}
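
getTPCH3Plan() above, like getKMeansPlan() in the previous file, uses a preview-plan trick: the program runs against a PreviewPlanEnvironment, which captures the Plan and aborts at execute() with a ProgramAbortException. Stripped to its core; tinyJob() is a hypothetical stand-in for the program under test:

import org.apache.flink.api.common.Plan;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.client.program.OptimizerPlanEnvironment;
import org.apache.flink.client.program.PreviewPlanEnvironment;

public class PreviewPlanSketch {
    public static void main(String[] args) throws Exception {
        PreviewPlanEnvironment env = new PreviewPlanEnvironment();
        env.setAsContext();
        try {
            tinyJob();
        } catch (OptimizerPlanEnvironment.ProgramAbortException expected) {
            // expected: the preview environment aborts at execute()
        }
        Plan plan = env.getPlan();
        System.out.println(plan.getJobName());
    }

    // Hypothetical stand-in for the program under test.
    static void tinyJob() throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        env.fromElements("a", "b").writeAsText("/tmp/preview-out"); // never written
        env.execute("tiny job");
    }
}
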
http://git-wip-us.apache.org/repos/asf/flink/blob/8abae2c2/flink-tests/src/test/java/org/apache/flink/test/optimizer/iterations/ConnectedComponentsCoGroupTest.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/optimizer/iterations/ConnectedComponentsCoGroupTest.java b/flink-tests/src/test/java/org/apache/flink/test/optimizer/iterations/ConnectedComponentsCoGroupTest.java
index 99402a5..e134c7a 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/optimizer/iterations/ConnectedComponentsCoGroupTest.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/optimizer/iterations/ConnectedComponentsCoGroupTest.java
@@ -20,7 +20,19 @@ package org.apache.flink.test.optimizer.iterations;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.Plan;
+import org.apache.flink.api.common.functions.CoGroupFunction;
+import org.apache.flink.api.common.functions.FlatJoinFunction;
+import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.operators.util.FieldList;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsFirst;
+import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsSecond;
+import org.apache.flink.api.java.operators.DeltaIteration;
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.client.program.OptimizerPlanEnvironment.ProgramAbortException;
+import org.apache.flink.client.program.PreviewPlanEnvironment;
import org.apache.flink.optimizer.dag.TempMode;
import org.apache.flink.optimizer.plan.DualInputPlanNode;
import org.apache.flink.optimizer.plan.OptimizedPlan;
@@ -33,13 +45,14 @@ import org.apache.flink.runtime.operators.DriverStrategy;
import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
import org.apache.flink.runtime.operators.util.LocalStrategy;
import org.apache.flink.optimizer.util.CompilerTestBase;
-import org.apache.flink.test.recordJobs.graph.ConnectedComponentsWithCoGroup;
+import org.apache.flink.util.Collector;
import org.junit.Assert;
import org.junit.Test;
/**
*
*/
+@SuppressWarnings("serial")
public class ConnectedComponentsCoGroupTest extends CompilerTestBase {
private static final String VERTEX_SOURCE = "Vertices";
@@ -59,10 +72,7 @@ public class ConnectedComponentsCoGroupTest extends CompilerTestBase {
@Test
public void testWorksetConnectedComponents() {
- ConnectedComponentsWithCoGroup cc = new ConnectedComponentsWithCoGroup();
-
- Plan plan = cc.getPlan(String.valueOf(DEFAULT_PARALLELISM),
- IN_FILE, IN_FILE, OUT_FILE, String.valueOf(100));
+ Plan plan = getConnectedComponentsCoGroupPlan();
plan.setExecutionConfig(new ExecutionConfig());
OptimizedPlan optPlan = compileNoStats(plan);
OptimizerPlanNodeResolver or = getOptimizerPlanNodeResolver(optPlan);
@@ -134,4 +144,68 @@ public class ConnectedComponentsCoGroupTest extends CompilerTestBase {
JobGraphGenerator jgg = new JobGraphGenerator();
jgg.compileJobGraph(optPlan);
}
+
+ public static Plan getConnectedComponentsCoGroupPlan() {
+ // prepare the test environment
+ PreviewPlanEnvironment env = new PreviewPlanEnvironment();
+ env.setAsContext();
+ try {
+ ConnectedComponentsWithCoGroup(new String[]{DEFAULT_PARALLELISM_STRING, IN_FILE, IN_FILE, OUT_FILE, "100"});
+ } catch (ProgramAbortException pae) {
+ // all good.
+ } catch (Exception e) {
+ e.printStackTrace();
+ Assert.fail("ConnectedComponentsWithCoGroup failed with an exception");
+ }
+ return env.getPlan();
+ }
+
+ public static void ConnectedComponentsWithCoGroup(String[] args) throws Exception {
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(Integer.parseInt(args[0]));
+
+ DataSet<Tuple1<Long>> initialVertices = env.readCsvFile(args[1]).types(Long.class).name(VERTEX_SOURCE);
+
+ DataSet<Tuple2<Long, Long>> edges = env.readCsvFile(args[2]).types(Long.class, Long.class).name(EDGES_SOURCE);
+
+ DataSet<Tuple2<Long, Long>> verticesWithId = initialVertices.flatMap(new DummyMapFunction());
+
+ DeltaIteration<Tuple2<Long, Long>, Tuple2<Long, Long>> iteration
+ = verticesWithId.iterateDelta(verticesWithId, Integer.parseInt(args[4]), 0).name(ITERATION_NAME);
+
+ DataSet<Tuple2<Long, Long>> joinWithNeighbors = iteration.getWorkset().join(edges)
+ .where(0).equalTo(0)
+ .with(new DummyJoinFunction()).name(JOIN_NEIGHBORS_MATCH);
+
+ DataSet<Tuple2<Long, Long>> minAndUpdate = joinWithNeighbors.coGroup(iteration.getSolutionSet())
+ .where(0).equalTo(0)
+ .with(new DummyCoGroupFunction()).name(MIN_ID_AND_UPDATE);
+
+ iteration.closeWith(minAndUpdate, minAndUpdate).writeAsCsv(args[3]).name(SINK);
+
+ env.execute();
+ }
+
+ public static class DummyMapFunction implements FlatMapFunction<Tuple1<Long>, Tuple2<Long, Long>> {
+ @Override
+ public void flatMap(Tuple1<Long> value, Collector<Tuple2<Long, Long>> out) throws Exception {
+ // won't be executed
+ }
+ }
+
+ public static class DummyJoinFunction implements FlatJoinFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>> {
+ @Override
+ public void join(Tuple2<Long, Long> first, Tuple2<Long, Long> second, Collector<Tuple2<Long, Long>> out) throws Exception {
+ // won't be executed
+ }
+ }
+
+ @ForwardedFieldsFirst("f0->f0")
+ @ForwardedFieldsSecond("f0->f0")
+ public static class DummyCoGroupFunction implements CoGroupFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>> {
+ @Override
+ public void coGroup(Iterable<Tuple2<Long, Long>> first, Iterable<Tuple2<Long, Long>> second, Collector<Tuple2<Long, Long>> out) throws Exception {
+ // won't be executed
+ }
+ }
}
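
For reference, the delta-iteration skeleton used above, reduced to its moving parts: iterateDelta(initialWorkset, maxIterations, keyField) splits the state into a keyed solution set and a workset, and closeWith(delta, nextWorkset) ends when the workset runs empty or the iteration cap is hit. A self-contained sketch; the data and join logic are illustrative:

import java.util.List;
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DeltaIteration;
import org.apache.flink.api.java.tuple.Tuple2;

public class DeltaIterationSketch {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSet<Tuple2<Long, Long>> vertices = env.fromElements(
                new Tuple2<>(1L, 1L), new Tuple2<>(2L, 2L));

        // Solution set keyed on field 0; the same data seeds the workset.
        DeltaIteration<Tuple2<Long, Long>, Tuple2<Long, Long>> iteration =
                vertices.iterateDelta(vertices, 3, 0);

        // The solution set is accessed by joining on its key, as in the test above.
        DataSet<Tuple2<Long, Long>> delta = iteration.getWorkset()
                .join(iteration.getSolutionSet())
                .where(0).equalTo(0)
                .with(new JoinFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>>() {
                    @Override
                    public Tuple2<Long, Long> join(Tuple2<Long, Long> workset,
                                                   Tuple2<Long, Long> solution) {
                        // keep the smaller second field (illustrative)
                        return workset.f1 < solution.f1 ? workset : solution;
                    }
                });

        // The delta updates the solution set and also drives the next workset.
        List<Tuple2<Long, Long>> result = iteration.closeWith(delta, delta).collect();
        System.out.println(result);
    }
}
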
http://git-wip-us.apache.org/repos/asf/flink/blob/8abae2c2/flink-tests/src/test/java/org/apache/flink/test/optimizer/iterations/IterativeKMeansTest.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/optimizer/iterations/IterativeKMeansTest.java b/flink-tests/src/test/java/org/apache/flink/test/optimizer/iterations/IterativeKMeansTest.java
deleted file mode 100644
index 3785270..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/optimizer/iterations/IterativeKMeansTest.java
+++ /dev/null
@@ -1,159 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.optimizer.iterations;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-
-import java.util.Arrays;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.Plan;
-import org.apache.flink.api.common.operators.util.FieldList;
-import org.apache.flink.api.java.record.operators.FileDataSource;
-import org.apache.flink.optimizer.plan.BulkIterationPlanNode;
-import org.apache.flink.optimizer.plan.OptimizedPlan;
-import org.apache.flink.optimizer.plan.SingleInputPlanNode;
-import org.apache.flink.optimizer.plan.SinkPlanNode;
-import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
-import org.apache.flink.runtime.operators.DriverStrategy;
-import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
-import org.apache.flink.runtime.operators.util.LocalStrategy;
-import org.apache.flink.optimizer.util.CompilerTestBase;
-import org.apache.flink.optimizer.util.OperatorResolver;
-import org.apache.flink.test.recordJobs.kmeans.KMeansBroadcast;
-import org.junit.Assert;
-import org.junit.Test;
-
-@SuppressWarnings("deprecation")
-public class IterativeKMeansTest extends CompilerTestBase {
-
- private static final String DATAPOINTS = "Data Points";
- private static final String CENTERS = "Centers";
-
- private static final String MAPPER_NAME = "Find Nearest Centers";
- private static final String REDUCER_NAME = "Recompute Center Positions";
-
- private static final String ITERATION_NAME = "k-means loop";
-
- private static final String SINK = "New Center Positions";
-
- private final FieldList set0 = new FieldList(0);
-
- // --------------------------------------------------------------------------------------------
- // K-Means (Bulk Iteration)
- // --------------------------------------------------------------------------------------------
-
- @Test
- public void testCompileKMeansSingleStepWithStats() {
-
- KMeansBroadcast kmi = new KMeansBroadcast();
- Plan p = kmi.getPlan(String.valueOf(DEFAULT_PARALLELISM), IN_FILE, IN_FILE, OUT_FILE, String.valueOf(20));
- p.setExecutionConfig(new ExecutionConfig());
- // set the statistics
- OperatorResolver cr = getContractResolver(p);
- FileDataSource pointsSource = cr.getNode(DATAPOINTS);
- FileDataSource centersSource = cr.getNode(CENTERS);
- setSourceStatistics(pointsSource, 100l*1024*1024*1024, 32f);
- setSourceStatistics(centersSource, 1024*1024, 32f);
-
- OptimizedPlan plan = compileWithStats(p);
- checkPlan(plan);
-
- new JobGraphGenerator().compileJobGraph(plan);
- }
-
- @Test
- public void testCompileKMeansSingleStepWithOutStats() {
-
- KMeansBroadcast kmi = new KMeansBroadcast();
- Plan p = kmi.getPlan(String.valueOf(DEFAULT_PARALLELISM), IN_FILE, IN_FILE, OUT_FILE, String.valueOf(20));
- p.setExecutionConfig(new ExecutionConfig());
- OptimizedPlan plan = compileNoStats(p);
- checkPlan(plan);
-
- new JobGraphGenerator().compileJobGraph(plan);
- }
-
- private void checkPlan(OptimizedPlan plan) {
-
- OptimizerPlanNodeResolver or = getOptimizerPlanNodeResolver(plan);
-
- final SinkPlanNode sink = or.getNode(SINK);
- final SingleInputPlanNode reducer = or.getNode(REDUCER_NAME);
- final SingleInputPlanNode combiner = (SingleInputPlanNode) reducer.getPredecessor();
- final SingleInputPlanNode mapper = or.getNode(MAPPER_NAME);
-
- final BulkIterationPlanNode iter = or.getNode(ITERATION_NAME);
-
- // -------------------- outside the loop -----------------------
-
- // check the sink
- assertEquals(ShipStrategyType.FORWARD, sink.getInput().getShipStrategy());
- assertEquals(LocalStrategy.NONE, sink.getInput().getLocalStrategy());
-
- // check the iteration
- assertEquals(ShipStrategyType.FORWARD, iter.getInput().getShipStrategy());
- assertEquals(LocalStrategy.NONE, iter.getInput().getLocalStrategy());
-
-
- // -------------------- inside the loop -----------------------
-
- // check the mapper
- assertEquals(1, mapper.getBroadcastInputs().size());
- assertEquals(ShipStrategyType.FORWARD, mapper.getInput().getShipStrategy());
- assertEquals(ShipStrategyType.BROADCAST, mapper.getBroadcastInputs().get(0).getShipStrategy());
- assertFalse(mapper.getInput().isOnDynamicPath());
- assertTrue(mapper.getBroadcastInputs().get(0).isOnDynamicPath());
- assertTrue(mapper.getInput().getTempMode().isCached());
-
- assertEquals(LocalStrategy.NONE, mapper.getInput().getLocalStrategy());
- assertEquals(LocalStrategy.NONE, mapper.getBroadcastInputs().get(0).getLocalStrategy());
-
- assertEquals(DriverStrategy.COLLECTOR_MAP, mapper.getDriverStrategy());
-
- assertNull(mapper.getInput().getLocalStrategyKeys());
- assertNull(mapper.getInput().getLocalStrategySortOrder());
- assertNull(mapper.getBroadcastInputs().get(0).getLocalStrategyKeys());
- assertNull(mapper.getBroadcastInputs().get(0).getLocalStrategySortOrder());
-
- // check the combiner
- Assert.assertNotNull(combiner);
- assertEquals(ShipStrategyType.FORWARD, combiner.getInput().getShipStrategy());
- assertTrue(combiner.getInput().isOnDynamicPath());
-
- assertEquals(LocalStrategy.NONE, combiner.getInput().getLocalStrategy());
- assertEquals(DriverStrategy.SORTED_GROUP_COMBINE, combiner.getDriverStrategy());
- assertNull(combiner.getInput().getLocalStrategyKeys());
- assertNull(combiner.getInput().getLocalStrategySortOrder());
- assertEquals(set0, combiner.getKeys(0));
- assertEquals(set0, combiner.getKeys(1));
-
- // check the reducer
- assertEquals(ShipStrategyType.PARTITION_HASH, reducer.getInput().getShipStrategy());
- assertTrue(reducer.getInput().isOnDynamicPath());
- assertEquals(LocalStrategy.COMBININGSORT, reducer.getInput().getLocalStrategy());
- assertEquals(DriverStrategy.SORTED_GROUP_REDUCE, reducer.getDriverStrategy());
- assertEquals(set0, reducer.getKeys(0));
- assertEquals(set0, reducer.getInput().getLocalStrategyKeys());
- assertTrue(Arrays.equals(reducer.getInput().getLocalStrategySortOrder(), reducer.getSortOrders(0)));
- }
-}
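
The deleted test above compiled the Record API KMeansBroadcast plan directly via getPlan(...). The tests this commit keeps are ported to capture plans from the DataSet API example programs instead: the example's main() runs against a PreviewPlanEnvironment, which aborts execution once the plan is assembled. A minimal sketch of that pattern, as it recurs in the hunks below (the iteration-count argument "20" is a placeholder):

    import org.apache.flink.api.common.Plan;
    import org.apache.flink.client.program.OptimizerPlanEnvironment;
    import org.apache.flink.client.program.PreviewPlanEnvironment;
    import org.apache.flink.examples.java.clustering.KMeans;

    // inside a CompilerTestBase subclass, where IN_FILE and OUT_FILE are defined:
    PreviewPlanEnvironment env = new PreviewPlanEnvironment();
    env.setAsContext();
    try {
        KMeans.main(new String[] {IN_FILE, IN_FILE, OUT_FILE, "20"});
    } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
        // expected: the preview environment aborts the program after capturing the plan
    }
    Plan plan = env.getPlan();
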
http://git-wip-us.apache.org/repos/asf/flink/blob/8abae2c2/flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/DumpCompiledPlanTest.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/DumpCompiledPlanTest.java b/flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/DumpCompiledPlanTest.java
index 24d9416..d186cbb 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/DumpCompiledPlanTest.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/DumpCompiledPlanTest.java
@@ -25,13 +25,12 @@ import org.apache.flink.client.program.PreviewPlanEnvironment;
import org.apache.flink.optimizer.plan.OptimizedPlan;
import org.apache.flink.optimizer.plandump.PlanJSONDumpGenerator;
import org.apache.flink.examples.java.clustering.KMeans;
+import org.apache.flink.examples.java.graph.ConnectedComponents;
+import org.apache.flink.examples.java.graph.PageRankBasic;
+import org.apache.flink.examples.java.relational.TPCHQuery3;
+import org.apache.flink.examples.java.relational.WebLogAnalysis;
+import org.apache.flink.examples.java.wordcount.WordCount;
import org.apache.flink.optimizer.util.CompilerTestBase;
-import org.apache.flink.test.recordJobs.graph.DeltaPageRankWithInitialDeltas;
-import org.apache.flink.test.recordJobs.kmeans.KMeansBroadcast;
-import org.apache.flink.test.recordJobs.kmeans.KMeansSingleStep;
-import org.apache.flink.test.recordJobs.relational.TPCHQuery3;
-import org.apache.flink.test.recordJobs.relational.WebLogAnalysis;
-import org.apache.flink.test.recordJobs.wordcount.WordCount;
import org.codehaus.jackson.JsonFactory;
import org.codehaus.jackson.JsonParseException;
import org.codehaus.jackson.JsonParser;
@@ -45,17 +44,34 @@ public class DumpCompiledPlanTest extends CompilerTestBase {
@Test
public void dumpWordCount() {
- dump(new WordCount().getPlan(DEFAULT_PARALLELISM_STRING, IN_FILE, OUT_FILE));
+ // prepare the test environment
+ PreviewPlanEnvironment env = new PreviewPlanEnvironment();
+ env.setAsContext();
+ try {
+ WordCount.main(new String[] {IN_FILE, OUT_FILE});
+ } catch(OptimizerPlanEnvironment.ProgramAbortException pae) {
+ // all good.
+ } catch (Exception e) {
+ e.printStackTrace();
+ Assert.fail("WordCount failed with an exception");
+ }
+ dump(env.getPlan());
}
@Test
public void dumpTPCH3() {
- dump(new TPCHQuery3().getPlan(DEFAULT_PARALLELISM_STRING, IN_FILE, IN_FILE, OUT_FILE));
- }
-
- @Test
- public void dumpKMeans() {
- dump(new KMeansSingleStep().getPlan(DEFAULT_PARALLELISM_STRING, IN_FILE, IN_FILE, OUT_FILE));
+ // prepare the test environment
+ PreviewPlanEnvironment env = new PreviewPlanEnvironment();
+ env.setAsContext();
+ try {
+ TPCHQuery3.main(new String[] {IN_FILE, IN_FILE, OUT_FILE, "123"});
+ } catch(OptimizerPlanEnvironment.ProgramAbortException pae) {
+ // all good.
+ } catch (Exception e) {
+ e.printStackTrace();
+ Assert.fail("TPCH3 failed with an exception");
+ }
+ dump(env.getPlan());
}
@Test
@@ -64,7 +80,6 @@ public class DumpCompiledPlanTest extends CompilerTestBase {
PreviewPlanEnvironment env = new PreviewPlanEnvironment();
env.setAsContext();
try {
- // <points path> <centers path> <result path> <num iterations
KMeans.main(new String[] {IN_FILE, IN_FILE, OUT_FILE, "123"});
} catch(OptimizerPlanEnvironment.ProgramAbortException pae) {
// all good.
@@ -77,17 +92,50 @@ public class DumpCompiledPlanTest extends CompilerTestBase {
@Test
public void dumpWebLogAnalysis() {
- dump(new WebLogAnalysis().getPlan(DEFAULT_PARALLELISM_STRING, IN_FILE, IN_FILE, IN_FILE, OUT_FILE));
+ // prepare the test environment
+ PreviewPlanEnvironment env = new PreviewPlanEnvironment();
+ env.setAsContext();
+ try {
+ WebLogAnalysis.main(new String[] {IN_FILE, IN_FILE, OUT_FILE, "123"});
+ } catch(OptimizerPlanEnvironment.ProgramAbortException pae) {
+ // all good.
+ } catch (Exception e) {
+ e.printStackTrace();
+ Assert.fail("WebLogAnalysis failed with an exception");
+ }
+ dump(env.getPlan());
}
@Test
public void dumpBulkIterationKMeans() {
- dump(new KMeansBroadcast().getPlan(DEFAULT_PARALLELISM_STRING, IN_FILE, OUT_FILE));
+ // prepare the test environment
+ PreviewPlanEnvironment env = new PreviewPlanEnvironment();
+ env.setAsContext();
+ try {
+ ConnectedComponents.main(new String[] {IN_FILE, IN_FILE, OUT_FILE, "123"});
+ } catch(OptimizerPlanEnvironment.ProgramAbortException pae) {
+ // all good.
+ } catch (Exception e) {
+ e.printStackTrace();
+ Assert.fail("ConnectedComponents failed with an exception");
+ }
+ dump(env.getPlan());
}
@Test
- public void dumpDeltaPageRank() {
- dump(new DeltaPageRankWithInitialDeltas().getPlan(DEFAULT_PARALLELISM_STRING, IN_FILE, IN_FILE, IN_FILE, OUT_FILE, "10"));
+ public void dumpPageRank() {
+ // prepare the test environment
+ PreviewPlanEnvironment env = new PreviewPlanEnvironment();
+ env.setAsContext();
+ try {
+ PageRankBasic.main(new String[] {IN_FILE, IN_FILE, OUT_FILE, "10", "123"});
+ } catch(OptimizerPlanEnvironment.ProgramAbortException pae) {
+ // all good.
+ } catch (Exception e) {
+ e.printStackTrace();
+ Assert.fail("PagaRank failed with an exception");
+ }
+ dump(env.getPlan());
}
private void dump(Plan p) {
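
The body of dump(Plan) lies outside this hunk. Judging from the imports that survive the change (PlanJSONDumpGenerator plus Jackson's JsonFactory/JsonParser), the helper presumably compiles the plan, renders it as JSON, and walks the result with a parser to catch malformed output. A hedged sketch under that assumption; the method body below is illustrative, not taken from this diff:

    private void dump(Plan p) {
        p.setExecutionConfig(new ExecutionConfig());
        OptimizedPlan op = compileNoStats(p); // helper inherited from CompilerTestBase
        String json = new PlanJSONDumpGenerator().getOptimizerPlanAsJSON(op);
        try {
            JsonParser parser = new JsonFactory().createJsonParser(json);
            while (parser.nextToken() != null) {
                // consume every token; invalid JSON surfaces as a JsonParseException
            }
        } catch (JsonParseException e) {
            Assert.fail("JSON dump is malformed: " + e.getMessage());
        } catch (IOException e) {
            Assert.fail("Could not read JSON dump: " + e.getMessage());
        }
    }
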
http://git-wip-us.apache.org/repos/asf/flink/blob/8abae2c2/flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/PreviewPlanDumpTest.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/PreviewPlanDumpTest.java b/flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/PreviewPlanDumpTest.java
index 49fe6d8..95a06c3 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/PreviewPlanDumpTest.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/PreviewPlanDumpTest.java
@@ -21,16 +21,17 @@ package org.apache.flink.test.optimizer.jsonplan;
import java.util.List;
import org.apache.flink.api.common.Plan;
+import org.apache.flink.client.program.OptimizerPlanEnvironment;
+import org.apache.flink.client.program.PreviewPlanEnvironment;
+import org.apache.flink.examples.java.clustering.KMeans;
+import org.apache.flink.examples.java.graph.ConnectedComponents;
+import org.apache.flink.examples.java.graph.PageRankBasic;
+import org.apache.flink.examples.java.relational.TPCHQuery3;
+import org.apache.flink.examples.java.wordcount.WordCount;
import org.apache.flink.optimizer.Optimizer;
import org.apache.flink.optimizer.dag.DataSinkNode;
import org.apache.flink.optimizer.plandump.PlanJSONDumpGenerator;
-import org.apache.flink.test.recordJobs.graph.DeltaPageRankWithInitialDeltas;
-import org.apache.flink.test.recordJobs.kmeans.KMeansBroadcast;
-import org.apache.flink.test.recordJobs.kmeans.KMeansSingleStep;
-import org.apache.flink.test.recordJobs.relational.TPCHQuery3;
-import org.apache.flink.test.recordJobs.relational.WebLogAnalysis;
-import org.apache.flink.test.recordJobs.wordcount.WordCount;
-import org.apache.flink.util.OperatingSystem;
+import org.apache.flink.optimizer.util.CompilerTestBase;
import org.codehaus.jackson.JsonFactory;
import org.codehaus.jackson.JsonParseException;
import org.codehaus.jackson.JsonParser;
@@ -40,51 +41,102 @@ import org.junit.Test;
/*
* The tests in this class simply invoke the JSON dump code for the original plan.
*/
-public class PreviewPlanDumpTest {
+public class PreviewPlanDumpTest extends CompilerTestBase {
- protected static final String IN_FILE = OperatingSystem.isWindows() ? "file:/c:/test/file" : "file:///test/file";
-
- protected static final String OUT_FILE = OperatingSystem.isWindows() ? "file:/c:/test/output" : "file:///test/output";
-
- protected static final String[] NO_ARGS = new String[0];
-
@Test
public void dumpWordCount() {
- dump(new WordCount().getPlan("4", IN_FILE, OUT_FILE));
-
- // The web interface passes empty string-args to compute the preview of the
- // job, so we should test this situation too
- dump(new WordCount().getPlan(NO_ARGS));
+ // prepare the test environment
+ PreviewPlanEnvironment env = new PreviewPlanEnvironment();
+ env.setAsContext();
+ try {
+ WordCount.main(new String[] {IN_FILE, OUT_FILE});
+ } catch(OptimizerPlanEnvironment.ProgramAbortException pae) {
+ // all good.
+ } catch (Exception e) {
+ e.printStackTrace();
+ Assert.fail("WordCount failed with an exception");
+ }
+ dump(env.getPlan());
}
@Test
public void dumpTPCH3() {
- dump(new TPCHQuery3().getPlan("4", IN_FILE, IN_FILE, OUT_FILE));
- dump(new TPCHQuery3().getPlan(NO_ARGS));
+ // prepare the test environment
+ PreviewPlanEnvironment env = new PreviewPlanEnvironment();
+ env.setAsContext();
+ try {
+ TPCHQuery3.main(new String[] {IN_FILE, IN_FILE, OUT_FILE, "123"});
+ } catch(OptimizerPlanEnvironment.ProgramAbortException pae) {
+ // all good.
+ } catch (Exception e) {
+ e.printStackTrace();
+ Assert.fail("TPCH3 failed with an exception");
+ }
+ dump(env.getPlan());
}
@Test
- public void dumpKMeans() {
- dump(new KMeansSingleStep().getPlan("4", IN_FILE, IN_FILE, OUT_FILE));
- dump(new KMeansSingleStep().getPlan(NO_ARGS));
+ public void dumpIterativeKMeans() {
+ // prepare the test environment
+ PreviewPlanEnvironment env = new PreviewPlanEnvironment();
+ env.setAsContext();
+ try {
+ KMeans.main(new String[] {IN_FILE, IN_FILE, OUT_FILE, "123"});
+ } catch(OptimizerPlanEnvironment.ProgramAbortException pae) {
+ // all good.
+ } catch (Exception e) {
+ e.printStackTrace();
+ Assert.fail("KMeans failed with an exception");
+ }
+ dump(env.getPlan());
}
@Test
public void dumpWebLogAnalysis() {
- dump(new WebLogAnalysis().getPlan("4", IN_FILE, IN_FILE, IN_FILE, OUT_FILE));
- dump(new WebLogAnalysis().getPlan(NO_ARGS));
+ // prepare the test environment
+ PreviewPlanEnvironment env = new PreviewPlanEnvironment();
+ env.setAsContext();
+ try {
+ org.apache.flink.examples.java.relational.WebLogAnalysis.main(new String[] {IN_FILE, IN_FILE, OUT_FILE, "123"});
+ } catch(OptimizerPlanEnvironment.ProgramAbortException pae) {
+ // all good.
+ } catch (Exception e) {
+ e.printStackTrace();
+ Assert.fail("WebLogAnalysis failed with an exception");
+ }
+ dump(env.getPlan());
}
-
+
@Test
public void dumpBulkIterationKMeans() {
- dump(new KMeansBroadcast().getPlan("4", IN_FILE, OUT_FILE));
- dump(new KMeansBroadcast().getPlan(NO_ARGS));
+ // prepare the test environment
+ PreviewPlanEnvironment env = new PreviewPlanEnvironment();
+ env.setAsContext();
+ try {
+ ConnectedComponents.main(new String[] {IN_FILE, IN_FILE, OUT_FILE, "123"});
+ } catch(OptimizerPlanEnvironment.ProgramAbortException pae) {
+ // all good.
+ } catch (Exception e) {
+ e.printStackTrace();
+ Assert.fail("ConnectedComponents failed with an exception");
+ }
+ dump(env.getPlan());
}
@Test
- public void dumpDeltaPageRank() {
- dump(new DeltaPageRankWithInitialDeltas().getPlan("4", IN_FILE, IN_FILE, IN_FILE, OUT_FILE, "10"));
- dump(new DeltaPageRankWithInitialDeltas().getPlan(NO_ARGS));
+ public void dumpPageRank() {
+ // prepare the test environment
+ PreviewPlanEnvironment env = new PreviewPlanEnvironment();
+ env.setAsContext();
+ try {
+ PageRankBasic.main(new String[] {IN_FILE, IN_FILE, OUT_FILE, "10", "123"});
+ } catch(OptimizerPlanEnvironment.ProgramAbortException pae) {
+ // all good.
+ } catch (Exception e) {
+ e.printStackTrace();
+ Assert.fail("PagaRank failed with an exception");
+ }
+ dump(env.getPlan());
}
private void dump(Plan p) {
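
Same caveat here: the dump(Plan) body is not part of the hunk. The imports (Optimizer, DataSinkNode, List) suggest this variant dumps the pre-optimized plan, since the class previews plans without fully compiling them. A sketch under that assumption; the createPreOptimizedPlan/getPactPlanAsJSON usage is inferred, not shown in this diff:

    private void dump(Plan p) {
        List<DataSinkNode> sinks = Optimizer.createPreOptimizedPlan(p);
        String json = new PlanJSONDumpGenerator().getPactPlanAsJSON(sinks);
        // then validate the output the same way as in DumpCompiledPlanTest:
        // walk all JSON tokens and fail on a JsonParseException
    }
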
http://git-wip-us.apache.org/repos/asf/flink/blob/8abae2c2/flink-tests/src/test/java/org/apache/flink/test/runtime/NetworkStackThroughputITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/runtime/NetworkStackThroughputITCase.java b/flink-tests/src/test/java/org/apache/flink/test/runtime/NetworkStackThroughputITCase.java
index 310ded8..ec2dbb7 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/runtime/NetworkStackThroughputITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/runtime/NetworkStackThroughputITCase.java
@@ -29,8 +29,6 @@ import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
-import org.apache.flink.test.util.RecordAPITestBase;
-import org.junit.After;
import org.junit.Ignore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -38,6 +36,8 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.Arrays;
import java.util.concurrent.TimeUnit;
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.test.util.JavaProgramTestBase;
@Ignore
public class NetworkStackThroughputITCase {
@@ -62,8 +62,8 @@ public class NetworkStackThroughputITCase {
// ------------------------------------------------------------------------
- // wrapper to reuse RecordAPITestBase code in runs via main()
- private static class TestBaseWrapper extends RecordAPITestBase {
+ // wrapper to reuse JavaProgramTestBase code in runs via main()
+ private static class TestBaseWrapper extends JavaProgramTestBase {
private int dataVolumeGb;
private boolean useForwarder;
@@ -90,7 +90,6 @@ public class NetworkStackThroughputITCase {
setTaskManagerNumSlots(numSlots);
}
- @Override
protected JobGraph getJobGraph() throws Exception {
return createJobGraph(dataVolumeGb, useForwarder, isSlowSender, isSlowReceiver, parallelism);
}
@@ -138,19 +137,19 @@ public class NetworkStackThroughputITCase {
return jobGraph;
}
- @After
- public void calculateThroughput() {
- if (getJobExecutionResult() != null) {
- int dataVolumeGb = this.config.getInteger(DATA_VOLUME_GB_CONFIG_KEY, 1);
- long dataVolumeMbit = dataVolumeGb * 8192;
- long runtimeSecs = getJobExecutionResult().getNetRuntime(TimeUnit.SECONDS);
+ @Override
+ protected void testProgram() throws Exception {
+ JobExecutionResult jer = executor.submitJobAndWait(getJobGraph(), false);
+ int dataVolumeGb = this.config.getInteger(DATA_VOLUME_GB_CONFIG_KEY, 1);
+
+ long dataVolumeMbit = dataVolumeGb * 8192;
+ long runtimeSecs = jer.getNetRuntime(TimeUnit.SECONDS);
- int mbitPerSecond = (int) (((double) dataVolumeMbit) / runtimeSecs);
+ int mbitPerSecond = (int) (((double) dataVolumeMbit) / runtimeSecs);
- LOG.info(String.format("Test finished with throughput of %d MBit/s (runtime [secs]: %d, " +
- "data volume [gb/mbits]: %d/%d)", mbitPerSecond, runtimeSecs, dataVolumeGb, dataVolumeMbit));
- }
+ LOG.info(String.format("Test finished with throughput of %d MBit/s (runtime [secs]: %d, " +
+ "data volume [gb/mbits]: %d/%d)", mbitPerSecond, runtimeSecs, dataVolumeGb, dataVolumeMbit));
}
}
@@ -289,8 +288,7 @@ public class NetworkStackThroughputITCase {
TestBaseWrapper test = new TestBaseWrapper(config);
System.out.println(Arrays.toString(p));
- test.testJob();
- test.calculateThroughput();
+ test.testProgram();
}
}
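
The throughput computation in the ported testProgram() assumes binary units: 1 GB = 1024 MB = 1024 * 8 Mbit = 8192 Mbit. A worked example with made-up numbers:

    int dataVolumeGb = 10;                     // hypothetical test volume
    long runtimeSecs = 80;                     // hypothetical measured runtime
    long dataVolumeMbit = dataVolumeGb * 8192; // 81920 Mbit
    int mbitPerSecond = (int) (((double) dataVolumeMbit) / runtimeSecs); // 1024 MBit/s
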
http://git-wip-us.apache.org/repos/asf/flink/blob/8abae2c2/flink-tests/src/test/java/org/apache/flink/test/testPrograms/util/tests/IntTupleDataInFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/testPrograms/util/tests/IntTupleDataInFormatTest.java b/flink-tests/src/test/java/org/apache/flink/test/testPrograms/util/tests/IntTupleDataInFormatTest.java
deleted file mode 100644
index 0cff9b6..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/testPrograms/util/tests/IntTupleDataInFormatTest.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.test.testPrograms.util.tests;
-
-import org.apache.flink.test.recordJobs.util.IntTupleDataInFormat;
-import org.apache.flink.test.recordJobs.util.Tuple;
-import org.apache.flink.types.IntValue;
-import org.apache.flink.types.Record;
-import org.junit.Assert;
-import org.junit.Test;
-
-public class IntTupleDataInFormatTest
-{
- @Test
- public void testReadLineKeyValuePairOfIntValueTupleByteArray() {
-
- String[] testTuples = {
- "1|attribute1|attribute2|3|attribute4|5|",
- "2|3|",
- "3|attribute1|attribute2|",
- "-1|attr1|attr2|",
- "-2|attribute1|attribute2|",
- Integer.MAX_VALUE+"|attr1|attr2|attr3|attr4|",
- Integer.MIN_VALUE+"|attr1|attr2|attr3|attr4|"
- };
-
- int[] expectedKeys = {
- 1,2,3,-1,-2,Integer.MAX_VALUE,Integer.MIN_VALUE
- };
-
- int[] expectedAttrCnt = {6,2,3,3,3,5,5};
-
- IntTupleDataInFormat inFormat = new IntTupleDataInFormat();
- Record rec = new Record();
-
- for(int i = 0; i < testTuples.length; i++) {
-
- byte[] tupleBytes = testTuples[i].getBytes();
-
- inFormat.readRecord(rec, tupleBytes, 0, tupleBytes.length);
-
- Assert.assertTrue("Expected Key: "+expectedKeys[i]+" != Returned Key: "+rec.getField(0, IntValue.class), rec.getField(0, IntValue.class).equals(new IntValue(expectedKeys[i])));
- Assert.assertTrue("Expected Attr Cnt: "+expectedAttrCnt[i]+" != Returned Attr Cnt: "+rec.getField(1, Tuple.class), rec.getField(1, Tuple.class).getNumberOfColumns() == expectedAttrCnt[i]);
- }
- }
-}
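
For reference, the contract the deleted test pinned down: IntTupleDataInFormat parses an IntValue key from the text before the first '|' and stores all delimited columns, key included, in the Tuple, so "2|3|" yields key 2 and 2 columns. The same expectations restated with only the JDK (a standalone illustration, not a replacement test):

    String line = "1|attribute1|attribute2|3|attribute4|5|";
    String[] columns = line.split("\\|");   // a trailing '|' produces no empty column
    int key = Integer.parseInt(columns[0]); // -> 1
    int attributeCount = columns.length;    // -> 6; the key counts as a column
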