You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2014/06/20 15:33:15 UTC
[1/2] git commit: [FLINK-957] Throw meaningful warnings and exceptions upon incorrect use of delta iterations.
Repository: incubator-flink
Updated Branches:
refs/heads/master f8ec28c73 -> cd665b9e8
[FLINK-957] Throw meaningful warnings and exceptions upon incorrect use of delta iterations.
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/cd665b9e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/cd665b9e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/cd665b9e
Branch: refs/heads/master
Commit: cd665b9e8abec2bbfecf384fe7273bd50f22ce67
Parents: 9d57045
Author: Stephan Ewen <se...@apache.org>
Authored: Thu Jun 19 19:54:23 2014 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Jun 20 15:32:20 2014 +0200
----------------------------------------------------------------------
.../eu/stratosphere/compiler/PactCompiler.java | 2 +-
.../stratosphere/api/java/DeltaIteration.java | 2 +-
.../api/java/operators/CoGroupOperator.java | 20 ++++++
.../stratosphere/api/java/operators/Keys.java | 27 ++++++---
.../DeltaIterationTranslationTest.java | 64 +++++++++++++++++++-
5 files changed, 105 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/cd665b9e/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/PactCompiler.java
----------------------------------------------------------------------
diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/PactCompiler.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/PactCompiler.java
index 96eb01d..2076902 100644
--- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/PactCompiler.java
+++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/PactCompiler.java
@@ -1055,7 +1055,7 @@ public class PactCompiler {
}
}
else {
- throw new CompilerException("Error: The solution set may only be joined with through a Join or a CoGroup function.");
+ throw new CompilerException("Error: The only operations allowed on the solution set are Join and CoGroup.");
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/cd665b9e/stratosphere-java/src/main/java/eu/stratosphere/api/java/DeltaIteration.java
----------------------------------------------------------------------
diff --git a/stratosphere-java/src/main/java/eu/stratosphere/api/java/DeltaIteration.java b/stratosphere-java/src/main/java/eu/stratosphere/api/java/DeltaIteration.java
index 7fa6638..3522947 100644
--- a/stratosphere-java/src/main/java/eu/stratosphere/api/java/DeltaIteration.java
+++ b/stratosphere-java/src/main/java/eu/stratosphere/api/java/DeltaIteration.java
@@ -223,7 +223,7 @@ public class DeltaIteration<ST, WT> {
public void checkJoinKeyFields(int[] keyFields) {
int[] ssKeys = deltaIteration.keys.computeLogicalKeyPositions();
if (!Arrays.equals(ssKeys, keyFields)) {
- throw new InvalidProgramException("The solution set must be joind with using the keys with which elements are identified.");
+ throw new InvalidProgramException("The solution can only be joined/co-grouped with the same keys as the elements are identified with (here: " + Arrays.toString(ssKeys) + ").");
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/cd665b9e/stratosphere-java/src/main/java/eu/stratosphere/api/java/operators/CoGroupOperator.java
----------------------------------------------------------------------
diff --git a/stratosphere-java/src/main/java/eu/stratosphere/api/java/operators/CoGroupOperator.java b/stratosphere-java/src/main/java/eu/stratosphere/api/java/operators/CoGroupOperator.java
index ca4b1db..f2484ce 100644
--- a/stratosphere-java/src/main/java/eu/stratosphere/api/java/operators/CoGroupOperator.java
+++ b/stratosphere-java/src/main/java/eu/stratosphere/api/java/operators/CoGroupOperator.java
@@ -25,8 +25,10 @@ import eu.stratosphere.api.common.operators.UnaryOperatorInformation;
import eu.stratosphere.api.common.operators.base.CoGroupOperatorBase;
import eu.stratosphere.api.common.operators.base.MapOperatorBase;
import eu.stratosphere.api.java.DataSet;
+import eu.stratosphere.api.java.DeltaIteration.SolutionSetPlaceHolder;
import eu.stratosphere.api.java.functions.CoGroupFunction;
import eu.stratosphere.api.java.functions.KeySelector;
+import eu.stratosphere.api.java.operators.Keys.FieldPositionKeys;
import eu.stratosphere.api.java.operators.translation.KeyExtractingMapper;
import eu.stratosphere.api.java.operators.translation.PlanUnwrappingCoGroupOperator;
import eu.stratosphere.api.java.operators.translation.TupleKeyExtractingMapper;
@@ -409,6 +411,24 @@ public class CoGroupOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1, I2, OU
if (!keys1.areCompatibale(keys2)) {
throw new InvalidProgramException("The pair of join keys are not compatible with each other.");
}
+
+ // sanity check solution set key mismatches
+ if (input1 instanceof SolutionSetPlaceHolder) {
+ if (keys1 instanceof FieldPositionKeys) {
+ int[] positions = ((FieldPositionKeys<?>) keys1).computeLogicalKeyPositions();
+ ((SolutionSetPlaceHolder<?>) input1).checkJoinKeyFields(positions);
+ } else {
+ throw new InvalidProgramException("Currently, the solution set may only be CoGrouped with using tuple field positions.");
+ }
+ }
+ if (input2 instanceof SolutionSetPlaceHolder) {
+ if (keys2 instanceof FieldPositionKeys) {
+ int[] positions = ((FieldPositionKeys<?>) keys2).computeLogicalKeyPositions();
+ ((SolutionSetPlaceHolder<?>) input2).checkJoinKeyFields(positions);
+ } else {
+ throw new InvalidProgramException("Currently, the solution set may only be CoGrouped with using tuple field positions.");
+ }
+ }
return new CoGroupOperatorWithoutFunction(keys2);
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/cd665b9e/stratosphere-java/src/main/java/eu/stratosphere/api/java/operators/Keys.java
----------------------------------------------------------------------
diff --git a/stratosphere-java/src/main/java/eu/stratosphere/api/java/operators/Keys.java b/stratosphere-java/src/main/java/eu/stratosphere/api/java/operators/Keys.java
index 9c89eb1..6026903 100644
--- a/stratosphere-java/src/main/java/eu/stratosphere/api/java/operators/Keys.java
+++ b/stratosphere-java/src/main/java/eu/stratosphere/api/java/operators/Keys.java
@@ -42,7 +42,7 @@ public abstract class Keys<T> {
public static class FieldPositionKeys<T> extends Keys<T> {
- private final int[] groupingFields;
+ private final int[] fieldPositions;
private final TypeInformation<?>[] types;
public FieldPositionKeys(int[] groupingFields, TypeInformation<T> type) {
@@ -60,18 +60,18 @@ public abstract class Keys<T> {
TupleTypeInfo<?> tupleType = (TupleTypeInfo<?>)type;
- this.groupingFields = makeFields(groupingFields, (TupleTypeInfo<?>) type);
+ this.fieldPositions = makeFields(groupingFields, (TupleTypeInfo<?>) type);
- types = new TypeInformation[this.groupingFields.length];
- for(int i = 0; i < this.groupingFields.length; i++) {
- types[i] = tupleType.getTypeAt(this.groupingFields[i]);
+ types = new TypeInformation[this.fieldPositions.length];
+ for(int i = 0; i < this.fieldPositions.length; i++) {
+ types[i] = tupleType.getTypeAt(this.fieldPositions[i]);
}
}
@Override
public int getNumberOfKeyFields() {
- return this.groupingFields.length;
+ return this.fieldPositions.length;
}
@Override
@@ -106,9 +106,13 @@ public abstract class Keys<T> {
@Override
public int[] computeLogicalKeyPositions() {
- return this.groupingFields;
+ return this.fieldPositions;
}
+ @Override
+ public String toString() {
+ return Arrays.toString(fieldPositions);
+ }
}
// --------------------------------------------------------------------------------------------
@@ -119,6 +123,10 @@ public abstract class Keys<T> {
private final TypeInformation<K> keyType;
public SelectorFunctionKeys(KeySelector<T, K> keyExtractor, TypeInformation<T> type) {
+ if (keyExtractor == null) {
+ throw new NullPointerException("Key extractor must not be null.");
+ }
+
this.keyExtractor = keyExtractor;
this.keyType = TypeExtractor.getKeySelectorTypes(keyExtractor, type);
}
@@ -163,6 +171,11 @@ public abstract class Keys<T> {
public int[] computeLogicalKeyPositions() {
return new int[] {0};
}
+
+ @Override
+ public String toString() {
+ return keyExtractor + " (" + keyType + ")";
+ }
}
// --------------------------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/cd665b9e/stratosphere-java/src/test/java/eu/stratosphere/api/java/operators/translation/DeltaIterationTranslationTest.java
----------------------------------------------------------------------
diff --git a/stratosphere-java/src/test/java/eu/stratosphere/api/java/operators/translation/DeltaIterationTranslationTest.java b/stratosphere-java/src/test/java/eu/stratosphere/api/java/operators/translation/DeltaIterationTranslationTest.java
index ad170a2..37b01ef 100644
--- a/stratosphere-java/src/test/java/eu/stratosphere/api/java/operators/translation/DeltaIterationTranslationTest.java
+++ b/stratosphere-java/src/test/java/eu/stratosphere/api/java/operators/translation/DeltaIterationTranslationTest.java
@@ -33,10 +33,12 @@ import eu.stratosphere.api.common.operators.base.MapOperatorBase;
import eu.stratosphere.api.java.DataSet;
import eu.stratosphere.api.java.DeltaIteration;
import eu.stratosphere.api.java.ExecutionEnvironment;
+import eu.stratosphere.api.java.functions.CoGroupFunction;
import eu.stratosphere.api.java.functions.JoinFunction;
import eu.stratosphere.api.java.functions.MapFunction;
import eu.stratosphere.api.java.tuple.Tuple2;
import eu.stratosphere.api.java.tuple.Tuple3;
+import eu.stratosphere.util.Collector;
@SuppressWarnings("serial")
public class DeltaIterationTranslationTest implements java.io.Serializable {
@@ -137,7 +139,7 @@ public class DeltaIterationTranslationTest implements java.io.Serializable {
}
@Test
- public void testRejectWhenSolutionSetKeysDontMatch() {
+ public void testRejectWhenSolutionSetKeysDontMatchJoin() {
try {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
@@ -156,6 +158,50 @@ public class DeltaIterationTranslationTest implements java.io.Serializable {
catch (InvalidProgramException e) {
// all good!
}
+
+ try {
+ iteration.getSolutionSet().join(iteration.getWorkset()).where(2).equalTo(1);
+ fail("Accepted invalid program.");
+ }
+ catch (InvalidProgramException e) {
+ // all good!
+ }
+ }
+ catch (Exception e) {
+ System.err.println(e.getMessage());
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void testRejectWhenSolutionSetKeysDontMatchCoGroup() {
+ try {
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ @SuppressWarnings("unchecked")
+ DataSet<Tuple3<Double, Long, String>> initialSolutionSet = env.fromElements(new Tuple3<Double, Long, String>(3.44, 5L, "abc"));
+
+ @SuppressWarnings("unchecked")
+ DataSet<Tuple2<Double, String>> initialWorkSet = env.fromElements(new Tuple2<Double, String>(1.23, "abc"));
+
+ DeltaIteration<Tuple3<Double, Long, String>, Tuple2<Double, String>> iteration = initialSolutionSet.iterateDelta(initialWorkSet, 10, 1);
+
+ try {
+ iteration.getWorkset().coGroup(iteration.getSolutionSet()).where(1).equalTo(2).with(new SolutionWorksetCoGroup1());
+ fail("Accepted invalid program.");
+ }
+ catch (InvalidProgramException e) {
+ // all good!
+ }
+
+ try {
+ iteration.getSolutionSet().coGroup(iteration.getWorkset()).where(2).equalTo(1).with(new SolutionWorksetCoGroup2());
+ fail("Accepted invalid program.");
+ }
+ catch (InvalidProgramException e) {
+ // all good!
+ }
}
catch (Exception e) {
System.err.println(e.getMessage());
@@ -187,4 +233,20 @@ public class DeltaIterationTranslationTest implements java.io.Serializable {
return value;
}
}
+
+ public static class SolutionWorksetCoGroup1 extends CoGroupFunction<Tuple2<Double, String>, Tuple3<Double, Long, String>, Tuple3<Double, Long, String>> {
+
+ @Override
+ public void coGroup(Iterator<Tuple2<Double, String>> first, Iterator<Tuple3<Double, Long, String>> second,
+ Collector<Tuple3<Double, Long, String>> out) {
+ }
+ }
+
+ public static class SolutionWorksetCoGroup2 extends CoGroupFunction<Tuple3<Double, Long, String>, Tuple2<Double, String>, Tuple3<Double, Long, String>> {
+
+ @Override
+ public void coGroup(Iterator<Tuple3<Double, Long, String>> second, Iterator<Tuple2<Double, String>> first,
+ Collector<Tuple3<Double, Long, String>> out) {
+ }
+ }
}
[2/2] git commit: Syntactic sugar for sum, min, and max aggregations
Posted by se...@apache.org.
Syntactic sugar for sum, min, and max aggregations
This closes #7
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/9d570450
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/9d570450
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/9d570450
Branch: refs/heads/master
Commit: 9d57045020240e16ed115fab29f45b6d3f78a492
Parents: f8ec28c
Author: Kostas Tzoumas <Ko...@gmail.com>
Authored: Tue Jun 10 16:58:08 2014 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Jun 20 15:32:20 2014 +0200
----------------------------------------------------------------------
.../example/java/wordcount/WordCount.java | 3 +-
.../java/eu/stratosphere/api/java/DataSet.java | 33 +++++
.../api/java/operators/AggregateOperator.java | 16 +-
.../api/java/operators/UnsortedGrouping.java | 33 +++++
.../test/javaApiOperators/SumMinMaxITCase.java | 147 +++++++++++++++++++
5 files changed, 229 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/9d570450/stratosphere-examples/stratosphere-java-examples/src/main/java/eu/stratosphere/example/java/wordcount/WordCount.java
----------------------------------------------------------------------
diff --git a/stratosphere-examples/stratosphere-java-examples/src/main/java/eu/stratosphere/example/java/wordcount/WordCount.java b/stratosphere-examples/stratosphere-java-examples/src/main/java/eu/stratosphere/example/java/wordcount/WordCount.java
index 62ec2ca..95ab92f 100644
--- a/stratosphere-examples/stratosphere-java-examples/src/main/java/eu/stratosphere/example/java/wordcount/WordCount.java
+++ b/stratosphere-examples/stratosphere-java-examples/src/main/java/eu/stratosphere/example/java/wordcount/WordCount.java
@@ -16,7 +16,6 @@ package eu.stratosphere.example.java.wordcount;
import eu.stratosphere.api.java.DataSet;
import eu.stratosphere.api.java.ExecutionEnvironment;
-import eu.stratosphere.api.java.aggregation.Aggregations;
import eu.stratosphere.api.java.functions.FlatMapFunction;
import eu.stratosphere.api.java.tuple.Tuple2;
import eu.stratosphere.example.java.wordcount.util.WordCountData;
@@ -66,7 +65,7 @@ public class WordCount {
text.flatMap(new Tokenizer())
// group by the tuple field "0" and sum up tuple field "1"
.groupBy(0)
- .aggregate(Aggregations.SUM, 1);
+ .sum(1);
// emit result
if(fileOutput) {
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/9d570450/stratosphere-java/src/main/java/eu/stratosphere/api/java/DataSet.java
----------------------------------------------------------------------
diff --git a/stratosphere-java/src/main/java/eu/stratosphere/api/java/DataSet.java b/stratosphere-java/src/main/java/eu/stratosphere/api/java/DataSet.java
index 758cbf2..0770f24 100644
--- a/stratosphere-java/src/main/java/eu/stratosphere/api/java/DataSet.java
+++ b/stratosphere-java/src/main/java/eu/stratosphere/api/java/DataSet.java
@@ -223,6 +223,39 @@ public abstract class DataSet<T> {
public AggregateOperator<T> aggregate(Aggregations agg, int field) {
return new AggregateOperator<T>(this, agg, field);
}
+
+ /**
+ * Syntactic sugar for aggregate (SUM, field)
+ * @param field The index of the Tuple field on which the aggregation function is applied.
+ * @return An AggregateOperator that represents the summed DataSet.
+ *
+ * @see eu.stratosphere.api.java.operators.AggregateOperator
+ */
+ public AggregateOperator<T> sum (int field) {
+ return this.aggregate (Aggregations.SUM, field);
+ }
+
+ /**
+ * Syntactic sugar for aggregate (MAX, field)
+ * @param field The index of the Tuple field on which the aggregation function is applied.
+ * @return An AggregateOperator that represents the max'ed DataSet.
+ *
+ * @see eu.stratosphere.api.java.operators.AggregateOperator
+ */
+ public AggregateOperator<T> max (int field) {
+ return this.aggregate (Aggregations.MAX, field);
+ }
+
+ /**
+ * Syntactic sugar for aggregate (MIN, field)
+ * @param field The index of the Tuple field on which the aggregation function is applied.
+ * @return An AggregateOperator that represents the min'ed DataSet.
+ *
+ * @see eu.stratosphere.api.java.operators.AggregateOperator
+ */
+ public AggregateOperator<T> min (int field) {
+ return this.aggregate (Aggregations.MIN, field);
+ }
/**
* Applies a Reduce transformation on a non-grouped {@link DataSet}.<br/>
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/9d570450/stratosphere-java/src/main/java/eu/stratosphere/api/java/operators/AggregateOperator.java
----------------------------------------------------------------------
diff --git a/stratosphere-java/src/main/java/eu/stratosphere/api/java/operators/AggregateOperator.java b/stratosphere-java/src/main/java/eu/stratosphere/api/java/operators/AggregateOperator.java
index acc0453..e68cb67 100644
--- a/stratosphere-java/src/main/java/eu/stratosphere/api/java/operators/AggregateOperator.java
+++ b/stratosphere-java/src/main/java/eu/stratosphere/api/java/operators/AggregateOperator.java
@@ -130,7 +130,21 @@ public class AggregateOperator<IN> extends SingleInputOperator<IN, IN, Aggregate
return this;
}
-
+
+
+ public AggregateOperator<IN> andSum (int field) {
+ return this.and(Aggregations.SUM, field);
+ }
+
+ public AggregateOperator<IN> andMin (int field) {
+ return this.and(Aggregations.MIN, field);
+ }
+
+ public AggregateOperator<IN> andMax (int field) {
+ return this.and(Aggregations.MAX, field);
+ }
+
+
@SuppressWarnings("unchecked")
@Override
protected eu.stratosphere.api.common.operators.base.GroupReduceOperatorBase<IN, IN, GenericGroupReduce<IN, IN>> translateToDataFlow(Operator<IN> input) {
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/9d570450/stratosphere-java/src/main/java/eu/stratosphere/api/java/operators/UnsortedGrouping.java
----------------------------------------------------------------------
diff --git a/stratosphere-java/src/main/java/eu/stratosphere/api/java/operators/UnsortedGrouping.java b/stratosphere-java/src/main/java/eu/stratosphere/api/java/operators/UnsortedGrouping.java
index 95e40bc..15df334 100644
--- a/stratosphere-java/src/main/java/eu/stratosphere/api/java/operators/UnsortedGrouping.java
+++ b/stratosphere-java/src/main/java/eu/stratosphere/api/java/operators/UnsortedGrouping.java
@@ -49,6 +49,39 @@ public class UnsortedGrouping<T> extends Grouping<T> {
public AggregateOperator<T> aggregate(Aggregations agg, int field) {
return new AggregateOperator<T>(this, agg, field);
}
+
+ /**
+ * Syntactic sugar for aggregate (SUM, field)
+ * @param field The index of the Tuple field on which the aggregation function is applied.
+ * @return An AggregateOperator that represents the summed DataSet.
+ *
+ * @see eu.stratosphere.api.java.operators.AggregateOperator
+ */
+ public AggregateOperator<T> sum (int field) {
+ return this.aggregate (Aggregations.SUM, field);
+ }
+
+ /**
+ * Syntactic sugar for aggregate (MAX, field)
+ * @param field The index of the Tuple field on which the aggregation function is applied.
+ * @return An AggregateOperator that represents the max'ed DataSet.
+ *
+ * @see eu.stratosphere.api.java.operators.AggregateOperator
+ */
+ public AggregateOperator<T> max (int field) {
+ return this.aggregate (Aggregations.MAX, field);
+ }
+
+ /**
+ * Syntactic sugar for aggregate (MIN, field)
+ * @param field The index of the Tuple field on which the aggregation function is applied.
+ * @return An AggregateOperator that represents the min'ed DataSet.
+ *
+ * @see eu.stratosphere.api.java.operators.AggregateOperator
+ */
+ public AggregateOperator<T> min (int field) {
+ return this.aggregate (Aggregations.MIN, field);
+ }
/**
* Applies a Reduce transformation on a grouped {@link DataSet}.<br/>
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/9d570450/stratosphere-tests/src/test/java/eu/stratosphere/test/javaApiOperators/SumMinMaxITCase.java
----------------------------------------------------------------------
diff --git a/stratosphere-tests/src/test/java/eu/stratosphere/test/javaApiOperators/SumMinMaxITCase.java b/stratosphere-tests/src/test/java/eu/stratosphere/test/javaApiOperators/SumMinMaxITCase.java
new file mode 100644
index 0000000..8b7dc80
--- /dev/null
+++ b/stratosphere-tests/src/test/java/eu/stratosphere/test/javaApiOperators/SumMinMaxITCase.java
@@ -0,0 +1,147 @@
+/***********************************************************************************************************************
+ *
+ * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ *
+ **********************************************************************************************************************/
+
+package eu.stratosphere.test.javaApiOperators;
+
+
+import eu.stratosphere.api.java.DataSet;
+import eu.stratosphere.api.java.ExecutionEnvironment;
+import eu.stratosphere.api.java.aggregation.Aggregations;
+import eu.stratosphere.api.java.tuple.Tuple1;
+import eu.stratosphere.api.java.tuple.Tuple2;
+import eu.stratosphere.api.java.tuple.Tuple3;
+import eu.stratosphere.configuration.Configuration;
+import eu.stratosphere.test.javaApiOperators.util.CollectionDataSets;
+import eu.stratosphere.test.util.JavaProgramTestBase;
+import org.junit.runners.Parameterized;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.LinkedList;
+
+public class SumMinMaxITCase extends JavaProgramTestBase {
+
+ private static int NUM_PROGRAMS = 3;
+
+ private int curProgId = config.getInteger("ProgramId", -1);
+ private String resultPath;
+ private String expectedResult;
+
+ public SumMinMaxITCase(Configuration config) {
+ super(config);
+ }
+
+ @Override
+ protected void preSubmit() throws Exception {
+ resultPath = getTempDirPath("result");
+ }
+
+ @Override
+ protected void testProgram() throws Exception {
+ expectedResult = SumMinMaxProgs.runProgram(curProgId, resultPath);
+ }
+
+ @Override
+ protected void postSubmit() throws Exception {
+ compareResultsByLinesInMemory(expectedResult, resultPath);
+ }
+
+ @Parameterized.Parameters
+ public static Collection<Object[]> getConfigurations() throws FileNotFoundException, IOException {
+
+ LinkedList<Configuration> tConfigs = new LinkedList<Configuration>();
+
+ for(int i=1; i <= NUM_PROGRAMS; i++) {
+ Configuration config = new Configuration();
+ config.setInteger("ProgramId", i);
+ tConfigs.add(config);
+ }
+
+ return toParameterList(tConfigs);
+ }
+
+ /**
+ * These tests are copied from
+ * @see eu.stratosphere.test.javaApiOperators.AggregateITCase
+ * replacing calls to aggregate with calls to sum, min, and max
+ */
+ private static class SumMinMaxProgs {
+
+ public static String runProgram(int progId, String resultPath) throws Exception {
+ switch(progId) {
+ case 1: {
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
+ DataSet<Tuple2<Integer, Long>> sumDs = ds
+ .sum(0)
+ .andMax(1)
+ .project(0, 1).types(Integer.class, Long.class);
+
+ sumDs.writeAsCsv(resultPath);
+ env.execute();
+
+ // return expected result
+ return "231,6\n";
+ }
+ case 2: {
+ /*
+ * Grouped Aggregate
+ */
+
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
+ DataSet<Tuple2<Long, Integer>> aggregateDs = ds.groupBy(1)
+ .sum(0)
+ .project(1, 0).types(Long.class, Integer.class);
+
+ aggregateDs.writeAsCsv(resultPath);
+ env.execute();
+
+ // return expected result
+ return "1,1\n" +
+ "2,5\n" +
+ "3,15\n" +
+ "4,34\n" +
+ "5,65\n" +
+ "6,111\n";
+ }
+ case 3: {
+ /*
+ * Nested Aggregate
+ */
+
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
+ DataSet<Tuple1<Integer>> aggregateDs = ds.groupBy(1)
+ .min(0)
+ .min(0)
+ .project(0).types(Integer.class);
+
+ aggregateDs.writeAsCsv(resultPath);
+ env.execute();
+
+ // return expected result
+ return "1\n";
+ }
+ default:
+ throw new IllegalArgumentException("Invalid program id");
+ }
+ }
+ }
+}