Posted to commits@flink.apache.org by fh...@apache.org on 2016/11/23 20:03:08 UTC

[1/5] flink git commit: [hotfix] [streaming] Fix type extraction for joined streams.

Repository: flink
Updated Branches:
  refs/heads/master e8318d6f4 -> 16ee4a5ce


[hotfix] [streaming] Fix type extraction for joined streams.

This closes #2755.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/16ee4a5c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/16ee4a5c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/16ee4a5c

Branch: refs/heads/master
Commit: 16ee4a5ce717987d824593b1f9fdaafa44d98632
Parents: 3458a66
Author: Robert Metzger <rm...@apache.org>
Authored: Fri Nov 4 16:15:26 2016 +0100
Committer: Fabian Hueske <fh...@apache.org>
Committed: Wed Nov 23 18:35:44 2016 +0100

----------------------------------------------------------------------
 .../org/apache/flink/streaming/api/datastream/JoinedStreams.java   | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/16ee4a5c/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/JoinedStreams.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/JoinedStreams.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/JoinedStreams.java
index c005310..ed1cbd7 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/JoinedStreams.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/JoinedStreams.java
@@ -306,7 +306,7 @@ public class JoinedStreams<T1, T2> {
 		public <T> DataStream<T> apply(FlatJoinFunction<T1, T2, T> function) {
 			TypeInformation<T> resultType = TypeExtractor.getBinaryOperatorReturnType(
 					function,
-					JoinFunction.class,
+					FlatJoinFunction.class,
 					true,
 					true,
 					input1.getType(),
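
A minimal usage sketch of the code path this hotfix touches (stream names and data are
illustrative, not taken from the commit): with the change above, the result type of a
FlatJoinFunction passed to JoinedStreams#apply is extracted against FlatJoinFunction.class
rather than JoinFunction.class, so a flat join like the following gets the correct output
type (String here).

    import org.apache.flink.api.common.functions.FlatJoinFunction;
    import org.apache.flink.api.java.functions.KeySelector;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
    import org.apache.flink.streaming.api.windowing.time.Time;
    import org.apache.flink.util.Collector;

    public class FlatJoinTypeExtractionExample {
      public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Tuple2<Long, String>> users = env.fromElements(Tuple2.of(1L, "alice"));
        DataStream<Tuple2<Long, Integer>> clicks = env.fromElements(Tuple2.of(1L, 42));

        DataStream<String> joined = users
          .join(clicks)
          .where(new KeySelector<Tuple2<Long, String>, Long>() {
            @Override
            public Long getKey(Tuple2<Long, String> user) { return user.f0; }
          })
          .equalTo(new KeySelector<Tuple2<Long, Integer>, Long>() {
            @Override
            public Long getKey(Tuple2<Long, Integer> click) { return click.f0; }
          })
          .window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
          // apply(FlatJoinFunction) is the method whose type extraction the hotfix corrects
          .apply(new FlatJoinFunction<Tuple2<Long, String>, Tuple2<Long, Integer>, String>() {
            @Override
            public void join(Tuple2<Long, String> user,
                             Tuple2<Long, Integer> click,
                             Collector<String> out) {
              out.collect(user.f1 + " -> " + click.f1);
            }
          });

        joined.print();
        env.execute("flat-join type extraction example");
      }
    }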


[5/5] flink git commit: [FLINK-5143] [table] Add EXISTS to list of supported SQL operators.

Posted by fh...@apache.org.
[FLINK-5143] [table] Add EXISTS to list of supported SQL operators.

This closes #2853.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/06d252e8
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/06d252e8
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/06d252e8

Branch: refs/heads/master
Commit: 06d252e89b58c5947b331cf24a552d11ff8767e8
Parents: 7d91b9e
Author: twalthr <tw...@apache.org>
Authored: Wed Nov 23 10:44:20 2016 +0100
Committer: Fabian Hueske <fh...@apache.org>
Committed: Wed Nov 23 18:35:44 2016 +0100

----------------------------------------------------------------------
 docs/dev/table_api.md                           | 18 ++---
 .../table/plan/nodes/dataset/DataSetJoin.scala  |  2 +-
 .../api/table/validate/FunctionCatalog.scala    |  3 +-
 .../api/scala/batch/sql/SetOperatorsTest.scala  | 75 ++++++++++++++++++++
 .../flink/api/table/utils/TableTestBase.scala   | 26 +++++++
 5 files changed, 114 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/06d252e8/docs/dev/table_api.md
----------------------------------------------------------------------
diff --git a/docs/dev/table_api.md b/docs/dev/table_api.md
index bb0e500..4d8c953 100644
--- a/docs/dev/table_api.md
+++ b/docs/dev/table_api.md
@@ -2800,39 +2800,41 @@ value NOT IN (value [, value]* )
         <p>Whether <i>value</i> is not equal to every value in a list.</p>
       </td>
     </tr>
-<!-- NOT SUPPORTED SO FAR
+
     <tr>
       <td>
         {% highlight text %}
-value IN (sub-query)
+EXISTS (sub-query)
 {% endhighlight %}
       </td>
       <td>
-        <p>Whether <i>value</i> is equal to a row returned by sub-query.</p>
+        <p>Whether <i>sub-query</i> returns at least one row. Only supported if the operation can be rewritten in a join and group operation.</p>
       </td>
     </tr>
 
+<!-- NOT SUPPORTED SO FAR
     <tr>
       <td>
         {% highlight text %}
-value NOT IN (sub-query)
+value IN (sub-query)
 {% endhighlight %}
       </td>
       <td>
-        <p>Whether <i>value</i> is not equal to every row returned by sub-query.</p>
+        <p>Whether <i>value</i> is equal to a row returned by sub-query.</p>
       </td>
     </tr>
 
     <tr>
       <td>
         {% highlight text %}
-EXISTS (sub-query)
+value NOT IN (sub-query)
 {% endhighlight %}
       </td>
       <td>
-        <p>Whether sub-query returns at least one row.</p>
+        <p>Whether <i>value</i> is not equal to every row returned by sub-query.</p>
       </td>
-    </tr>-->
+    </tr>
+    -->
 
   </tbody>
 </table>
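
A minimal sketch of using the newly supported EXISTS predicate through the SQL interface
(table names, fields and data are illustrative; class and method names follow the source
tree at this commit and may differ in later releases):

    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.table.BatchTableEnvironment;
    import org.apache.flink.api.java.tuple.Tuple3;
    import org.apache.flink.api.table.Row;
    import org.apache.flink.api.table.Table;
    import org.apache.flink.api.table.TableEnvironment;

    public class ExistsExample {
      public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        BatchTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);

        DataSet<Tuple3<Long, Integer, String>> a = env.fromElements(Tuple3.of(1L, 10, "a"));
        DataSet<Tuple3<Long, Integer, String>> b = env.fromElements(Tuple3.of(1L, 20, "b"));

        tEnv.registerDataSet("A", a, "a_long, a_int, a_string");
        tEnv.registerDataSet("B", b, "b_long, b_int, b_string");

        // EXISTS is supported because the planner can rewrite it into a join plus a
        // grouping, as exercised by the SetOperatorsTest added in this commit.
        Table result = tEnv.sql(
          "SELECT a_int, a_string FROM A WHERE EXISTS(SELECT * FROM B WHERE a_long = b_long)");

        tEnv.toDataSet(result, Row.class).print();
      }
    }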

http://git-wip-us.apache.org/repos/asf/flink/blob/06d252e8/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetJoin.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetJoin.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetJoin.scala
index bbb6325..6d7a30e 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetJoin.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetJoin.scala
@@ -215,7 +215,7 @@ class DataSetJoin(
   }
 
   private def joinTypeToString = joinType match {
-    case JoinRelType.INNER => "Join"
+    case JoinRelType.INNER => "InnerJoin"
     case JoinRelType.LEFT=> "LeftOuterJoin"
     case JoinRelType.RIGHT => "RightOuterJoin"
     case JoinRelType.FULL => "FullOuterJoin"

http://git-wip-us.apache.org/repos/asf/flink/blob/06d252e8/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/validate/FunctionCatalog.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/validate/FunctionCatalog.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/validate/FunctionCatalog.scala
index 68e2f97..679733c 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/validate/FunctionCatalog.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/validate/FunctionCatalog.scala
@@ -281,7 +281,8 @@ class BasicOperatorTable extends ReflectiveSqlOperatorTable {
     SqlStdOperatorTable.CAST,
     SqlStdOperatorTable.EXTRACT,
     SqlStdOperatorTable.QUARTER,
-    SqlStdOperatorTable.SCALAR_QUERY
+    SqlStdOperatorTable.SCALAR_QUERY,
+    SqlStdOperatorTable.EXISTS
   )
 
   builtInSqlOperators.foreach(register)

http://git-wip-us.apache.org/repos/asf/flink/blob/06d252e8/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/SetOperatorsTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/SetOperatorsTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/SetOperatorsTest.scala
new file mode 100644
index 0000000..5bc6e4a
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/SetOperatorsTest.scala
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala.batch.sql
+
+import org.apache.flink.api.scala._
+import org.apache.flink.api.scala.table._
+import org.apache.flink.api.table.utils.TableTestBase
+import org.apache.flink.api.table.utils.TableTestUtil._
+import org.junit.Test
+
+class SetOperatorsTest extends TableTestBase {
+
+  @Test
+  def testExists(): Unit = {
+    val util = batchTestUtil()
+    util.addTable[(Long, Int, String)]("A", 'a_long, 'a_int, 'a_string)
+    util.addTable[(Long, Int, String)]("B", 'b_long, 'b_int, 'b_string)
+
+    val expected = unaryNode(
+      "DataSetCalc",
+      binaryNode(
+        "DataSetJoin",
+        batchTableNode(0),
+        unaryNode(
+          "DataSetAggregate",
+          unaryNode(
+            "DataSetCalc",
+            binaryNode(
+              "DataSetJoin",
+              batchTableNode(1),
+              unaryNode(
+                "DataSetAggregate",
+                batchTableNode(0),
+                term("groupBy", "a_long"),
+                term("select", "a_long")
+              ),
+              term("where", "=(a_long, b_long)"),
+              term("join", "b_long", "b_int", "b_string", "a_long"),
+              term("joinType", "InnerJoin")
+            ),
+            term("select", "a_long", "true AS $f0")
+          ),
+          term("groupBy", "a_long"),
+          term("select", "a_long", "MIN($f0) AS $f1")
+        ),
+        term("where", "=(a_long, a_long0)"),
+        term("join", "a_long", "a_int", "a_string", "a_long0", "$f1"),
+        term("joinType", "InnerJoin")
+      ),
+      term("select", "a_int", "a_string")
+    )
+
+    util.verifySql(
+      "SELECT a_int, a_string FROM A WHERE EXISTS(SELECT * FROM B WHERE a_long = b_long)",
+      expected
+    )
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/06d252e8/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/utils/TableTestBase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/utils/TableTestBase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/utils/TableTestBase.scala
index ce693ff..2ea15a0 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/utils/TableTestBase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/utils/TableTestBase.scala
@@ -56,6 +56,10 @@ abstract class TableTestUtil {
   def addTable[T: TypeInformation](name: String, fields: Expression*): Table
   def verifySql(query: String, expected: String): Unit
   def verifyTable(resultTable: Table, expected: String): Unit
+
+  // the print methods are for debugging purposes only
+  def printTable(resultTable: Table): Unit
+  def printSql(query: String): Unit
 }
 
 object TableTestUtil {
@@ -87,6 +91,7 @@ object TableTestUtil {
   def streamTableNode(idx: Int): String = {
     s"DataStreamScan(table=[[_DataStreamTable_$idx]])"
   }
+
 }
 
 case class BatchTableTestUtil() extends TableTestUtil {
@@ -121,6 +126,16 @@ case class BatchTableTestUtil() extends TableTestUtil {
       expected.split("\n").map(_.trim).mkString("\n"),
       actual.split("\n").map(_.trim).mkString("\n"))
   }
+
+  def printTable(resultTable: Table): Unit = {
+    val relNode = resultTable.getRelNode
+    val optimized = tEnv.optimize(relNode)
+    println(RelOptUtil.toString(optimized))
+  }
+
+  def printSql(query: String): Unit = {
+    printTable(tEnv.sql(query))
+  }
 }
 
 case class StreamTableTestUtil() extends TableTestUtil {
@@ -156,4 +171,15 @@ case class StreamTableTestUtil() extends TableTestUtil {
       expected.split("\n").map(_.trim).mkString("\n"),
       actual.split("\n").map(_.trim).mkString("\n"))
   }
+
+  // the print methods are for debugging purposes only
+  def printTable(resultTable: Table): Unit = {
+    val relNode = resultTable.getRelNode
+    val optimized = tEnv.optimize(relNode)
+    println(RelOptUtil.toString(optimized))
+  }
+
+  def printSql(query: String): Unit = {
+    printTable(tEnv.sql(query))
+  }
 }


[3/5] flink git commit: [FLINK-2662] [optimizer] Fix computation of global properties of union operator.

Posted by fh...@apache.org.
[FLINK-2662] [optimizer] Fix computation of global properties of union operator.

- Fixes invalid shipping strategy between consecutive unions.

This closes #2848.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7d91b9ec
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7d91b9ec
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7d91b9ec

Branch: refs/heads/master
Commit: 7d91b9ec71c9b711e04a91f847f5c85d3f561da6
Parents: e8318d6
Author: Fabian Hueske <fh...@apache.org>
Authored: Mon Nov 21 19:06:42 2016 +0100
Committer: Fabian Hueske <fh...@apache.org>
Committed: Wed Nov 23 18:35:44 2016 +0100

----------------------------------------------------------------------
 .../operators/BinaryUnionOpDescriptor.java      |  30 ++-
 .../flink/optimizer/UnionReplacementTest.java   | 240 ++++++++++++++++++-
 2 files changed, 258 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/7d91b9ec/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/BinaryUnionOpDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/BinaryUnionOpDescriptor.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/BinaryUnionOpDescriptor.java
index 8cc517e..78ac3d6 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/BinaryUnionOpDescriptor.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/BinaryUnionOpDescriptor.java
@@ -69,11 +69,35 @@ public class BinaryUnionOpDescriptor extends OperatorDescriptorDual {
 		
 		if (in1.getPartitioning() == PartitioningProperty.HASH_PARTITIONED &&
 			in2.getPartitioning() == PartitioningProperty.HASH_PARTITIONED &&
-			in1.getPartitioningFields().equals(in2.getPartitioningFields()))
-		{
+			in1.getPartitioningFields().equals(in2.getPartitioningFields())) {
 			newProps.setHashPartitioned(in1.getPartitioningFields());
 		}
-		
+		else if (in1.getPartitioning() == PartitioningProperty.RANGE_PARTITIONED &&
+					in2.getPartitioning() == PartitioningProperty.RANGE_PARTITIONED &&
+					in1.getPartitioningOrdering().equals(in2.getPartitioningOrdering()) &&
+					(
+						in1.getDataDistribution() == null && in2.getDataDistribution() == null ||
+						in1.getDataDistribution() != null && in1.getDataDistribution().equals(in2.getDataDistribution())
+					)
+				) {
+			if (in1.getDataDistribution() == null) {
+				newProps.setRangePartitioned(in1.getPartitioningOrdering());
+			}
+			else {
+				newProps.setRangePartitioned(in1.getPartitioningOrdering(), in1.getDataDistribution());
+			}
+		}
+		else if (in1.getPartitioning() == PartitioningProperty.CUSTOM_PARTITIONING &&
+					in2.getPartitioning() == PartitioningProperty.CUSTOM_PARTITIONING &&
+					in1.getPartitioningFields().equals(in2.getPartitioningFields()) &&
+					in1.getCustomPartitioner().equals(in2.getCustomPartitioner())) {
+			newProps.setCustomPartitioned(in1.getPartitioningFields(), in1.getCustomPartitioner());
+		}
+		else if (in1.getPartitioning() == PartitioningProperty.FORCED_REBALANCED &&
+					in2.getPartitioning() == PartitioningProperty.FORCED_REBALANCED) {
+			newProps.setForcedRebalanced();
+		}
+
 		return newProps;
 	}
 	

http://git-wip-us.apache.org/repos/asf/flink/blob/7d91b9ec/flink-optimizer/src/test/java/org/apache/flink/optimizer/UnionReplacementTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/UnionReplacementTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/UnionReplacementTest.java
index 3be7657..d0bb376 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/UnionReplacementTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/UnionReplacementTest.java
@@ -19,18 +19,25 @@
 package org.apache.flink.optimizer;
 
 import junit.framework.Assert;
+import org.apache.flink.api.common.operators.Order;
+import org.apache.flink.api.common.operators.Ordering;
 import org.apache.flink.api.common.operators.util.FieldList;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.common.Plan;
 import org.apache.flink.api.java.io.DiscardingOutputFormat;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.optimizer.dataproperties.PartitioningProperty;
 import org.apache.flink.optimizer.plan.Channel;
+import org.apache.flink.optimizer.plan.DualInputPlanNode;
 import org.apache.flink.optimizer.plan.NAryUnionPlanNode;
 import org.apache.flink.optimizer.plan.OptimizedPlan;
 import org.apache.flink.optimizer.plan.SingleInputPlanNode;
+import org.apache.flink.optimizer.plan.SourcePlanNode;
 import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
 import org.apache.flink.optimizer.util.CompilerTestBase;
+import org.apache.flink.runtime.operators.Driver;
+import org.apache.flink.runtime.operators.DriverStrategy;
 import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
 import org.junit.Test;
 
@@ -87,7 +94,7 @@ public class UnionReplacementTest extends CompilerTestBase {
 	 *
 	 */
 	@Test
-	public void testUnionWithTwoOutputsTest() throws Exception {
+	public void testUnionWithTwoOutputs() throws Exception {
 
 		// -----------------------------------------------------------------------------------------
 		// Build test program
@@ -120,38 +127,253 @@ public class UnionReplacementTest extends CompilerTestBase {
 		SingleInputPlanNode groupRed2 = resolver.getNode("2");
 
 		// check partitioning is correct
-		Assert.assertTrue("Reduce input should be partitioned on 0.",
+		assertTrue("Reduce input should be partitioned on 0.",
 			groupRed1.getInput().getGlobalProperties().getPartitioningFields().isExactMatch(new FieldList(0)));
-		Assert.assertTrue("Reduce input should be partitioned on 1.",
+		assertTrue("Reduce input should be partitioned on 1.",
 			groupRed2.getInput().getGlobalProperties().getPartitioningFields().isExactMatch(new FieldList(1)));
 
 		// check group reduce inputs are n-ary unions with three inputs
-		Assert.assertTrue("Reduce input should be n-ary union with three inputs.",
+		assertTrue("Reduce input should be n-ary union with three inputs.",
 			groupRed1.getInput().getSource() instanceof NAryUnionPlanNode &&
 				((NAryUnionPlanNode) groupRed1.getInput().getSource()).getListOfInputs().size() == 3);
-		Assert.assertTrue("Reduce input should be n-ary union with three inputs.",
+		assertTrue("Reduce input should be n-ary union with three inputs.",
 			groupRed2.getInput().getSource() instanceof NAryUnionPlanNode &&
 				((NAryUnionPlanNode) groupRed2.getInput().getSource()).getListOfInputs().size() == 3);
 
 		// check channel from union to group reduce is forwarding
-		Assert.assertTrue("Channel between union and group reduce should be forwarding",
+		assertTrue("Channel between union and group reduce should be forwarding",
 			groupRed1.getInput().getShipStrategy().equals(ShipStrategyType.FORWARD));
-		Assert.assertTrue("Channel between union and group reduce should be forwarding",
+		assertTrue("Channel between union and group reduce should be forwarding",
 			groupRed2.getInput().getShipStrategy().equals(ShipStrategyType.FORWARD));
 
 		// check that all inputs of unions are hash partitioned
 		List<Channel> union123In = ((NAryUnionPlanNode) groupRed1.getInput().getSource()).getListOfInputs();
 		for(Channel i : union123In) {
-			Assert.assertTrue("Union input channel should hash partition on 0",
+			assertTrue("Union input channel should hash partition on 0",
 				i.getShipStrategy().equals(ShipStrategyType.PARTITION_HASH) &&
 					i.getShipStrategyKeys().isExactMatch(new FieldList(0)));
 		}
 		List<Channel> union234In = ((NAryUnionPlanNode) groupRed2.getInput().getSource()).getListOfInputs();
 		for(Channel i : union234In) {
-			Assert.assertTrue("Union input channel should hash partition on 0",
+			assertTrue("Union input channel should hash partition on 0",
 				i.getShipStrategy().equals(ShipStrategyType.PARTITION_HASH) &&
 					i.getShipStrategyKeys().isExactMatch(new FieldList(1)));
 		}
 
 	}
+
+	/**
+	 *
+	 * Checks that a plan with consecutive UNIONs followed by PartitionByHash is correctly translated.
+	 *
+	 * The program can be illustrated as follows:
+	 *
+	 * Src1 -\
+	 *        >-> Union12--<
+	 * Src2 -/              \
+	 *                       >-> Union123 -> PartitionByHash -> Output
+	 * Src3 ----------------/
+	 *
+	 * In the resulting plan, the hash partitioning (ShippingStrategy.PARTITION_HASH) must be
+	 * pushed to the inputs of the unions (Src1, Src2, Src3).
+	 *
+	 */
+	@Test
+	public void testConsecutiveUnionsWithHashPartitioning() throws Exception {
+
+		// -----------------------------------------------------------------------------------------
+		// Build test program
+		// -----------------------------------------------------------------------------------------
+
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		env.setParallelism(DEFAULT_PARALLELISM);
+
+		DataSet<Tuple2<Long, Long>> src1 = env.fromElements(new Tuple2<>(0L, 0L));
+		DataSet<Tuple2<Long, Long>> src2 = env.fromElements(new Tuple2<>(0L, 0L));
+		DataSet<Tuple2<Long, Long>> src3 = env.fromElements(new Tuple2<>(0L, 0L));
+
+		DataSet<Tuple2<Long, Long>> union12 = src1.union(src2);
+		DataSet<Tuple2<Long, Long>> union123 = union12.union(src3);
+
+		union123.partitionByHash(1).output(new DiscardingOutputFormat<Tuple2<Long, Long>>()).name("out");
+
+		// -----------------------------------------------------------------------------------------
+		// Verify optimized plan
+		// -----------------------------------------------------------------------------------------
+
+		OptimizedPlan optimizedPlan = compileNoStats(env.createProgramPlan());
+
+		OptimizerPlanNodeResolver resolver = getOptimizerPlanNodeResolver(optimizedPlan);
+
+		SingleInputPlanNode sink = resolver.getNode("out");
+
+		// check partitioning is correct
+		assertEquals("Sink input should be hash partitioned.",
+			PartitioningProperty.HASH_PARTITIONED, sink.getInput().getGlobalProperties().getPartitioning());
+		assertEquals("Sink input should be hash partitioned on 1.",
+			new FieldList(1), sink.getInput().getGlobalProperties().getPartitioningFields());
+
+		SingleInputPlanNode partitioner = (SingleInputPlanNode)sink.getInput().getSource();
+		assertTrue(partitioner.getDriverStrategy() == DriverStrategy.UNARY_NO_OP);
+		assertEquals("Partitioner input should be hash partitioned.",
+			PartitioningProperty.HASH_PARTITIONED, partitioner.getInput().getGlobalProperties().getPartitioning());
+		assertEquals("Partitioner input should be hash partitioned on 1.",
+			new FieldList(1), partitioner.getInput().getGlobalProperties().getPartitioningFields());
+		assertEquals("Partitioner input channel should be forwarding",
+			ShipStrategyType.FORWARD, partitioner.getInput().getShipStrategy());
+
+		NAryUnionPlanNode union = (NAryUnionPlanNode)partitioner.getInput().getSource();
+		// all union inputs should be hash partitioned
+		for (Channel c : union.getInputs()) {
+			assertEquals("Union input should be hash partitioned",
+				PartitioningProperty.HASH_PARTITIONED, c.getGlobalProperties().getPartitioning());
+			assertEquals("Union input channel should be hash partitioning",
+				ShipStrategyType.PARTITION_HASH, c.getShipStrategy());
+			assertTrue("Union input should be data source",
+				c.getSource() instanceof SourcePlanNode);
+		}
+	}
+
+	/**
+	 *
+	 * Checks that a plan with consecutive UNIONs followed by REBALANCE is correctly translated.
+	 *
+	 * The program can be illustrated as follows:
+	 *
+	 * Src1 -\
+	 *        >-> Union12--<
+	 * Src2 -/              \
+	 *                       >-> Union123 -> Rebalance -> Output
+	 * Src3 ----------------/
+	 *
+	 * In the resulting plan, the Rebalance (ShippingStrategy.PARTITION_FORCED_REBALANCE) must be
+	 * pushed to the inputs of the unions (Src1, Src2, Src3).
+	 *
+	 */
+	@Test
+	public void testConsecutiveUnionsWithRebalance() throws Exception {
+
+		// -----------------------------------------------------------------------------------------
+		// Build test program
+		// -----------------------------------------------------------------------------------------
+
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		env.setParallelism(DEFAULT_PARALLELISM);
+
+		DataSet<Tuple2<Long, Long>> src1 = env.fromElements(new Tuple2<>(0L, 0L));
+		DataSet<Tuple2<Long, Long>> src2 = env.fromElements(new Tuple2<>(0L, 0L));
+		DataSet<Tuple2<Long, Long>> src3 = env.fromElements(new Tuple2<>(0L, 0L));
+
+		DataSet<Tuple2<Long, Long>> union12 = src1.union(src2);
+		DataSet<Tuple2<Long, Long>> union123 = union12.union(src3);
+
+		union123.rebalance().output(new DiscardingOutputFormat<Tuple2<Long, Long>>()).name("out");
+
+		// -----------------------------------------------------------------------------------------
+		// Verify optimized plan
+		// -----------------------------------------------------------------------------------------
+
+		OptimizedPlan optimizedPlan = compileNoStats(env.createProgramPlan());
+
+		OptimizerPlanNodeResolver resolver = getOptimizerPlanNodeResolver(optimizedPlan);
+
+		SingleInputPlanNode sink = resolver.getNode("out");
+
+		// check partitioning is correct
+		assertEquals("Sink input should be force rebalanced.",
+			PartitioningProperty.FORCED_REBALANCED, sink.getInput().getGlobalProperties().getPartitioning());
+
+		SingleInputPlanNode partitioner = (SingleInputPlanNode)sink.getInput().getSource();
+		assertTrue(partitioner.getDriverStrategy() == DriverStrategy.UNARY_NO_OP);
+		assertEquals("Partitioner input should be force rebalanced.",
+			PartitioningProperty.FORCED_REBALANCED, partitioner.getInput().getGlobalProperties().getPartitioning());
+		assertEquals("Partitioner input channel should be forwarding",
+			ShipStrategyType.FORWARD, partitioner.getInput().getShipStrategy());
+
+		NAryUnionPlanNode union = (NAryUnionPlanNode)partitioner.getInput().getSource();
+		// all union inputs should be force rebalanced
+		for (Channel c : union.getInputs()) {
+			assertEquals("Union input should be force rebalanced",
+				PartitioningProperty.FORCED_REBALANCED, c.getGlobalProperties().getPartitioning());
+			assertEquals("Union input channel should be rebalancing",
+				ShipStrategyType.PARTITION_FORCED_REBALANCE, c.getShipStrategy());
+			assertTrue("Union input should be data source",
+				c.getSource() instanceof SourcePlanNode);
+		}
+	}
+
+	/**
+	 *
+	 * Checks that a plan with consecutive UNIONs followed by PARTITION_RANGE is correctly translated.
+	 *
+	 * The program can be illustrated as follows:
+	 *
+	 * Src1 -\
+	 *        >-> Union12--<
+	 * Src2 -/              \
+	 *                       >-> Union123 -> PartitionByRange -> Output
+	 * Src3 ----------------/
+	 *
+	 * In the resulting plan, the range partitioning must be
+	 * pushed to the inputs of the unions (Src1, Src2, Src3).
+	 *
+	 */
+	@Test
+	public void testConsecutiveUnionsWithRangePartitioning() throws Exception {
+
+		// -----------------------------------------------------------------------------------------
+		// Build test program
+		// -----------------------------------------------------------------------------------------
+
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		env.setParallelism(DEFAULT_PARALLELISM);
+
+		DataSet<Tuple2<Long, Long>> src1 = env.fromElements(new Tuple2<>(0L, 0L));
+		DataSet<Tuple2<Long, Long>> src2 = env.fromElements(new Tuple2<>(0L, 0L));
+		DataSet<Tuple2<Long, Long>> src3 = env.fromElements(new Tuple2<>(0L, 0L));
+
+		DataSet<Tuple2<Long, Long>> union12 = src1.union(src2);
+		DataSet<Tuple2<Long, Long>> union123 = union12.union(src3);
+
+		union123.partitionByRange(1).output(new DiscardingOutputFormat<Tuple2<Long, Long>>()).name("out");
+
+		// -----------------------------------------------------------------------------------------
+		// Verify optimized plan
+		// -----------------------------------------------------------------------------------------
+
+		OptimizedPlan optimizedPlan = compileNoStats(env.createProgramPlan());
+
+		OptimizerPlanNodeResolver resolver = getOptimizerPlanNodeResolver(optimizedPlan);
+
+		SingleInputPlanNode sink = resolver.getNode("out");
+
+		// check partitioning is correct
+		assertEquals("Sink input should be range partitioned.",
+			PartitioningProperty.RANGE_PARTITIONED, sink.getInput().getGlobalProperties().getPartitioning());
+		assertEquals("Sink input should be range partitioned on 1",
+			new Ordering(1, null, Order.ASCENDING), sink.getInput().getGlobalProperties().getPartitioningOrdering());
+
+		SingleInputPlanNode partitioner = (SingleInputPlanNode)sink.getInput().getSource();
+		assertTrue(partitioner.getDriverStrategy() == DriverStrategy.UNARY_NO_OP);
+		assertEquals("Partitioner input should be range partitioned.",
+			PartitioningProperty.RANGE_PARTITIONED, partitioner.getInput().getGlobalProperties().getPartitioning());
+		assertEquals("Partitioner input should be range partitioned on 1",
+			new Ordering(1, null, Order.ASCENDING), partitioner.getInput().getGlobalProperties().getPartitioningOrdering());
+		assertEquals("Partitioner input channel should be forwarding",
+			ShipStrategyType.FORWARD, partitioner.getInput().getShipStrategy());
+
+		NAryUnionPlanNode union = (NAryUnionPlanNode)partitioner.getInput().getSource();
+		// all union inputs should be force rebalanced
+		for (Channel c : union.getInputs()) {
+			assertEquals("Union input should be force rebalanced",
+				PartitioningProperty.RANGE_PARTITIONED, c.getGlobalProperties().getPartitioning());
+			assertEquals("Union input channel should be rebalancing",
+				ShipStrategyType.FORWARD, c.getShipStrategy());
+			// range partitioning is executed as custom partitioning with prior sampling
+			SingleInputPlanNode partitionMap = (SingleInputPlanNode)c.getSource();
+			assertEquals(DriverStrategy.MAP, partitionMap.getDriverStrategy());
+			assertEquals(ShipStrategyType.PARTITION_CUSTOM, partitionMap.getInput().getShipStrategy());
+		}
+	}
+
 }
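
Stripped of the plan assertions, the user-facing pattern exercised by the new tests is a
chain of unions followed by an explicit repartitioning; with this fix the partitioning is
pushed to the union inputs instead of yielding an invalid shipping strategy. A small
illustrative program (data and names are made up):

    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.io.DiscardingOutputFormat;
    import org.apache.flink.api.java.tuple.Tuple2;

    public class ConsecutiveUnionsExample {
      public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSet<Tuple2<Long, Long>> src1 = env.fromElements(new Tuple2<>(0L, 0L));
        DataSet<Tuple2<Long, Long>> src2 = env.fromElements(new Tuple2<>(1L, 1L));
        DataSet<Tuple2<Long, Long>> src3 = env.fromElements(new Tuple2<>(2L, 2L));

        // Src1/Src2/Src3 -> Union12 -> Union123 -> PartitionByRange -> Output,
        // the shape checked by testConsecutiveUnionsWithRangePartitioning above.
        src1.union(src2)
            .union(src3)
            .partitionByRange(1)
            .output(new DiscardingOutputFormat<Tuple2<Long, Long>>());

        // Printing the JSON execution plan shows the repartitioning being applied
        // on the inputs of the unions rather than after them.
        System.out.println(env.getExecutionPlan());
      }
    }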


[2/5] flink git commit: [FLINK-4937] [table] Add incremental group window aggregation for streaming Table API.

Posted by fh...@apache.org.
[FLINK-4937] [table] Add incremental group window aggregation for streaming Table API.

This closes #2792.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/74e0971a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/74e0971a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/74e0971a

Branch: refs/heads/master
Commit: 74e0971a5511c511feb94bee7a0ce39eb9951b62
Parents: 06d252e
Author: Jincheng Sun <su...@gmail.com>
Authored: Sat Nov 12 18:55:07 2016 +0800
Committer: Fabian Hueske <fh...@apache.org>
Committed: Wed Nov 23 18:35:44 2016 +0100

----------------------------------------------------------------------
 .../plan/nodes/dataset/DataSetAggregate.scala   |  18 +-
 .../nodes/datastream/DataStreamAggregate.scala  | 228 ++++++-----
 .../AggregateAllTimeWindowFunction.scala        |   7 +-
 .../aggregate/AggregateAllWindowFunction.scala  |   7 +-
 .../aggregate/AggregateMapFunction.scala        |   3 +-
 .../AggregateReduceCombineFunction.scala        |  54 +--
 .../AggregateReduceGroupFunction.scala          |   4 +-
 .../aggregate/AggregateTimeWindowFunction.scala |  14 +-
 .../table/runtime/aggregate/AggregateUtil.scala | 392 ++++++++++++++++---
 .../aggregate/AggregateWindowFunction.scala     |  12 +-
 ...rementalAggregateAllTimeWindowFunction.scala |  68 ++++
 .../IncrementalAggregateAllWindowFunction.scala |  79 ++++
 .../IncrementalAggregateReduceFunction.scala    |  63 +++
 ...IncrementalAggregateTimeWindowFunction.scala |  69 ++++
 .../IncrementalAggregateWindowFunction.scala    |  81 ++++
 .../scala/stream/table/AggregationsITCase.scala |  14 +-
 16 files changed, 848 insertions(+), 265 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/74e0971a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetAggregate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetAggregate.scala
index c73d781..e85ade0 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetAggregate.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetAggregate.scala
@@ -85,17 +85,22 @@ class DataSetAggregate(
   }
 
   override def translateToPlan(
-      tableEnv: BatchTableEnvironment,
-      expectedType: Option[TypeInformation[Any]]): DataSet[Any] = {
+    tableEnv: BatchTableEnvironment,
+    expectedType: Option[TypeInformation[Any]]): DataSet[Any] = {
 
     val config = tableEnv.getConfig
 
     val groupingKeys = grouping.indices.toArray
-    // add grouping fields, position keys in the input, and input type
-    val aggregateResult = AggregateUtil.createOperatorFunctionsForAggregates(
+
+    val mapFunction = AggregateUtil.createPrepareMapFunction(
+      namedAggregates,
+      grouping,
+      inputType)
+
+    val groupReduceFunction = AggregateUtil.createAggregateGroupReduceFunction(
       namedAggregates,
       inputType,
-      getRowType,
+      rowRelDataType,
       grouping)
 
     val inputDS = getInput.asInstanceOf[DataSetRel].translateToPlan(
@@ -111,10 +116,9 @@ class DataSetAggregate(
     val aggString = aggregationToString(inputType, grouping, getRowType, namedAggregates, Nil)
     val prepareOpName = s"prepare select: ($aggString)"
     val mappedInput = inputDS
-      .map(aggregateResult._1)
+      .map(mapFunction)
       .name(prepareOpName)
 
-    val groupReduceFunction = aggregateResult._2
     val rowTypeInfo = new RowTypeInfo(fieldTypes)
 
     val result = {

http://git-wip-us.apache.org/repos/asf/flink/blob/74e0971a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamAggregate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamAggregate.scala
index b4ae3ab..c7d5131 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamAggregate.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamAggregate.scala
@@ -22,7 +22,6 @@ import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
 import org.apache.calcite.rel.`type`.RelDataType
 import org.apache.calcite.rel.core.AggregateCall
 import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel}
-import org.apache.flink.api.common.functions.RichGroupReduceFunction
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.tuple.Tuple
 import org.apache.flink.api.table.FlinkRelBuilder.NamedWindowProperty
@@ -31,12 +30,11 @@ import org.apache.flink.api.table.plan.logical._
 import org.apache.flink.api.table.plan.nodes.FlinkAggregate
 import org.apache.flink.api.table.plan.nodes.datastream.DataStreamAggregate._
 import org.apache.flink.api.table.runtime.aggregate.AggregateUtil._
-import org.apache.flink.api.table.runtime.aggregate._
+import org.apache.flink.api.table.runtime.aggregate.{Aggregate, _}
 import org.apache.flink.api.table.typeutils.TypeCheckUtils.isTimeInterval
 import org.apache.flink.api.table.typeutils.{RowIntervalTypeInfo, RowTypeInfo, TimeIntervalTypeInfo, TypeConverter}
-import org.apache.flink.api.table.{TableException, FlinkTypeFactory, Row, StreamTableEnvironment}
+import org.apache.flink.api.table.{FlinkTypeFactory, Row, StreamTableEnvironment}
 import org.apache.flink.streaming.api.datastream.{AllWindowedStream, DataStream, KeyedStream, WindowedStream}
-import org.apache.flink.streaming.api.functions.windowing.{WindowFunction, AllWindowFunction}
 import org.apache.flink.streaming.api.windowing.assigners._
 import org.apache.flink.streaming.api.windowing.time.Time
 import org.apache.flink.streaming.api.windowing.windows.{Window => DataStreamWindow}
@@ -103,30 +101,24 @@ class DataStreamAggregate(
   }
 
   override def translateToPlan(
-      tableEnv: StreamTableEnvironment,
-      expectedType: Option[TypeInformation[Any]])
-    : DataStream[Any] = {
-    
-    val config = tableEnv.getConfig
+    tableEnv: StreamTableEnvironment,
+    expectedType: Option[TypeInformation[Any]]): DataStream[Any] = {
 
+    val config = tableEnv.getConfig
     val groupingKeys = grouping.indices.toArray
-    // add grouping fields, position keys in the input, and input type
-    val aggregateResult = AggregateUtil.createOperatorFunctionsForAggregates(
-      namedAggregates,
-      inputType,
-      getRowType,
-      grouping)
-
     val inputDS = input.asInstanceOf[DataStreamRel].translateToPlan(
       tableEnv,
       // tell the input operator that this operator currently only supports Rows as input
       Some(TypeConverter.DEFAULT_ROW_TYPE))
 
     // get the output types
-    val fieldTypes: Array[TypeInformation[_]] = getRowType.getFieldList.asScala
+    val fieldTypes: Array[TypeInformation[_]] =
+      getRowType.getFieldList.asScala
       .map(field => FlinkTypeFactory.toTypeInfo(field.getType))
       .toArray
 
+    val rowTypeInfo = new RowTypeInfo(fieldTypes)
+
     val aggString = aggregationToString(
       inputType,
       grouping,
@@ -135,50 +127,118 @@ class DataStreamAggregate(
       namedProperties)
 
     val prepareOpName = s"prepare select: ($aggString)"
-    val mappedInput = inputDS
-      .map(aggregateResult._1)
-      .name(prepareOpName)
-
-    val groupReduceFunction = aggregateResult._2
-    val rowTypeInfo = new RowTypeInfo(fieldTypes)
+    val keyedAggOpName = s"groupBy: (${groupingToString(inputType, grouping)}), " +
+      s"window: ($window), " +
+      s"select: ($aggString)"
+    val nonKeyedAggOpName = s"window: ($window), select: ($aggString)"
 
-    val result = {
-      // grouped / keyed aggregation
-      if (groupingKeys.length > 0) {
-        val aggOpName = s"groupBy: (${groupingToString(inputType, grouping)}), " +
-          s"window: ($window), " +
-          s"select: ($aggString)"
-        val aggregateFunction =
-          createWindowAggregationFunction(window, namedProperties, groupReduceFunction)
-
-        val keyedStream = mappedInput.keyBy(groupingKeys: _*)
-
-        val windowedStream = createKeyedWindowedStream(window, keyedStream)
-          .asInstanceOf[WindowedStream[Row, Tuple, DataStreamWindow]]
-
-        windowedStream
-          .apply(aggregateFunction)
+    val mapFunction = AggregateUtil.createPrepareMapFunction(
+      namedAggregates,
+      grouping,
+      inputType)
+
+    val mappedInput = inputDS.map(mapFunction).name(prepareOpName)
+
+    val result: DataStream[Any] = {
+      // check whether all aggregates support partial aggregate
+      if (AggregateUtil.doAllSupportPartialAggregation(
+            namedAggregates.map(_.getKey),
+            inputType,
+            grouping.length)) {
+        // do Incremental Aggregation
+        val reduceFunction = AggregateUtil.createIncrementalAggregateReduceFunction(
+          namedAggregates,
+          inputType,
+          getRowType,
+          grouping)
+        // grouped / keyed aggregation
+        if (groupingKeys.length > 0) {
+          val windowFunction = AggregateUtil.createWindowIncrementalAggregationFunction(
+            window,
+            namedAggregates,
+            inputType,
+            rowRelDataType,
+            grouping,
+            namedProperties)
+
+          val keyedStream = mappedInput.keyBy(groupingKeys: _*)
+          val windowedStream =
+            createKeyedWindowedStream(window, keyedStream)
+            .asInstanceOf[WindowedStream[Row, Tuple, DataStreamWindow]]
+
+          windowedStream
+          .apply(reduceFunction, windowFunction)
+          .returns(rowTypeInfo)
+          .name(keyedAggOpName)
+          .asInstanceOf[DataStream[Any]]
+        }
+        // global / non-keyed aggregation
+        else {
+          val windowFunction = AggregateUtil.createAllWindowIncrementalAggregationFunction(
+            window,
+            namedAggregates,
+            inputType,
+            rowRelDataType,
+            grouping,
+            namedProperties)
+
+          val windowedStream =
+            createNonKeyedWindowedStream(window, mappedInput)
+            .asInstanceOf[AllWindowedStream[Row, DataStreamWindow]]
+
+          windowedStream
+          .apply(reduceFunction, windowFunction)
           .returns(rowTypeInfo)
-          .name(aggOpName)
+          .name(nonKeyedAggOpName)
           .asInstanceOf[DataStream[Any]]
+        }
       }
-      // global / non-keyed aggregation
       else {
-        val aggOpName = s"window: ($window), select: ($aggString)"
-        val aggregateFunction =
-          createAllWindowAggregationFunction(window, namedProperties, groupReduceFunction)
-
-        val windowedStream = createNonKeyedWindowedStream(window, mappedInput)
-          .asInstanceOf[AllWindowedStream[Row, DataStreamWindow]]
-
-        windowedStream
-          .apply(aggregateFunction)
+        // do non-Incremental Aggregation
+        // grouped / keyed aggregation
+        if (groupingKeys.length > 0) {
+
+          val windowFunction = AggregateUtil.createWindowAggregationFunction(
+            window,
+            namedAggregates,
+            inputType,
+            rowRelDataType,
+            grouping,
+            namedProperties)
+
+          val keyedStream = mappedInput.keyBy(groupingKeys: _*)
+          val windowedStream =
+            createKeyedWindowedStream(window, keyedStream)
+            .asInstanceOf[WindowedStream[Row, Tuple, DataStreamWindow]]
+
+          windowedStream
+          .apply(windowFunction)
           .returns(rowTypeInfo)
-          .name(aggOpName)
+          .name(keyedAggOpName)
           .asInstanceOf[DataStream[Any]]
+        }
+        // global / non-keyed aggregation
+        else {
+          val windowFunction = AggregateUtil.createAllWindowAggregationFunction(
+            window,
+            namedAggregates,
+            inputType,
+            rowRelDataType,
+            grouping,
+            namedProperties)
+
+          val windowedStream =
+            createNonKeyedWindowedStream(window, mappedInput)
+            .asInstanceOf[AllWindowedStream[Row, DataStreamWindow]]
+
+          windowedStream
+          .apply(windowFunction)
+          .returns(rowTypeInfo)
+          .name(nonKeyedAggOpName)
+          .asInstanceOf[DataStream[Any]]
+        }
       }
     }
-
     // if the expected type is not a Row, inject a mapper to convert to the expected type
     expectedType match {
       case Some(typeInfo) if typeInfo.getTypeClass != classOf[Row] =>
@@ -196,72 +256,8 @@ class DataStreamAggregate(
     }
   }
 }
-
 object DataStreamAggregate {
 
-  private def createAllWindowAggregationFunction(
-      window: LogicalWindow,
-      properties: Seq[NamedWindowProperty],
-      aggFunction: RichGroupReduceFunction[Row, Row])
-    : AllWindowFunction[Row, Row, DataStreamWindow] = {
-
-    if (isTimeWindow(window)) {
-      val (startPos, endPos) = computeWindowStartEndPropertyPos(properties)
-      new AggregateAllTimeWindowFunction(aggFunction, startPos, endPos)
-        .asInstanceOf[AllWindowFunction[Row, Row, DataStreamWindow]]
-    } else {
-      new AggregateAllWindowFunction(aggFunction)
-    }
-
-  }
-
-  private def createWindowAggregationFunction(
-      window: LogicalWindow,
-      properties: Seq[NamedWindowProperty],
-      aggFunction: RichGroupReduceFunction[Row, Row])
-    : WindowFunction[Row, Row, Tuple, DataStreamWindow] = {
-
-    if (isTimeWindow(window)) {
-      val (startPos, endPos) = computeWindowStartEndPropertyPos(properties)
-      new AggregateTimeWindowFunction(aggFunction, startPos, endPos)
-        .asInstanceOf[WindowFunction[Row, Row, Tuple, DataStreamWindow]]
-    } else {
-      new AggregateWindowFunction(aggFunction)
-    }
-
-  }
-
-  private def isTimeWindow(window: LogicalWindow) = {
-    window match {
-      case ProcessingTimeTumblingGroupWindow(_, size) => isTimeInterval(size.resultType)
-      case ProcessingTimeSlidingGroupWindow(_, size, _) => isTimeInterval(size.resultType)
-      case ProcessingTimeSessionGroupWindow(_, _) => true
-      case EventTimeTumblingGroupWindow(_, _, size) => isTimeInterval(size.resultType)
-      case EventTimeSlidingGroupWindow(_, _, size, _) => isTimeInterval(size.resultType)
-      case EventTimeSessionGroupWindow(_, _, _) => true
-    }
-  }
-
-  def computeWindowStartEndPropertyPos(properties: Seq[NamedWindowProperty])
-      : (Option[Int], Option[Int]) = {
-
-    val propPos = properties.foldRight((None: Option[Int], None: Option[Int], 0)) {
-      (p, x) => p match {
-        case NamedWindowProperty(name, prop) =>
-          prop match {
-            case WindowStart(_) if x._1.isDefined =>
-              throw new TableException("Duplicate WindowStart property encountered. This is a bug.")
-            case WindowStart(_) =>
-              (Some(x._3), x._2, x._3 - 1)
-            case WindowEnd(_) if x._2.isDefined =>
-              throw new TableException("Duplicate WindowEnd property encountered. This is a bug.")
-            case WindowEnd(_) =>
-              (x._1, Some(x._3), x._3 - 1)
-          }
-      }
-    }
-    (propPos._1, propPos._2)
-  }
 
   private def createKeyedWindowedStream(groupWindow: LogicalWindow, stream: KeyedStream[Row, Tuple])
     : WindowedStream[Row, Tuple, _ <: DataStreamWindow] = groupWindow match {
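
At the DataStream level, the incremental path selected above amounts to pre-aggregating
elements with a ReduceFunction as they arrive and finalizing the result in a window function
when the window fires, instead of buffering all rows for a single group-reduce. A standalone
sketch of that mechanism using the plain DataStream API (illustrative types and data, not the
code generated by the Table API):

    import org.apache.flink.api.common.functions.ReduceFunction;
    import org.apache.flink.api.java.tuple.Tuple;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
    import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
    import org.apache.flink.streaming.api.windowing.time.Time;
    import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
    import org.apache.flink.util.Collector;

    public class IncrementalWindowAggExample {
      public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Tuple2<String, Long>> input =
          env.fromElements(Tuple2.of("a", 1L), Tuple2.of("a", 2L), Tuple2.of("b", 3L));

        DataStream<String> result = input
          .keyBy(0)
          .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
          .apply(
            // the ReduceFunction keeps a single pre-aggregated value per key and window ...
            new ReduceFunction<Tuple2<String, Long>>() {
              @Override
              public Tuple2<String, Long> reduce(Tuple2<String, Long> a, Tuple2<String, Long> b) {
                return Tuple2.of(a.f0, a.f1 + b.f1);
              }
            },
            // ... and the WindowFunction only formats the final value when the window fires.
            new WindowFunction<Tuple2<String, Long>, String, Tuple, TimeWindow>() {
              @Override
              public void apply(Tuple key, TimeWindow window,
                                Iterable<Tuple2<String, Long>> preAggregated,
                                Collector<String> out) {
                for (Tuple2<String, Long> v : preAggregated) {
                  out.collect(v.f0 + " -> " + v.f1 + " @ " + window.getEnd());
                }
              }
            });

        result.print();
        env.execute("incremental window aggregation sketch");
      }
    }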

http://git-wip-us.apache.org/repos/asf/flink/blob/74e0971a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateAllTimeWindowFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateAllTimeWindowFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateAllTimeWindowFunction.scala
index ceadfe7..7ace2c5 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateAllTimeWindowFunction.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateAllTimeWindowFunction.scala
@@ -31,14 +31,13 @@ class AggregateAllTimeWindowFunction(
     groupReduceFunction: RichGroupReduceFunction[Row, Row],
     windowStartPos: Option[Int],
     windowEndPos: Option[Int])
-
-  extends RichAllWindowFunction[Row, Row, TimeWindow] {
+  extends AggregateAllWindowFunction[TimeWindow](groupReduceFunction) {
 
   private var collector: TimeWindowPropertyCollector = _
 
   override def open(parameters: Configuration): Unit = {
-    groupReduceFunction.open(parameters)
     collector = new TimeWindowPropertyCollector(windowStartPos, windowEndPos)
+    super.open(parameters)
   }
 
   override def apply(window: TimeWindow, input: Iterable[Row], out: Collector[Row]): Unit = {
@@ -48,6 +47,6 @@ class AggregateAllTimeWindowFunction(
     collector.timeWindow = window
 
     // call wrapped reduce function with property collector
-    groupReduceFunction.reduce(input, collector)
+    super.apply(window, input, collector)
   }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/74e0971a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateAllWindowFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateAllWindowFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateAllWindowFunction.scala
index 53ab948..4b045be 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateAllWindowFunction.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateAllWindowFunction.scala
@@ -27,14 +27,15 @@ import org.apache.flink.streaming.api.functions.windowing.RichAllWindowFunction
 import org.apache.flink.streaming.api.windowing.windows.Window
 import org.apache.flink.util.Collector
 
-class AggregateAllWindowFunction(groupReduceFunction: RichGroupReduceFunction[Row, Row])
-    extends RichAllWindowFunction[Row, Row, Window] {
+class AggregateAllWindowFunction[W <: Window](
+    groupReduceFunction: RichGroupReduceFunction[Row, Row])
+  extends RichAllWindowFunction[Row, Row, W] {
 
   override def open(parameters: Configuration): Unit = {
     groupReduceFunction.open(parameters)
   }
 
-  override def apply(window: Window, input: Iterable[Row], out: Collector[Row]): Unit = {
+  override def apply(window: W, input: Iterable[Row], out: Collector[Row]): Unit = {
     groupReduceFunction.reduce(input, out)
   }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/74e0971a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateMapFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateMapFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateMapFunction.scala
index d848d21..7559cec 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateMapFunction.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateMapFunction.scala
@@ -29,7 +29,8 @@ class AggregateMapFunction[IN, OUT](
     private val aggFields: Array[Int],
     private val groupingKeys: Array[Int],
     @transient private val returnType: TypeInformation[OUT])
-    extends RichMapFunction[IN, OUT] with ResultTypeQueryable[OUT] {
+  extends RichMapFunction[IN, OUT]
+  with ResultTypeQueryable[OUT] {
   
   private var output: Row = _
   

http://git-wip-us.apache.org/repos/asf/flink/blob/74e0971a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateReduceCombineFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateReduceCombineFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateReduceCombineFunction.scala
index ca074cc..ebf0ca7 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateReduceCombineFunction.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateReduceCombineFunction.scala
@@ -45,53 +45,13 @@ class AggregateReduceCombineFunction(
     private val aggregateMapping: Array[(Int, Int)],
     private val intermediateRowArity: Int,
     private val finalRowArity: Int)
-    extends RichGroupReduceFunction[Row, Row] with CombineFunction[Row, Row] {
-
-  private var aggregateBuffer: Row = _
-  private var output: Row = _
-
-  override def open(config: Configuration): Unit = {
-    Preconditions.checkNotNull(aggregates)
-    Preconditions.checkNotNull(groupKeysMapping)
-    aggregateBuffer = new Row(intermediateRowArity)
-    output = new Row(finalRowArity)
-  }
-
-  /**
-   * For grouped intermediate aggregate Rows, merge all of them into aggregate buffer,
-   * calculate aggregated values output by aggregate buffer, and set them into output
-   * Row based on the mapping relation between intermediate aggregate Row and output Row.
-   *
-   * @param records  Grouped intermediate aggregate Rows iterator.
-   * @param out The collector to hand results to.
-   *
-   */
-  override def reduce(records: Iterable[Row], out: Collector[Row]): Unit = {
-
-    // Initiate intermediate aggregate value.
-    aggregates.foreach(_.initiate(aggregateBuffer))
-
-    // Merge intermediate aggregate value to buffer.
-    var last: Row = null
-    records.foreach((record) => {
-      aggregates.foreach(_.merge(record, aggregateBuffer))
-      last = record
-    })
-
-    // Set group keys value to final output.
-    groupKeysMapping.foreach {
-      case (after, previous) =>
-        output.setField(after, last.productElement(previous))
-    }
-
-    // Evaluate final aggregate value and set to output.
-    aggregateMapping.foreach {
-      case (after, previous) =>
-        output.setField(after, aggregates(previous).evaluate(aggregateBuffer))
-    }
-
-    out.collect(output)
-  }
+  extends AggregateReduceGroupFunction(
+    aggregates,
+    groupKeysMapping,
+    aggregateMapping,
+    intermediateRowArity,
+    finalRowArity)
+  with CombineFunction[Row, Row] {
 
   /**
    * For sub-grouped intermediate aggregate Rows, merge all of them into aggregate buffer,

http://git-wip-us.apache.org/repos/asf/flink/blob/74e0971a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateReduceGroupFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateReduceGroupFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateReduceGroupFunction.scala
index d81f3a1..8f096cc 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateReduceGroupFunction.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateReduceGroupFunction.scala
@@ -42,9 +42,9 @@ class AggregateReduceGroupFunction(
     private val aggregateMapping: Array[(Int, Int)],
     private val intermediateRowArity: Int,
     private val finalRowArity: Int)
-    extends RichGroupReduceFunction[Row, Row] {
+  extends RichGroupReduceFunction[Row, Row] {
 
-  private var aggregateBuffer: Row = _
+  protected var aggregateBuffer: Row = _
   private var output: Row = _
 
   override def open(config: Configuration) {

http://git-wip-us.apache.org/repos/asf/flink/blob/74e0971a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateTimeWindowFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateTimeWindowFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateTimeWindowFunction.scala
index 80f52ca..9b7ea0b 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateTimeWindowFunction.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateTimeWindowFunction.scala
@@ -32,26 +32,26 @@ class AggregateTimeWindowFunction(
     groupReduceFunction: RichGroupReduceFunction[Row, Row],
     windowStartPos: Option[Int],
     windowEndPos: Option[Int])
-  extends RichWindowFunction[Row, Row, Tuple, TimeWindow] {
+  extends AggregateWindowFunction[TimeWindow](groupReduceFunction) {
 
   private var collector: TimeWindowPropertyCollector = _
 
   override def open(parameters: Configuration): Unit = {
-    groupReduceFunction.open(parameters)
     collector = new TimeWindowPropertyCollector(windowStartPos, windowEndPos)
+    super.open(parameters)
   }
 
   override def apply(
-      key: Tuple,
-      window: TimeWindow,
-      input: Iterable[Row],
-      out: Collector[Row]) : Unit = {
+    key: Tuple,
+    window: TimeWindow,
+    input: Iterable[Row],
+    out: Collector[Row]): Unit = {
 
     // set collector and window
     collector.wrappedCollector = out
     collector.timeWindow = window
 
     // call wrapped reduce function with property collector
-    groupReduceFunction.reduce(input, collector)
+    super.apply(key, window, input, collector)
   }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/74e0971a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateUtil.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateUtil.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateUtil.scala
index 903cc07..4428963 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateUtil.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateUtil.scala
@@ -27,9 +27,15 @@ import org.apache.calcite.sql.`type`.{SqlTypeFactoryImpl, SqlTypeName}
 import org.apache.calcite.sql.fun._
 import org.apache.flink.api.common.functions.{MapFunction, RichGroupReduceFunction}
 import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.tuple.Tuple
+import org.apache.flink.api.table.FlinkRelBuilder.NamedWindowProperty
+import org.apache.flink.api.table.expressions.{WindowEnd, WindowStart}
+import org.apache.flink.api.table.plan.logical._
 import org.apache.flink.api.table.typeutils.RowTypeInfo
+import org.apache.flink.api.table.typeutils.TypeCheckUtils._
 import org.apache.flink.api.table.{FlinkTypeFactory, Row, TableException}
-
+import org.apache.flink.streaming.api.functions.windowing.{AllWindowFunction, WindowFunction}
+import org.apache.flink.streaming.api.windowing.windows.{Window => DataStreamWindow}
 import scala.collection.JavaConversions._
 import scala.collection.mutable.ArrayBuffer
 
@@ -39,66 +45,76 @@ object AggregateUtil {
   type JavaList[T] = java.util.List[T]
 
   /**
-   * Create Flink operator functions for aggregates. It includes 2 implementations of Flink 
-   * operator functions:
-   * [[org.apache.flink.api.common.functions.MapFunction]] and 
-   * [[org.apache.flink.api.common.functions.GroupReduceFunction]](if it's partial aggregate,
-   * should also implement [[org.apache.flink.api.common.functions.CombineFunction]] as well). 
-   * The output of [[org.apache.flink.api.common.functions.MapFunction]] contains the 
-   * intermediate aggregate values of all aggregate function, it's stored in Row by the following
-   * format:
-   *
-   * {{{
-   *                   avg(x) aggOffsetInRow = 2          count(z) aggOffsetInRow = 5
-   *                             |                          |
-   *                             v                          v
-   *        +---------+---------+--------+--------+--------+--------+
-   *        |groupKey1|groupKey2|  sum1  | count1 |  sum2  | count2 |
-   *        +---------+---------+--------+--------+--------+--------+
-   *                                              ^
-   *                                              |
-   *                               sum(y) aggOffsetInRow = 4
-   * }}}
-   *
-   */
-  def createOperatorFunctionsForAggregates(
-      namedAggregates: Seq[CalcitePair[AggregateCall, String]],
-      inputType: RelDataType,
-      outputType: RelDataType,
-      groupings: Array[Int])
-    : (MapFunction[Any, Row], RichGroupReduceFunction[Row, Row]) = {
-
-    val aggregateFunctionsAndFieldIndexes =
-      transformToAggregateFunctions(namedAggregates.map(_.getKey), inputType, groupings.length)
-    // store the aggregate fields of each aggregate function, by the same order of aggregates.
-    val aggFieldIndexes = aggregateFunctionsAndFieldIndexes._1
-    val aggregates = aggregateFunctionsAndFieldIndexes._2
+    * Create a [[org.apache.flink.api.common.functions.MapFunction]] that prepares for aggregates.
+    * The function returns the intermediate aggregate values of all aggregate functions,
+    * organized in the following format:
+    *
+    * {{{
+    *                   avg(x) aggOffsetInRow = 2          count(z) aggOffsetInRow = 5
+    *                             |                          |
+    *                             v                          v
+    *        +---------+---------+--------+--------+--------+--------+
+    *        |groupKey1|groupKey2|  sum1  | count1 |  sum2  | count2 |
+    *        +---------+---------+--------+--------+--------+--------+
+    *                                              ^
+    *                                              |
+    *                               sum(y) aggOffsetInRow = 4
+    * }}}
+    *
+    */
+  private[flink] def createPrepareMapFunction(
+    namedAggregates: Seq[CalcitePair[AggregateCall, String]],
+    groupings: Array[Int],
+    inputType: RelDataType): MapFunction[Any, Row] = {
+
+    val (aggFieldIndexes,aggregates) = transformToAggregateFunctions(
+      namedAggregates.map(_.getKey),
+      inputType,
+      groupings.length)
 
     val mapReturnType: RowTypeInfo =
       createAggregateBufferDataType(groupings, aggregates, inputType)
 
     val mapFunction = new AggregateMapFunction[Row, Row](
-        aggregates, aggFieldIndexes, groupings,
-        mapReturnType.asInstanceOf[RowTypeInfo]).asInstanceOf[MapFunction[Any, Row]]
-
-    // the mapping relation between field index of intermediate aggregate Row and output Row.
-    val groupingOffsetMapping = getGroupKeysMapping(inputType, outputType, groupings)
-
-    // the mapping relation between aggregate function index in list and its corresponding
-    // field index in output Row.
-    val aggOffsetMapping = getAggregateMapping(namedAggregates, outputType)
-
-    if (groupingOffsetMapping.length != groupings.length ||
-        aggOffsetMapping.length != namedAggregates.length) {
-      throw new TableException("Could not find output field in input data type " +
-          "or aggregate functions.")
-    }
-
-    val allPartialAggregate = aggregates.map(_.supportPartial).forall(x => x)
+      aggregates,
+      aggFieldIndexes,
+      groupings,
+      mapReturnType.asInstanceOf[RowTypeInfo]).asInstanceOf[MapFunction[Any, Row]]
 
-    val intermediateRowArity = groupings.length + aggregates.map(_.intermediateDataType.length).sum
+    mapFunction
+  }
 
-    val reduceGroupFunction =
+  /**
+    * Create a [[org.apache.flink.api.common.functions.GroupReduceFunction]] to compute aggregates.
+    * If all aggregates support partial aggregation, the
+    * [[org.apache.flink.api.common.functions.GroupReduceFunction]] implements
+    * [[org.apache.flink.api.common.functions.CombineFunction]] as well.
+    *
+    */
+  private[flink] def createAggregateGroupReduceFunction(
+    namedAggregates: Seq[CalcitePair[AggregateCall, String]],
+    inputType: RelDataType,
+    outputType: RelDataType,
+    groupings: Array[Int]): RichGroupReduceFunction[Row, Row] = {
+
+    val aggregates = transformToAggregateFunctions(
+      namedAggregates.map(_.getKey),
+      inputType,
+      groupings.length)._2
+
+    val (groupingOffsetMapping, aggOffsetMapping) =
+      getGroupingOffsetAndAggOffsetMapping(
+        namedAggregates,
+        inputType,
+        outputType,
+        groupings)
+
+    val allPartialAggregate: Boolean = aggregates.forall(_.supportPartial)
+
+    val intermediateRowArity = groupings.length +
+      aggregates.map(_.intermediateDataType.length).sum
+
+    val groupReduceFunction =
       if (allPartialAggregate) {
         new AggregateReduceCombineFunction(
           aggregates,
@@ -115,14 +131,257 @@ object AggregateUtil {
           intermediateRowArity,
           outputType.getFieldCount)
       }
+    groupReduceFunction
+  }
+
+  /**
+    * Create a [[org.apache.flink.api.common.functions.ReduceFunction]] for incremental window
+    * aggregation.
+    *
+    */
+  private[flink] def createIncrementalAggregateReduceFunction(
+    namedAggregates: Seq[CalcitePair[AggregateCall, String]],
+    inputType: RelDataType,
+    outputType: RelDataType,
+    groupings: Array[Int]): IncrementalAggregateReduceFunction = {
+
+    val aggregates = transformToAggregateFunctions(
+      namedAggregates.map(_.getKey),inputType,groupings.length)._2
+
+    val groupingOffsetMapping =
+      getGroupingOffsetAndAggOffsetMapping(
+        namedAggregates,
+        inputType,
+        outputType,
+        groupings)._1
+
+    val intermediateRowArity = groupings.length + aggregates.map(_.intermediateDataType.length).sum
+    val reduceFunction = new IncrementalAggregateReduceFunction(
+      aggregates,
+      groupingOffsetMapping,
+      intermediateRowArity)
+    reduceFunction
+  }
+
+  /**
+    * Create an [[AllWindowFunction]] to compute non-partitioned group window aggregates.
+    */
+  private[flink] def createAllWindowAggregationFunction(
+    window: LogicalWindow,
+    namedAggregates: Seq[CalcitePair[AggregateCall, String]],
+    inputType: RelDataType,
+    outputType: RelDataType,
+    groupings: Array[Int],
+    properties: Seq[NamedWindowProperty])
+  : AllWindowFunction[Row, Row, DataStreamWindow] = {
+
+    val aggFunction =
+      createAggregateGroupReduceFunction(
+        namedAggregates,
+        inputType,
+        outputType,
+        groupings)
+
+    if (isTimeWindow(window)) {
+      val (startPos, endPos) = computeWindowStartEndPropertyPos(properties)
+      new AggregateAllTimeWindowFunction(aggFunction, startPos, endPos)
+      .asInstanceOf[AllWindowFunction[Row, Row, DataStreamWindow]]
+    } else {
+      new AggregateAllWindowFunction(aggFunction)
+    }
+  }
+
+  /**
+    * Create a [[WindowFunction]] to compute partitioned group window aggregates.
+    *
+    */
+  private[flink] def createWindowAggregationFunction(
+    window: LogicalWindow,
+    namedAggregates: Seq[CalcitePair[AggregateCall, String]],
+    inputType: RelDataType,
+    outputType: RelDataType,
+    groupings: Array[Int],
+    properties: Seq[NamedWindowProperty])
+  : WindowFunction[Row, Row, Tuple, DataStreamWindow] = {
+
+    val aggFunction =
+      createAggregateGroupReduceFunction(
+        namedAggregates,
+        inputType,
+        outputType,
+        groupings)
+
+    if (isTimeWindow(window)) {
+      val (startPos, endPos) = computeWindowStartEndPropertyPos(properties)
+      new AggregateTimeWindowFunction(aggFunction, startPos, endPos)
+      .asInstanceOf[WindowFunction[Row, Row, Tuple, DataStreamWindow]]
+    } else {
+      new AggregateWindowFunction(aggFunction)
+    }
+  }
 
-    (mapFunction, reduceGroupFunction)
+  /**
+    * Create an [[AllWindowFunction]] to finalize incrementally pre-computed non-partitioned
+    * window aggregates.
+    */
+  private[flink] def createAllWindowIncrementalAggregationFunction(
+    window: LogicalWindow,
+    namedAggregates: Seq[CalcitePair[AggregateCall, String]],
+    inputType: RelDataType,
+    outputType: RelDataType,
+    groupings: Array[Int],
+    properties: Seq[NamedWindowProperty]): AllWindowFunction[Row, Row, DataStreamWindow] = {
+
+    val aggregates = transformToAggregateFunctions(
+      namedAggregates.map(_.getKey),inputType,groupings.length)._2
+
+    val (groupingOffsetMapping, aggOffsetMapping) =
+      getGroupingOffsetAndAggOffsetMapping(
+      namedAggregates,
+      inputType,
+      outputType,
+      groupings)
+
+    val finalRowArity = outputType.getFieldCount
+
+    if (isTimeWindow(window)) {
+      val (startPos, endPos) = computeWindowStartEndPropertyPos(properties)
+      new IncrementalAggregateAllTimeWindowFunction(
+        aggregates,
+        groupingOffsetMapping,
+        aggOffsetMapping,
+        finalRowArity,
+        startPos,
+        endPos)
+      .asInstanceOf[AllWindowFunction[Row, Row, DataStreamWindow]]
+    } else {
+      new IncrementalAggregateAllWindowFunction(
+        aggregates,
+        groupingOffsetMapping,
+        aggOffsetMapping,
+        finalRowArity)
+    }
+  }
+
+  /**
+    * Create a [[WindowFunction]] to finalize incrementally pre-computed window aggregates.
+    */
+  private[flink] def createWindowIncrementalAggregationFunction(
+    window: LogicalWindow,
+    namedAggregates: Seq[CalcitePair[AggregateCall, String]],
+    inputType: RelDataType,
+    outputType: RelDataType,
+    groupings: Array[Int],
+    properties: Seq[NamedWindowProperty]): WindowFunction[Row, Row, Tuple, DataStreamWindow] = {
+
+    val aggregates = transformToAggregateFunctions(
+      namedAggregates.map(_.getKey),inputType,groupings.length)._2
+
+    val (groupingOffsetMapping, aggOffsetMapping) =
+      getGroupingOffsetAndAggOffsetMapping(
+        namedAggregates,
+        inputType,
+        outputType,
+        groupings)
+
+    val finalRowArity = outputType.getFieldCount
+
+    if (isTimeWindow(window)) {
+      val (startPos, endPos) = computeWindowStartEndPropertyPos(properties)
+      new IncrementalAggregateTimeWindowFunction(
+        aggregates,
+        groupingOffsetMapping,
+        aggOffsetMapping,
+        finalRowArity,
+        startPos,
+        endPos)
+      .asInstanceOf[WindowFunction[Row, Row, Tuple, DataStreamWindow]]
+    } else {
+      new IncrementalAggregateWindowFunction(
+        aggregates,
+        groupingOffsetMapping,
+        aggOffsetMapping,
+        finalRowArity)
+    }
+  }
+
+  /**
+    * Return true if all aggregates can be partially computed. False otherwise.
+    */
+  private[flink] def doAllSupportPartialAggregation(
+    aggregateCalls: Seq[AggregateCall],
+    inputType: RelDataType,
+    groupKeysCount: Int): Boolean = {
+    transformToAggregateFunctions(
+      aggregateCalls,
+      inputType,
+      groupKeysCount)._2.forall(_.supportPartial)
+  }
+
+  /**
+    * @return groupingOffsetMapping (mapping relation between field index of intermediate
+    *         aggregate Row and output Row.)
+    *         and aggOffsetMapping (the mapping relation between aggregate function index in list
+    *         and its corresponding field index in output Row.)
+    */
+  private def getGroupingOffsetAndAggOffsetMapping(
+    namedAggregates: Seq[CalcitePair[AggregateCall, String]],
+    inputType: RelDataType,
+    outputType: RelDataType,
+    groupings: Array[Int]): (Array[(Int, Int)], Array[(Int, Int)]) = {
+
+    // the mapping relation between field index of intermediate aggregate Row and output Row.
+    val groupingOffsetMapping = getGroupKeysMapping(inputType, outputType, groupings)
+
+    // the mapping relation between aggregate function index in list and its corresponding
+    // field index in output Row.
+    val aggOffsetMapping = getAggregateMapping(namedAggregates, outputType)
+
+    if (groupingOffsetMapping.length != groupings.length ||
+      aggOffsetMapping.length != namedAggregates.length) {
+      throw new TableException(
+        "Could not find output field in input data type " +
+          "or aggregate functions.")
+    }
+    (groupingOffsetMapping, aggOffsetMapping)
+  }
+
+  private def isTimeWindow(window: LogicalWindow) = {
+    window match {
+      case ProcessingTimeTumblingGroupWindow(_, size) => isTimeInterval(size.resultType)
+      case ProcessingTimeSlidingGroupWindow(_, size, _) => isTimeInterval(size.resultType)
+      case ProcessingTimeSessionGroupWindow(_, _) => true
+      case EventTimeTumblingGroupWindow(_, _, size) => isTimeInterval(size.resultType)
+      case EventTimeSlidingGroupWindow(_, _, size, _) => isTimeInterval(size.resultType)
+      case EventTimeSessionGroupWindow(_, _, _) => true
+    }
+  }
+
+  private def computeWindowStartEndPropertyPos(
+    properties: Seq[NamedWindowProperty]): (Option[Int], Option[Int]) = {
+
+    val propPos = properties.foldRight((None: Option[Int], None: Option[Int], 0)) {
+      (p, x) => p match {
+        case NamedWindowProperty(name, prop) =>
+          prop match {
+            case WindowStart(_) if x._1.isDefined =>
+              throw new TableException("Duplicate WindowStart property encountered. This is a bug.")
+            case WindowStart(_) =>
+              (Some(x._3), x._2, x._3 - 1)
+            case WindowEnd(_) if x._2.isDefined =>
+              throw new TableException("Duplicate WindowEnd property encountered. This is a bug.")
+            case WindowEnd(_) =>
+              (x._1, Some(x._3), x._3 - 1)
+          }
+      }
+    }
+    (propPos._1, propPos._2)
   }
 
   private def transformToAggregateFunctions(
-      aggregateCalls: Seq[AggregateCall],
-      inputType: RelDataType,
-      groupKeysCount: Int): (Array[Int], Array[Aggregate[_ <: Any]]) = {
+    aggregateCalls: Seq[AggregateCall],
+    inputType: RelDataType,
+    groupKeysCount: Int): (Array[Int], Array[Aggregate[_ <: Any]]) = {
 
     // store the aggregate fields of each aggregate function, by the same order of aggregates.
     val aggFieldIndexes = new Array[Int](aggregateCalls.size)
@@ -253,9 +512,9 @@ object AggregateUtil {
   }
 
   private def createAggregateBufferDataType(
-      groupings: Array[Int],
-      aggregates: Array[Aggregate[_]],
-      inputType: RelDataType): RowTypeInfo = {
+    groupings: Array[Int],
+    aggregates: Array[Aggregate[_]],
+    inputType: RelDataType): RowTypeInfo = {
 
     // get the field data types of group keys.
     val groupingTypes: Seq[TypeInformation[_]] = groupings
@@ -275,8 +534,9 @@ object AggregateUtil {
   }
 
   // Find the mapping between the index of aggregate list and aggregated value index in output Row.
-  private def getAggregateMapping(namedAggregates: Seq[CalcitePair[AggregateCall, String]],
-      outputType: RelDataType): Array[(Int, Int)] = {
+  private def getAggregateMapping(
+    namedAggregates: Seq[CalcitePair[AggregateCall, String]],
+    outputType: RelDataType): Array[(Int, Int)] = {
 
     // the mapping relation between aggregate function index in list and its corresponding
     // field index in output Row.
@@ -298,8 +558,10 @@ object AggregateUtil {
 
   // Find the mapping between the index of group key in intermediate aggregate Row and its index
   // in output Row.
-  private def getGroupKeysMapping(inputDatType: RelDataType,
-      outputType: RelDataType, groupKeys: Array[Int]): Array[(Int, Int)] = {
+  private def getGroupKeysMapping(
+    inputDatType: RelDataType,
+    outputType: RelDataType,
+    groupKeys: Array[Int]): Array[(Int, Int)] = {
 
     // the mapping relation between field index of intermediate aggregate Row and output Row.
     var groupingOffsetMapping = ArrayBuffer[(Int, Int)]()

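The refactored AggregateUtil now exposes separate factory methods for the prepare MapFunction
and the final GroupReduceFunction. As a rough illustration of how the two fit together for a
batch (DataSet) aggregation, here is a sketch only, not the actual DataSetAggregate translation.
It assumes that inputDS (a DataSet[Any] of input records), namedAggregates, inputType,
outputType, and groupings are already in scope; result type hints are omitted for brevity.

    val mapFunction = AggregateUtil.createPrepareMapFunction(
      namedAggregates, groupings, inputType)
    val reduceFunction = AggregateUtil.createAggregateGroupReduceFunction(
      namedAggregates, inputType, outputType, groupings)

    val result = inputDS
      .map(mapFunction)            // emit intermediate rows: group keys + partial aggregate fields
      .groupBy(groupings: _*)      // group on the key positions of the intermediate rows
      .reduceGroup(reduceFunction) // merge partial aggregates and evaluate the final values
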
http://git-wip-us.apache.org/repos/asf/flink/blob/74e0971a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateWindowFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateWindowFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateWindowFunction.scala
index 180248f..6fd890d 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateWindowFunction.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateWindowFunction.scala
@@ -28,18 +28,18 @@ import org.apache.flink.streaming.api.functions.windowing.RichWindowFunction
 import org.apache.flink.streaming.api.windowing.windows.Window
 import org.apache.flink.util.Collector
 
-class AggregateWindowFunction(groupReduceFunction: RichGroupReduceFunction[Row, Row])
-  extends RichWindowFunction[Row, Row, Tuple, Window] {
+class AggregateWindowFunction[W <: Window](groupReduceFunction: RichGroupReduceFunction[Row, Row])
+  extends RichWindowFunction[Row, Row, Tuple, W] {
 
   override def open(parameters: Configuration): Unit = {
     groupReduceFunction.open(parameters)
   }
 
   override def apply(
-      key: Tuple,
-      window: Window,
-      input: Iterable[Row],
-      out: Collector[Row]) : Unit = {
+    key: Tuple,
+    window: W,
+    input: Iterable[Row],
+    out: Collector[Row]): Unit = {
 
     groupReduceFunction.reduce(input, out)
   }

http://git-wip-us.apache.org/repos/asf/flink/blob/74e0971a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/IncrementalAggregateAllTimeWindowFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/IncrementalAggregateAllTimeWindowFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/IncrementalAggregateAllTimeWindowFunction.scala
new file mode 100644
index 0000000..85ad8e5
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/IncrementalAggregateAllTimeWindowFunction.scala
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.runtime.aggregate
+
+import java.lang.Iterable
+
+import org.apache.flink.api.table.Row
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.windowing.windows.{TimeWindow, Window}
+import org.apache.flink.util.Collector
+/**
+  *
+  * Computes the final aggregate value from incrementally computed aggregates.
+  *
+  * @param aggregates   The aggregate functions.
+  * @param groupKeysMapping The index mapping of group keys between intermediate aggregate Row
+  *                         and output Row.
+  * @param aggregateMapping The index mapping between aggregate function list and aggregated value
+  *                         index in output Row.
+  * @param finalRowArity  The arity of the final output row.
+  */
+class IncrementalAggregateAllTimeWindowFunction(
+    private val aggregates: Array[Aggregate[_ <: Any]],
+    private val groupKeysMapping: Array[(Int, Int)],
+    private val aggregateMapping: Array[(Int, Int)],
+    private val finalRowArity: Int,
+    private val windowStartPos: Option[Int],
+    private val windowEndPos: Option[Int])
+  extends IncrementalAggregateAllWindowFunction[TimeWindow](
+    aggregates,
+    groupKeysMapping,
+    aggregateMapping,
+    finalRowArity) {
+
+  private var collector: TimeWindowPropertyCollector = _
+
+  override def open(parameters: Configuration): Unit = {
+    collector = new TimeWindowPropertyCollector(windowStartPos, windowEndPos)
+    super.open(parameters)
+  }
+
+  override def apply(
+    window: TimeWindow,
+    records: Iterable[Row],
+    out: Collector[Row]): Unit = {
+
+    // set collector and window
+    collector.wrappedCollector = out
+    collector.timeWindow = window
+
+    super.apply(window, records, collector)
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/74e0971a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/IncrementalAggregateAllWindowFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/IncrementalAggregateAllWindowFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/IncrementalAggregateAllWindowFunction.scala
new file mode 100644
index 0000000..d3f871a
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/IncrementalAggregateAllWindowFunction.scala
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.runtime.aggregate
+
+import java.lang.Iterable
+
+import org.apache.flink.api.table.Row
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.windowing.RichAllWindowFunction
+import org.apache.flink.streaming.api.windowing.windows.Window
+import org.apache.flink.util.{Collector, Preconditions}
+
+/**
+  * Computes the final aggregate value from incrementally computed aggregates.
+  *
+  * @param aggregates   The aggregate functions.
+  * @param groupKeysMapping The index mapping of group keys between intermediate aggregate Row
+  *                         and output Row.
+  * @param aggregateMapping The index mapping between aggregate function list and aggregated value
+  *                         index in output Row.
+  * @param finalRowArity  The arity of the final output row.
+  */
+class IncrementalAggregateAllWindowFunction[W <: Window](
+    private val aggregates: Array[Aggregate[_ <: Any]],
+    private val groupKeysMapping: Array[(Int, Int)],
+    private val aggregateMapping: Array[(Int, Int)],
+    private val finalRowArity: Int)
+  extends RichAllWindowFunction[Row, Row, W] {
+
+  private var output: Row = _
+
+  override def open(parameters: Configuration): Unit = {
+    Preconditions.checkNotNull(aggregates)
+    Preconditions.checkNotNull(groupKeysMapping)
+    output = new Row(finalRowArity)
+  }
+
+  /**
+    * Computes the final aggregate values from the incrementally pre-aggregated Row and sets
+    * them in the output Row according to the mapping between intermediate and output fields.
+    */
+  override def apply(
+    window: W,
+    records: Iterable[Row],
+    out: Collector[Row]): Unit = {
+
+    val iterator = records.iterator
+
+    if (iterator.hasNext) {
+      val record = iterator.next()
+      // Set group keys value to final output.
+      groupKeysMapping.foreach {
+        case (after, previous) =>
+          output.setField(after, record.productElement(previous))
+      }
+      // Evaluate final aggregate value and set to output.
+      aggregateMapping.foreach {
+        case (after, previous) =>
+          output.setField(after, aggregates(previous).evaluate(record))
+      }
+      out.collect(output)
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/74e0971a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/IncrementalAggregateReduceFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/IncrementalAggregateReduceFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/IncrementalAggregateReduceFunction.scala
new file mode 100644
index 0000000..e2830da
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/IncrementalAggregateReduceFunction.scala
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.runtime.aggregate
+
+import org.apache.flink.api.common.functions.ReduceFunction
+import org.apache.flink.api.table.Row
+import org.apache.flink.util.Preconditions
+
+/**
+  * Incrementally computes group window aggregates.
+  *
+  * @param aggregates   The aggregate functions.
+  * @param groupKeysMapping The index mapping of group keys between intermediate aggregate Row
+  *                         and output Row.
+  */
+class IncrementalAggregateReduceFunction(
+    private val aggregates: Array[Aggregate[_]],
+    private val groupKeysMapping: Array[(Int, Int)],
+    private val intermediateRowArity: Int)
+  extends ReduceFunction[Row] {
+
+  Preconditions.checkNotNull(aggregates)
+  Preconditions.checkNotNull(groupKeysMapping)
+
+  /**
+    * For incremental intermediate aggregate Rows, merges value1 and value2
+    * into an accumulator Row and returns it.
+    *
+    * @param value1 The first value to be combined.
+    * @param value2 The second value to be combined.
+    * @return An accumulator Row that combines the two input values.
+    *
+    */
+  override def reduce(value1: Row, value2: Row): Row = {
+
+    // TODO: once FLINK-5105 is solved, we can avoid creating a new row for each invocation
+    //   and directly merge value1 and value2.
+    val accumulatorRow = new Row(intermediateRowArity)
+
+    // copy all fields of value1 into accumulatorRow
+    (0 until intermediateRowArity)
+    .foreach(i => accumulatorRow.setField(i, value1.productElement(i)))
+    // merge value2 to accumulatorRow
+    aggregates.foreach(_.merge(value2, accumulatorRow))
+
+    accumulatorRow
+  }
+}

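To make the merge semantics of the new ReduceFunction concrete, the following is a small
illustrative sketch. The | groupKey | sum | count | layout is an assumption for a single AVG
aggregate (intermediateRowArity = 3) and is not taken from the commit.

    import org.apache.flink.api.table.Row

    // two intermediate rows of the same window pane: | groupKey | sum | count |
    val value1 = new Row(3)
    value1.setField(0, "a"); value1.setField(1, 10L); value1.setField(2, 2L)

    val value2 = new Row(3)
    value2.setField(0, "a"); value2.setField(1, 5L); value2.setField(2, 1L)

    // reduce(value1, value2) copies all fields of value1 into a fresh accumulator row and
    // then merges value2 into it, yielding | "a" | 15L | 3L | as the new intermediate row.

Once FLINK-5105 is resolved, the extra accumulator Row per invocation can be avoided, as the
TODO in the reduce method notes.
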
http://git-wip-us.apache.org/repos/asf/flink/blob/74e0971a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/IncrementalAggregateTimeWindowFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/IncrementalAggregateTimeWindowFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/IncrementalAggregateTimeWindowFunction.scala
new file mode 100644
index 0000000..c880f87
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/IncrementalAggregateTimeWindowFunction.scala
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.runtime.aggregate
+
+import java.lang.Iterable
+
+import org.apache.flink.api.java.tuple.Tuple
+import org.apache.flink.api.table.Row
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow
+import org.apache.flink.util.Collector
+
+/**
+  * Computes the final aggregate value from incrementally computed aggregates.
+  *
+  * @param aggregates   The aggregate functions.
+  * @param groupKeysMapping The index mapping of group keys between intermediate aggregate Row
+  *                         and output Row.
+  * @param aggregateMapping The index mapping between aggregate function list and aggregated value
+  *                         index in output Row.
+  * @param finalRowArity  The arity of the final output row.
+  */
+class IncrementalAggregateTimeWindowFunction(
+    private val aggregates: Array[Aggregate[_ <: Any]],
+    private val groupKeysMapping: Array[(Int, Int)],
+    private val aggregateMapping: Array[(Int, Int)],
+    private val finalRowArity: Int,
+    private val windowStartPos: Option[Int],
+    private val windowEndPos: Option[Int])
+  extends IncrementalAggregateWindowFunction[TimeWindow](
+    aggregates,
+    groupKeysMapping,
+    aggregateMapping, finalRowArity) {
+
+  private var collector: TimeWindowPropertyCollector = _
+
+  override def open(parameters: Configuration): Unit = {
+    collector = new TimeWindowPropertyCollector(windowStartPos, windowEndPos)
+    super.open(parameters)
+  }
+
+  override def apply(
+    key: Tuple,
+    window: TimeWindow,
+    records: Iterable[Row],
+    out: Collector[Row]): Unit = {
+
+    // set collector and window
+    collector.wrappedCollector = out
+    collector.timeWindow = window
+
+    super.apply(key, window, records, collector)
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/74e0971a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/IncrementalAggregateWindowFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/IncrementalAggregateWindowFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/IncrementalAggregateWindowFunction.scala
new file mode 100644
index 0000000..81e6890
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/IncrementalAggregateWindowFunction.scala
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.runtime.aggregate
+
+import java.lang.Iterable
+
+import org.apache.flink.api.java.tuple.Tuple
+import org.apache.flink.api.table.Row
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.windowing.RichWindowFunction
+import org.apache.flink.streaming.api.windowing.windows.Window
+import org.apache.flink.util.{Collector, Preconditions}
+
+/**
+  * Computes the final aggregate value from incrementally computed aggregates.
+  *
+  * @param aggregates   The aggregate functions.
+  * @param groupKeysMapping The index mapping of group keys between intermediate aggregate Row
+  *                         and output Row.
+  * @param aggregateMapping The index mapping between aggregate function list and aggregated value
+  *                         index in output Row.
+  * @param finalRowArity  The arity of the final output row.
+  */
+class IncrementalAggregateWindowFunction[W <: Window](
+    private val aggregates: Array[Aggregate[_ <: Any]],
+    private val groupKeysMapping: Array[(Int, Int)],
+    private val aggregateMapping: Array[(Int, Int)],
+    private val finalRowArity: Int)
+  extends RichWindowFunction[Row, Row, Tuple, W] {
+
+  private var output: Row = _
+
+  override def open(parameters: Configuration): Unit = {
+    Preconditions.checkNotNull(aggregates)
+    Preconditions.checkNotNull(groupKeysMapping)
+    output = new Row(finalRowArity)
+  }
+
+  /**
+    * Computes the final aggregate values from the incrementally pre-aggregated Row and sets
+    * them in the output Row according to the mapping between intermediate and output fields.
+    */
+  override def apply(
+    key: Tuple,
+    window: W,
+    records: Iterable[Row],
+    out: Collector[Row]): Unit = {
+
+    val iterator = records.iterator
+
+    if (iterator.hasNext) {
+      val record = iterator.next()
+      // Set group keys value to final output.
+      groupKeysMapping.foreach {
+        case (after, previous) =>
+          output.setField(after, record.productElement(previous))
+      }
+      // Evaluate final aggregate value and set to output.
+      aggregateMapping.foreach {
+        case (after, previous) =>
+          output.setField(after, aggregates(previous).evaluate(record))
+      }
+      out.collect(output)
+    }
+  }
+}

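Taken together, the incremental path pairs the ReduceFunction above with one of the new window
functions. Below is a minimal sketch of how the two could be composed on a keyed, windowed
stream. It assumes the WindowedStream#apply(preAggregator, windowFunction) overload, a
windowedStream of type WindowedStream[Row, Tuple, DataStreamWindow], and the factory-method
arguments in scope; the actual wiring lives in the DataStream translation and may differ.

    val reduceFunction = AggregateUtil.createIncrementalAggregateReduceFunction(
      namedAggregates, inputType, outputType, groupings)

    val windowFunction = AggregateUtil.createWindowIncrementalAggregationFunction(
      window, namedAggregates, inputType, outputType, groupings, properties)

    // pre-aggregate incrementally per pane, then evaluate the final values per window
    val result = windowedStream.apply(reduceFunction, windowFunction)
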
http://git-wip-us.apache.org/repos/asf/flink/blob/74e0971a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/table/AggregationsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/table/AggregationsITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/table/AggregationsITCase.scala
index 2ccbb38..0753484 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/table/AggregationsITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/table/AggregationsITCase.scala
@@ -59,13 +59,13 @@ class AggregationsITCase extends StreamingMultipleProgramsTestBase {
     val windowedTable = table
       .groupBy('string)
       .window(Slide over 2.rows every 1.rows)
-      .select('string, 'int.count)
+      .select('string, 'int.count, 'int.avg)
 
     val results = windowedTable.toDataStream[Row]
     results.addSink(new StreamITCase.StringSink)
     env.execute()
 
-    val expected = Seq("Hello world,1", "Hello world,2", "Hello,1", "Hello,2", "Hi,1")
+    val expected = Seq("Hello world,1,3", "Hello world,2,3", "Hello,1,2", "Hello,2,2", "Hi,1,1")
     assertEquals(expected.sorted, StreamITCase.testResults.sorted)
   }
 
@@ -131,17 +131,17 @@ class AggregationsITCase extends StreamingMultipleProgramsTestBase {
     val windowedTable = table
       .groupBy('string)
       .window(Tumble over 5.milli on 'rowtime as 'w)
-      .select('string, 'int.count, 'w.start, 'w.end)
+      .select('string, 'int.count, 'int.avg, 'w.start, 'w.end)
 
     val results = windowedTable.toDataStream[Row]
     results.addSink(new StreamITCase.StringSink)
     env.execute()
 
     val expected = Seq(
-      "Hello world,1,1970-01-01 00:00:00.005,1970-01-01 00:00:00.01",
-      "Hello world,1,1970-01-01 00:00:00.015,1970-01-01 00:00:00.02",
-      "Hello,2,1970-01-01 00:00:00.0,1970-01-01 00:00:00.005",
-      "Hi,1,1970-01-01 00:00:00.0,1970-01-01 00:00:00.005")
+      "Hello world,1,3,1970-01-01 00:00:00.005,1970-01-01 00:00:00.01",
+      "Hello world,1,3,1970-01-01 00:00:00.015,1970-01-01 00:00:00.02",
+      "Hello,2,2,1970-01-01 00:00:00.0,1970-01-01 00:00:00.005",
+      "Hi,1,1,1970-01-01 00:00:00.0,1970-01-01 00:00:00.005")
     assertEquals(expected.sorted, StreamITCase.testResults.sorted)
   }
 


[4/5] flink git commit: [hotfix][docs] Stream joins don't support tuple position keys

Posted by fh...@apache.org.
[hotfix][docs] Stream joins don't support tuple position keys


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3458a661
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3458a661
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3458a661

Branch: refs/heads/master
Commit: 3458a6615cf1db66f32650bbaae2841b570fa647
Parents: 74e0971
Author: Robert Metzger <rm...@apache.org>
Authored: Fri Nov 4 15:41:06 2016 +0100
Committer: Fabian Hueske <fh...@apache.org>
Committed: Wed Nov 23 18:35:44 2016 +0100

----------------------------------------------------------------------
 docs/dev/datastream_api.md | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/3458a661/docs/dev/datastream_api.md
----------------------------------------------------------------------
diff --git a/docs/dev/datastream_api.md b/docs/dev/datastream_api.md
index 4c81d63..57d2e4b 100644
--- a/docs/dev/datastream_api.md
+++ b/docs/dev/datastream_api.md
@@ -401,7 +401,7 @@ dataStream.union(otherStream1, otherStream2, ...);
             <p>Join two data streams on a given key and a common window.</p>
     {% highlight java %}
 dataStream.join(otherStream)
-    .where(0).equalTo(1)
+    .where(<key selector>).equalTo(<key selector>)
     .window(TumblingEventTimeWindows.of(Time.seconds(3)))
     .apply (new JoinFunction () {...});
     {% endhighlight %}
@@ -750,7 +750,7 @@ dataStream.union(otherStream1, otherStream2, ...)
             <p>Join two data streams on a given key and a common window.</p>
     {% highlight scala %}
 dataStream.join(otherStream)
-    .where(0).equalTo(1)
+    .where(<key selector>).equalTo(<key selector>)
     .window(TumblingEventTimeWindows.of(Time.seconds(3)))
     .apply { ... }
     {% endhighlight %}
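
For reference, a minimal Scala sketch of the key-selector form that the updated documentation
now shows; the stream names and element types are made up for illustration.

    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
    import org.apache.flink.streaming.api.windowing.time.Time

    val grades: DataStream[(String, Int)] = ???       // e.g. (name, grade)
    val salaries: DataStream[(String, Double)] = ???  // e.g. (name, salary)

    val joined: DataStream[(String, Int, Double)] = grades.join(salaries)
      .where(_._1)      // key selector on the first input
      .equalTo(_._1)    // key selector on the second input
      .window(TumblingEventTimeWindows.of(Time.seconds(3)))
      .apply { (g, s) => (g._1, g._2, s._2) }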