You are viewing a plain-text version of this content; the hyperlink to the canonical version was not preserved in this rendering.
Posted to commits@flink.apache.org by se...@apache.org on 2014/06/20 15:33:16 UTC

[2/2] git commit: Syntactic sugar for sum, min, and max aggregations

Syntactic sugar for sum, min, and max aggregations

This closes #7


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/9d570450
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/9d570450
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/9d570450

Branch: refs/heads/master
Commit: 9d57045020240e16ed115fab29f45b6d3f78a492
Parents: f8ec28c
Author: Kostas Tzoumas <Ko...@gmail.com>
Authored: Tue Jun 10 16:58:08 2014 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Jun 20 15:32:20 2014 +0200

----------------------------------------------------------------------
 .../example/java/wordcount/WordCount.java       |   3 +-
 .../java/eu/stratosphere/api/java/DataSet.java  |  33 +++++
 .../api/java/operators/AggregateOperator.java   |  16 +-
 .../api/java/operators/UnsortedGrouping.java    |  33 +++++
 .../test/javaApiOperators/SumMinMaxITCase.java  | 147 +++++++++++++++++++
 5 files changed, 229 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/9d570450/stratosphere-examples/stratosphere-java-examples/src/main/java/eu/stratosphere/example/java/wordcount/WordCount.java
----------------------------------------------------------------------
diff --git a/stratosphere-examples/stratosphere-java-examples/src/main/java/eu/stratosphere/example/java/wordcount/WordCount.java b/stratosphere-examples/stratosphere-java-examples/src/main/java/eu/stratosphere/example/java/wordcount/WordCount.java
index 62ec2ca..95ab92f 100644
--- a/stratosphere-examples/stratosphere-java-examples/src/main/java/eu/stratosphere/example/java/wordcount/WordCount.java
+++ b/stratosphere-examples/stratosphere-java-examples/src/main/java/eu/stratosphere/example/java/wordcount/WordCount.java
@@ -16,7 +16,6 @@ package eu.stratosphere.example.java.wordcount;
 
 import eu.stratosphere.api.java.DataSet;
 import eu.stratosphere.api.java.ExecutionEnvironment;
-import eu.stratosphere.api.java.aggregation.Aggregations;
 import eu.stratosphere.api.java.functions.FlatMapFunction;
 import eu.stratosphere.api.java.tuple.Tuple2;
 import eu.stratosphere.example.java.wordcount.util.WordCountData;
@@ -66,7 +65,7 @@ public class WordCount {
 				text.flatMap(new Tokenizer())
 				// group by the tuple field "0" and sum up tuple field "1"
 				.groupBy(0)
-				.aggregate(Aggregations.SUM, 1);
+				.sum(1);
 
 		// emit result
 		if(fileOutput) {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/9d570450/stratosphere-java/src/main/java/eu/stratosphere/api/java/DataSet.java
----------------------------------------------------------------------
diff --git a/stratosphere-java/src/main/java/eu/stratosphere/api/java/DataSet.java b/stratosphere-java/src/main/java/eu/stratosphere/api/java/DataSet.java
index 758cbf2..0770f24 100644
--- a/stratosphere-java/src/main/java/eu/stratosphere/api/java/DataSet.java
+++ b/stratosphere-java/src/main/java/eu/stratosphere/api/java/DataSet.java
@@ -223,6 +223,39 @@ public abstract class DataSet<T> {
 	public AggregateOperator<T> aggregate(Aggregations agg, int field) {
 		return new AggregateOperator<T>(this, agg, field);
 	}
+
+	/**
+	 * Syntactic sugar for {@link #aggregate(Aggregations, int) aggregate(Aggregations.SUM, field)}.
+	 *
+	 * @param field The index of the Tuple field on which the aggregation function is applied.
+	 * @return An AggregateOperator that represents the summed DataSet.
+	 * @see eu.stratosphere.api.java.operators.AggregateOperator
+	 */
+	public AggregateOperator<T> sum(int field) {
+		return aggregate(Aggregations.SUM, field);
+	}
+
+	/**
+	 * Syntactic sugar for {@link #aggregate(Aggregations, int) aggregate(Aggregations.MAX, field)}.
+	 *
+	 * @param field The index of the Tuple field on which the aggregation function is applied.
+	 * @return An AggregateOperator that computes the maximum of the given field over the DataSet.
+	 * @see eu.stratosphere.api.java.operators.AggregateOperator
+	 */
+	public AggregateOperator<T> max(int field) {
+		return aggregate(Aggregations.MAX, field);
+	}
+
+	/**
+	 * Syntactic sugar for {@link #aggregate(Aggregations, int) aggregate(Aggregations.MIN, field)}.
+	 *
+	 * @param field The index of the Tuple field on which the aggregation function is applied.
+	 * @return An AggregateOperator that computes the minimum of the given field over the DataSet.
+	 * @see eu.stratosphere.api.java.operators.AggregateOperator
+	 */
+	public AggregateOperator<T> min(int field) {
+		return aggregate(Aggregations.MIN, field);
+	}
 	
 	/**
 	 * Applies a Reduce transformation on a non-grouped {@link DataSet}.<br/>

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/9d570450/stratosphere-java/src/main/java/eu/stratosphere/api/java/operators/AggregateOperator.java
----------------------------------------------------------------------
diff --git a/stratosphere-java/src/main/java/eu/stratosphere/api/java/operators/AggregateOperator.java b/stratosphere-java/src/main/java/eu/stratosphere/api/java/operators/AggregateOperator.java
index acc0453..e68cb67 100644
--- a/stratosphere-java/src/main/java/eu/stratosphere/api/java/operators/AggregateOperator.java
+++ b/stratosphere-java/src/main/java/eu/stratosphere/api/java/operators/AggregateOperator.java
@@ -130,7 +130,21 @@ public class AggregateOperator<IN> extends SingleInputOperator<IN, IN, Aggregate
 
 		return this;
 	}
-	
+
+	/** Syntactic sugar for {@code and(Aggregations.SUM, field)}. */
+	public AggregateOperator<IN> andSum(int field) {
+		return this.and(Aggregations.SUM, field);
+	}
+
+	/** Syntactic sugar for {@code and(Aggregations.MIN, field)}. */
+	public AggregateOperator<IN> andMin(int field) {
+		return this.and(Aggregations.MIN, field);
+	}
+
+	/** Syntactic sugar for {@code and(Aggregations.MAX, field)}. */
+	public AggregateOperator<IN> andMax(int field) {
+		return this.and(Aggregations.MAX, field);
+	}
 	@SuppressWarnings("unchecked")
 	@Override
 	protected eu.stratosphere.api.common.operators.base.GroupReduceOperatorBase<IN, IN, GenericGroupReduce<IN, IN>> translateToDataFlow(Operator<IN> input) {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/9d570450/stratosphere-java/src/main/java/eu/stratosphere/api/java/operators/UnsortedGrouping.java
----------------------------------------------------------------------
diff --git a/stratosphere-java/src/main/java/eu/stratosphere/api/java/operators/UnsortedGrouping.java b/stratosphere-java/src/main/java/eu/stratosphere/api/java/operators/UnsortedGrouping.java
index 95e40bc..15df334 100644
--- a/stratosphere-java/src/main/java/eu/stratosphere/api/java/operators/UnsortedGrouping.java
+++ b/stratosphere-java/src/main/java/eu/stratosphere/api/java/operators/UnsortedGrouping.java
@@ -49,6 +49,39 @@ public class UnsortedGrouping<T> extends Grouping<T> {
 	public AggregateOperator<T> aggregate(Aggregations agg, int field) {
 		return new AggregateOperator<T>(this, agg, field);
 	}
+
+	/**
+	 * Syntactic sugar for {@link #aggregate(Aggregations, int) aggregate(Aggregations.SUM, field)}.
+	 *
+	 * @param field The index of the Tuple field on which the aggregation function is applied.
+	 * @return An AggregateOperator that sums the given field within each group.
+	 * @see eu.stratosphere.api.java.operators.AggregateOperator
+	 */
+	public AggregateOperator<T> sum(int field) {
+		return aggregate(Aggregations.SUM, field);
+	}
+
+	/**
+	 * Syntactic sugar for {@link #aggregate(Aggregations, int) aggregate(Aggregations.MAX, field)}.
+	 *
+	 * @param field The index of the Tuple field on which the aggregation function is applied.
+	 * @return An AggregateOperator that computes the maximum of the given field within each group.
+	 * @see eu.stratosphere.api.java.operators.AggregateOperator
+	 */
+	public AggregateOperator<T> max(int field) {
+		return aggregate(Aggregations.MAX, field);
+	}
+
+	/**
+	 * Syntactic sugar for {@link #aggregate(Aggregations, int) aggregate(Aggregations.MIN, field)}.
+	 *
+	 * @param field The index of the Tuple field on which the aggregation function is applied.
+	 * @return An AggregateOperator that computes the minimum of the given field within each group.
+	 * @see eu.stratosphere.api.java.operators.AggregateOperator
+	 */
+	public AggregateOperator<T> min(int field) {
+		return aggregate(Aggregations.MIN, field);
+	}
 	
 	/**
 	 * Applies a Reduce transformation on a grouped {@link DataSet}.<br/>

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/9d570450/stratosphere-tests/src/test/java/eu/stratosphere/test/javaApiOperators/SumMinMaxITCase.java
----------------------------------------------------------------------
diff --git a/stratosphere-tests/src/test/java/eu/stratosphere/test/javaApiOperators/SumMinMaxITCase.java b/stratosphere-tests/src/test/java/eu/stratosphere/test/javaApiOperators/SumMinMaxITCase.java
new file mode 100644
index 0000000..8b7dc80
--- /dev/null
+++ b/stratosphere-tests/src/test/java/eu/stratosphere/test/javaApiOperators/SumMinMaxITCase.java
@@ -0,0 +1,147 @@
+/***********************************************************************************************************************
+ *
+ * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ *
+ **********************************************************************************************************************/
+
+package eu.stratosphere.test.javaApiOperators;
+
+import eu.stratosphere.api.java.DataSet;
+import eu.stratosphere.api.java.ExecutionEnvironment;
+import eu.stratosphere.api.java.aggregation.Aggregations;
+import eu.stratosphere.api.java.tuple.Tuple1;
+import eu.stratosphere.api.java.tuple.Tuple2;
+import eu.stratosphere.api.java.tuple.Tuple3;
+import eu.stratosphere.configuration.Configuration;
+import eu.stratosphere.test.javaApiOperators.util.CollectionDataSets;
+import eu.stratosphere.test.util.JavaProgramTestBase;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.LinkedList;
+
+/** Integration tests for the {@code sum}, {@code min}, {@code max}, and {@code andMax} aggregation shorthands. */
+@RunWith(Parameterized.class)
+public class SumMinMaxITCase extends JavaProgramTestBase {
+
+	private static final int NUM_PROGRAMS = 3;
+
+	private final int curProgId = config.getInteger("ProgramId", -1);
+	private String resultPath;
+	private String expectedResult;
+
+	public SumMinMaxITCase(Configuration config) {
+		super(config);
+	}
+
+	@Override
+	protected void preSubmit() throws Exception {
+		resultPath = getTempDirPath("result");
+	}
+
+	@Override
+	protected void testProgram() throws Exception {
+		expectedResult = SumMinMaxProgs.runProgram(curProgId, resultPath);
+	}
+
+	@Override
+	protected void postSubmit() throws Exception {
+		compareResultsByLinesInMemory(expectedResult, resultPath);
+	}
+
+	@Parameterized.Parameters
+	public static Collection<Object[]> getConfigurations() throws FileNotFoundException, IOException {
+
+		LinkedList<Configuration> tConfigs = new LinkedList<Configuration>();
+
+		for (int i = 1; i <= NUM_PROGRAMS; i++) {
+			Configuration config = new Configuration();
+			config.setInteger("ProgramId", i);
+			tConfigs.add(config);
+		}
+
+		return toParameterList(tConfigs);
+	}
+
+	/**
+	 * Test programs adapted from {@link eu.stratosphere.test.javaApiOperators.AggregateITCase}, with the
+	 * calls to {@code aggregate(...)} replaced by calls to the {@code sum}, {@code min}, {@code max},
+	 * and {@code andMax} shorthands.
+	 */
+	private static class SumMinMaxProgs {
+
+		public static String runProgram(int progId, String resultPath) throws Exception {
+			switch (progId) {
+				case 1: {
+					// Full aggregate: sum of field 0 and maximum of field 1 over the whole DataSet
+
+					final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+					DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
+					DataSet<Tuple2<Integer, Long>> sumDs = ds
+							.sum(0)
+							.andMax(1)
+							.project(0, 1).types(Integer.class, Long.class);
+
+					sumDs.writeAsCsv(resultPath);
+					env.execute();
+
+					// return expected result
+					return "231,6\n";
+				}
+				case 2: {
+					// Grouped aggregate: sum of field 0 within each group of field 1
+
+					final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+					DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
+					DataSet<Tuple2<Long, Integer>> aggregateDs = ds.groupBy(1)
+							.sum(0)
+							.project(1, 0).types(Long.class, Integer.class);
+
+					aggregateDs.writeAsCsv(resultPath);
+					env.execute();
+
+					// return expected result
+					return "1,1\n" +
+							"2,5\n" +
+							"3,15\n" +
+							"4,34\n" +
+							"5,65\n" +
+							"6,111\n";
+				}
+				case 3: {
+					// Nested aggregate: per-group minimum of field 0, then the minimum over all groups
+
+					final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+					DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
+					DataSet<Tuple1<Integer>> aggregateDs = ds.groupBy(1)
+							.min(0)
+							.min(0)
+							.project(0).types(Integer.class);
+
+					aggregateDs.writeAsCsv(resultPath);
+					env.execute();
+
+					// return expected result
+					return "1\n";
+				}
+				default:
+					throw new IllegalArgumentException("Invalid program id");
+			}
+		}
+	}
+}