You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2017/05/03 12:45:05 UTC

[2/2] flink git commit: [FLINK-4604] [table] Add support for standard deviation/variance

[FLINK-4604] [table] Add support for standard deviation/variance

This closes #3260.

Old PR: This closes #2762.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0af57fc1
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0af57fc1
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0af57fc1

Branch: refs/heads/master
Commit: 0af57fc1800d7430843a1e14bb70168bbd750389
Parents: 6d0c4c3
Author: Anton Mushin <an...@epam.com>
Authored: Fri Feb 3 14:06:49 2017 +0400
Committer: twalthr <tw...@apache.org>
Committed: Wed May 3 14:43:06 2017 +0200

----------------------------------------------------------------------
 docs/dev/table_api.md                           |  47 ++++-
 .../flink/table/api/scala/expressionDsl.scala   |  28 +++
 .../table/expressions/ExpressionParser.scala    |  43 +++-
 .../flink/table/expressions/aggregations.scala  |  65 ++++++
 .../aggfunctions/Sum0AggFunction.scala          |  91 +++++++++
 .../flink/table/plan/rules/FlinkRuleSets.scala  |   3 +
 .../rules/dataSet/DataSetAggregateRule.scala    |   8 +-
 .../DataSetAggregateWithNullValuesRule.scala    |   9 +-
 .../flink/table/validate/FunctionCatalog.scala  |  10 +
 .../table/api/java/batch/sql/SqlITCase.java     |  53 ++++-
 .../scala/batch/sql/AggregationsITCase.scala    | 200 +++++++++++++++++++
 .../scala/batch/table/AggregationsITCase.scala  |  74 +++++++
 .../batch/utils/TableProgramsTestBase.scala     |   5 +
 13 files changed, 625 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/0af57fc1/docs/dev/table_api.md
----------------------------------------------------------------------
diff --git a/docs/dev/table_api.md b/docs/dev/table_api.md
index 2b777c6..ec32e9b 100644
--- a/docs/dev/table_api.md
+++ b/docs/dev/table_api.md
@@ -1066,7 +1066,7 @@ dataType = "BYTE" | "SHORT" | "INT" | "LONG" | "FLOAT" | "DOUBLE" | "BOOLEAN" |
 
 as = composite , ".as(" , fieldReference , ")" ;
 
-aggregation = composite , ( ".sum" | ".min" | ".max" | ".count" | ".avg" | ".start" | ".end" ) , [ "()" ] ;
+aggregation = composite , ( ".sum" | ".min" | ".max" | ".count" | ".avg" | ".start" | ".end" | ".stddev_pop" | ".stddev_samp" | ".var_pop" | ".var_samp" ) , [ "()" ] ;
 
 if = composite , ".?(" , expression , "," , expression , ")" ;
 
@@ -5233,7 +5233,7 @@ AVG(numeric)
         <p>Returns the average (arithmetic mean) of <i>numeric</i> across all input values.</p>
       </td>
     </tr>
-
+    
     <tr>
       <td>
         {% highlight text %}
@@ -5376,6 +5376,49 @@ ELEMENT(ARRAY)
         <p>Returns the sole element of an array with a single element. Returns <code>null</code> if the array is empty. Throws an exception if the array has more than one element.</p>
       </td>
     </tr>
+<tr>
+      <td>
+        {% highlight text %}
+STDDEV_POP(value)
+{% endhighlight %}
+      </td>
+      <td>
+        <p>Returns the population standard deviation of the numeric <i>value</i> across all input values.</p>
+      </td>
+    </tr>
+    
+<tr>
+      <td>
+        {% highlight text %}
+STDDEV_SAMP(value)
+{% endhighlight %}
+      </td>
+      <td>
+        <p>Returns the sample standard deviation of the numeric <i>value</i> across all input values.</p>
+      </td>
+    </tr>
+
+    <tr>
+      <td>
+        {% highlight text %}
+VAR_POP(value)
+{% endhighlight %}
+      </td>
+      <td>
+        <p>Returns the population variance of the numeric <i>value</i> across all input values.</p>
+      </td>
+    </tr>
+
+    <tr>
+      <td>
+        {% highlight text %}
+VAR_SAMP(value)
+{% endhighlight %}
+      </td>
+      <td>
+        <p>Returns the sample variance of the numeric <i>value</i> across all input values.</p>
+      </td>
+    </tr>
   </tbody>
 </table>
 

http://git-wip-us.apache.org/repos/asf/flink/blob/0af57fc1/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala
index a512098..b1d16b3 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala
@@ -165,6 +165,12 @@ trait ImplicitExpressionOperations {
   def sum = Sum(expr)
 
   /**
+    * Returns the sum of the values like [[Sum]], except that zero is returned instead of
+    * null when no non-null values have been aggregated.
+    */
+  def sum0 = Sum0(expr)
+  
+  /**
     * Returns the minimum value of field across all input values.
     */
   def min = Min(expr)
@@ -185,6 +191,28 @@ trait ImplicitExpressionOperations {
   def avg = Avg(expr)
 
   /**
+    * Returns the population standard deviation of an expression.
+    * (the square root of [[VarPop]])
+    */
+  def stddev_pop = StddevPop(expr)
+
+  /**
+    * Returns the sample standard deviation of an expression.
+    * (the square root of [[VarSamp]]).
+    */
+  def stddev_samp = StddevSamp(expr)
+
+  /**
+    * Returns the population variance of an expression.
+    */
+  def var_pop = VarPop(expr)
+
+  /**
+    * Returns the sample variance of a given expression.
+    */
+  def var_samp = VarSamp(expr)
+
+  /**
     * Converts a value to a given type.
     *
     * e.g. "42".cast(Types.INT) leads to 42.

http://git-wip-us.apache.org/repos/asf/flink/blob/0af57fc1/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/ExpressionParser.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/ExpressionParser.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/ExpressionParser.scala
index 113b85a..1356416 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/ExpressionParser.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/ExpressionParser.scala
@@ -57,6 +57,11 @@ object ExpressionParser extends JavaTokenParsers with PackratParsers {
   lazy val SUM: Keyword = Keyword("sum")
   lazy val START: Keyword = Keyword("start")
   lazy val END: Keyword = Keyword("end")
+  lazy val SUM0: Keyword = Keyword("sum0")
+  lazy val STDDEV_POP: Keyword = Keyword("stddev_pop")
+  lazy val STDDEV_SAMP: Keyword = Keyword("stddev_samp")
+  lazy val VAR_POP: Keyword = Keyword("var_pop")
+  lazy val VAR_SAMP: Keyword = Keyword("var_samp")
   lazy val CAST: Keyword = Keyword("cast")
   lazy val NULL: Keyword = Keyword("Null")
   lazy val IF: Keyword = Keyword("?")
@@ -95,7 +100,8 @@ object ExpressionParser extends JavaTokenParsers with PackratParsers {
   lazy val ASIN: Keyword = Keyword("asin")
 
   def functionIdent: ExpressionParser.Parser[String] =
-    not(ARRAY) ~ not(AS) ~ not(COUNT) ~ not(AVG) ~ not(MIN) ~ not(MAX) ~
+    not(ARRAY) ~ not(AS) ~ not(COUNT) ~ not(AVG) ~ not(MIN) ~ not(MAX) ~ not(STDDEV_POP) ~
+      not(STDDEV_SAMP) ~ not(VAR_SAMP) ~ not(VAR_POP) ~
       not(SUM) ~ not(START) ~ not(END)~ not(CAST) ~ not(NULL) ~ not(IF) ~
       not(CURRENT_ROW) ~ not(UNBOUNDED_ROW) ~ not(CURRENT_RANGE) ~ not(UNBOUNDED_RANGE) ~>
       super.ident
@@ -205,6 +211,9 @@ object ExpressionParser extends JavaTokenParsers with PackratParsers {
   lazy val suffixSum: PackratParser[Expression] =
     composite <~ "." ~ SUM ~ opt("()") ^^ { e => Sum(e) }
 
+  lazy val suffixSum0: PackratParser[Expression] =
+    composite <~ "." ~ SUM0 ~ opt("()") ^^ { e => Sum0(e) }
+
   lazy val suffixMin: PackratParser[Expression] =
     composite <~ "." ~ MIN ~ opt("()") ^^ { e => Min(e) }
 
@@ -223,6 +232,17 @@ object ExpressionParser extends JavaTokenParsers with PackratParsers {
   lazy val suffixEnd: PackratParser[Expression] =
     composite <~ "." ~ END ~ opt("()") ^^ { e => WindowEnd(e) }
 
+  lazy val suffixStddevPop: PackratParser[Expression] =
+    composite <~ "." ~ STDDEV_POP ~ opt("()") ^^ { e => StddevPop(e) }
+
+  lazy val suffixStddevSamp: PackratParser[Expression] =
+    composite <~ "." ~ STDDEV_SAMP ~ opt("()") ^^ { e => StddevSamp(e) }
+
+  lazy val suffixVarSamp: PackratParser[Expression] =
+    composite <~ "." ~ VAR_SAMP ~ opt("()") ^^ { e => VarSamp(e) }
+
+  lazy val suffixVarPop: PackratParser[Expression] =
+    composite <~ "." ~ VAR_POP ~ opt("()") ^^ { e => VarPop(e) }
   lazy val suffixCast: PackratParser[Expression] =
     composite ~ "." ~ CAST ~ "(" ~ dataType ~ ")" ^^ {
     case e ~ _ ~ _ ~ _ ~ dt ~ _ => Cast(e, dt)
@@ -320,8 +340,9 @@ object ExpressionParser extends JavaTokenParsers with PackratParsers {
     composite <~ "." ~ ASIN ~ opt("()") ^^ { e => Asin(e) }
 
   lazy val suffixed: PackratParser[Expression] =
-    suffixTimeInterval | suffixRowInterval | suffixStart | suffixEnd | suffixAgg |
+    suffixTimeInterval | suffixRowInterval | suffixStart | suffixSum0 | suffixEnd | suffixAgg |
       suffixCast | suffixAs | suffixTrim | suffixTrimWithoutArgs | suffixIf | suffixAsc |
+      suffixStddevPop | suffixStddevSamp | suffixVarPop | suffixVarSamp |
       suffixDesc | suffixToDate | suffixToTimestamp | suffixToTime | suffixExtract |
       suffixFloor | suffixCeil | suffixGet | suffixFlattening | suffixAsin |
       suffixFunctionCall | suffixFunctionCallOneArg // function call must always be at the end
@@ -334,6 +355,9 @@ object ExpressionParser extends JavaTokenParsers with PackratParsers {
   lazy val prefixSum: PackratParser[Expression] =
     SUM ~ "(" ~> expression <~ ")" ^^ { e => Sum(e) }
 
+  lazy val prefixSum0: PackratParser[Expression] =
+    SUM0 ~ "(" ~> expression <~ ")" ^^ { e => Sum0(e) }
+
   lazy val prefixMin: PackratParser[Expression] =
     MIN ~ "(" ~> expression <~ ")" ^^ { e => Min(e) }
 
@@ -352,6 +376,17 @@ object ExpressionParser extends JavaTokenParsers with PackratParsers {
   lazy val prefixEnd: PackratParser[Expression] =
     END ~ "(" ~> expression <~ ")" ^^ { e => WindowEnd(e) }
 
+  lazy val prefixStddevPop: PackratParser[Expression] =
+    STDDEV_POP ~ "(" ~> expression <~ ")" ^^ { e => StddevPop(e) }
+
+  lazy val prefixStddevSamp: PackratParser[Expression] =
+    STDDEV_SAMP ~ "(" ~> expression <~ ")" ^^ { e => StddevSamp(e) }
+
+  lazy val prefixVarSamp: PackratParser[Expression] =
+    VAR_SAMP ~ "(" ~> expression <~ ")" ^^ { e => VarSamp(e) }
+
+  lazy val prefixVarPop: PackratParser[Expression] =
+    VAR_POP ~ "(" ~> expression <~ ")" ^^ { e => VarPop(e) }
   lazy val prefixCast: PackratParser[Expression] =
     CAST ~ "(" ~ expression ~ "," ~ dataType ~ ")" ^^ {
     case _ ~ _ ~ e ~ _ ~ dt ~ _ => Cast(e, dt)
@@ -411,7 +446,9 @@ object ExpressionParser extends JavaTokenParsers with PackratParsers {
     ASIN ~ "(" ~> composite <~ ")" ^^ { e => Asin(e) }
 
   lazy val prefixed: PackratParser[Expression] =
-    prefixArray | prefixAgg | prefixStart | prefixEnd | prefixCast | prefixAs | prefixTrim |
+    prefixArray | prefixSum | prefixSum0 | prefixAgg | prefixStart | prefixEnd |
+      prefixCast | prefixAs | prefixTrim | prefixStddevPop | prefixStddevSamp | prefixVarSamp |
+      prefixVarPop |
       prefixTrimWithoutArgs | prefixIf | prefixExtract | prefixFloor | prefixCeil | prefixGet |
       prefixFlattening | prefixAsin | prefixFunctionCall |
       prefixFunctionCallOneArg // function call must always be at the end

http://git-wip-us.apache.org/repos/asf/flink/blob/0af57fc1/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/aggregations.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/aggregations.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/aggregations.scala
index 4ef5209..82f4428 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/aggregations.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/aggregations.scala
@@ -66,6 +66,19 @@ case class Sum(child: Expression) extends Aggregation {
   }
 }
 
+case class Sum0(child: Expression) extends Aggregation {
+  override def toString = s"sum0($child)"
+
+  override private[flink] def toAggCall(name: String)(implicit relBuilder: RelBuilder): AggCall = {
+    relBuilder.aggregateCall(SqlStdOperatorTable.SUM0, false, null, name, child.toRexNode)
+  }
+
+  override private[flink] def resultType = child.resultType
+
+  override private[flink] def validateInput =
+    TypeCheckUtils.assertNumericExpr(child.resultType, "sum0")
+}
+
 case class Min(child: Expression) extends Aggregation {
   override def toString = s"min($child)"
 
@@ -130,3 +143,55 @@ case class Avg(child: Expression) extends Aggregation {
     new SqlAvgAggFunction(AVG)
   }
 }
+
+case class StddevPop(child: Expression) extends Aggregation {
+  override def toString = s"stddev_pop($child)"
+
+  override private[flink] def toAggCall(name: String)(implicit relBuilder: RelBuilder): AggCall = {
+    relBuilder.aggregateCall(SqlStdOperatorTable.STDDEV_POP, false, null, name, child.toRexNode)
+  }
+
+  override private[flink] def resultType = child.resultType
+
+  override private[flink] def validateInput =
+    TypeCheckUtils.assertNumericExpr(child.resultType, "stddev_pop")
+}
+
+case class StddevSamp(child: Expression) extends Aggregation {
+  override def toString = s"stddev_samp($child)"
+
+  override private[flink] def toAggCall(name: String)(implicit relBuilder: RelBuilder): AggCall = {
+    relBuilder.aggregateCall(SqlStdOperatorTable.STDDEV_SAMP, false, null, name, child.toRexNode)
+  }
+
+  override private[flink] def resultType = child.resultType
+
+  override private[flink] def validateInput =
+    TypeCheckUtils.assertNumericExpr(child.resultType, "stddev_samp")
+}
+
+case class VarPop(child: Expression) extends Aggregation {
+  override def toString = s"var_pop($child)"
+
+  override private[flink] def toAggCall(name: String)(implicit relBuilder: RelBuilder): AggCall = {
+    relBuilder.aggregateCall(SqlStdOperatorTable.VAR_POP, false, null, name, child.toRexNode)
+  }
+
+  override private[flink] def resultType = child.resultType
+
+  override private[flink] def validateInput =
+    TypeCheckUtils.assertNumericExpr(child.resultType, "var_pop")
+}
+
+case class VarSamp(child: Expression) extends Aggregation {
+  override def toString = s"var_samp($child)"
+
+  override private[flink] def toAggCall(name: String)(implicit relBuilder: RelBuilder): AggCall = {
+    relBuilder.aggregateCall(SqlStdOperatorTable.VAR_SAMP, false, null, name, child.toRexNode)
+  }
+
+  override private[flink] def resultType = child.resultType
+
+  override private[flink] def validateInput =
+    TypeCheckUtils.assertNumericExpr(child.resultType, "var_samp")
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0af57fc1/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/Sum0AggFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/Sum0AggFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/Sum0AggFunction.scala
new file mode 100644
index 0000000..6a24fbe
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/Sum0AggFunction.scala
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.functions.aggfunctions
+
+import java.math.BigDecimal
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo
+import org.apache.flink.table.functions.Accumulator
+
+/** Sum aggregate that yields the zero of `T` when the accumulator flag `f1` is false. */
+abstract class Sum0AggFunction[T: Numeric] extends SumAggFunction[T] {
+
+  override def getValue(accumulator: Accumulator): T = {
+    val acc = accumulator.asInstanceOf[SumAccumulator[T]]
+    // NOTE(review): f1 presumably marks that a value was accumulated -- confirm.
+    // `0.asInstanceOf[T]` is erased to a boxed Int, i.e. the wrong runtime type
+    // for every T except Int, so take the typed zero from the Numeric instance.
+    if (acc.f1) acc.f0 else implicitly[Numeric[T]].zero
+  }
+}
+
+/**
+  * Built-in Byte Sum0 aggregate function
+  */
+class ByteSum0AggFunction extends Sum0AggFunction[Byte] {
+  override def getValueTypeInfo = BasicTypeInfo.BYTE_TYPE_INFO
+}
+
+/**
+  * Built-in Short Sum0 aggregate function
+  */
+class ShortSum0AggFunction extends Sum0AggFunction[Short] {
+  override def getValueTypeInfo = BasicTypeInfo.SHORT_TYPE_INFO
+}
+
+/**
+  * Built-in Int Sum0 aggregate function
+  */
+class IntSum0AggFunction extends Sum0AggFunction[Int] {
+  override def getValueTypeInfo = BasicTypeInfo.INT_TYPE_INFO
+}
+
+/**
+  * Built-in Long Sum0 aggregate function
+  */
+class LongSum0AggFunction extends Sum0AggFunction[Long] {
+  override def getValueTypeInfo = BasicTypeInfo.LONG_TYPE_INFO
+}
+
+/**
+  * Built-in Float Sum0 aggregate function
+  */
+class FloatSum0AggFunction extends Sum0AggFunction[Float] {
+  override def getValueTypeInfo = BasicTypeInfo.FLOAT_TYPE_INFO
+}
+
+/**
+  * Built-in Double Sum0 aggregate function
+  */
+class DoubleSum0AggFunction extends Sum0AggFunction[Double] {
+  override def getValueTypeInfo = BasicTypeInfo.DOUBLE_TYPE_INFO
+}
+
+/**
+  * Built-in Big Decimal Sum0 aggregate function: returns `BigDecimal.ZERO`
+  * when the accumulator flag `f1` is false, otherwise the accumulated `f0`.
+  */
+class DecimalSum0AggFunction extends DecimalSumAggFunction {
+
+  override def getValue(accumulator: Accumulator): BigDecimal = {
+    val acc = accumulator.asInstanceOf[DecimalSumAccumulator]
+    // Casting the Int literal 0 to BigDecimal throws a ClassCastException at
+    // runtime, so return the BigDecimal zero constant when f1 is false.
+    if (acc.f1) acc.f0 else BigDecimal.ZERO
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0af57fc1/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
index 0bee4e5..6ebbae4 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
@@ -74,6 +74,9 @@ object FlinkRuleSets {
     // expand distinct aggregate to normal aggregate with groupby
     AggregateExpandDistinctAggregatesRule.JOIN,
 
+    // reduce aggregate functions (standard deviation/variance) to simpler ones
+    AggregateReduceFunctionsRule.INSTANCE,
+
     // remove unnecessary sort rule
     SortRemoveRule.INSTANCE,
 

http://git-wip-us.apache.org/repos/asf/flink/blob/0af57fc1/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetAggregateRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetAggregateRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetAggregateRule.scala
index b4d5bc9..faaeb97 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetAggregateRule.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetAggregateRule.scala
@@ -21,6 +21,7 @@ package org.apache.flink.table.plan.rules.dataSet
 import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelTraitSet}
 import org.apache.calcite.rel.RelNode
 import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.sql.SqlKind
 import org.apache.flink.table.plan.nodes.FlinkConventions
 import org.apache.flink.table.plan.nodes.dataset.{DataSetAggregate, DataSetUnion}
 import org.apache.flink.table.plan.nodes.logical.FlinkLogicalAggregate
@@ -54,7 +55,12 @@ class DataSetAggregateRule
     // check if we have distinct aggregates
     val distinctAggs = agg.getAggCallList.exists(_.isDistinct)
 
-    !distinctAggs
+    val supported = agg.getAggCallList.map(_.getAggregation.getKind).forall {
+      case SqlKind.STDDEV_POP | SqlKind.STDDEV_SAMP | SqlKind.VAR_POP | SqlKind.VAR_SAMP => false
+      case _ => true
+    }
+
+    !distinctAggs && supported
   }
 
   override def convert(rel: RelNode): RelNode = {

http://git-wip-us.apache.org/repos/asf/flink/blob/0af57fc1/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetAggregateWithNullValuesRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetAggregateWithNullValuesRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetAggregateWithNullValuesRule.scala
index d183e60..9636980 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetAggregateWithNullValuesRule.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetAggregateWithNullValuesRule.scala
@@ -17,11 +17,13 @@
  */
 package org.apache.flink.table.plan.rules.dataSet
 
+
 import com.google.common.collect.ImmutableList
 import org.apache.calcite.plan._
 import org.apache.calcite.rel.RelNode
 import org.apache.calcite.rel.convert.ConverterRule
 import org.apache.calcite.rex.RexLiteral
+import org.apache.calcite.sql.SqlKind
 import org.apache.flink.table.plan.nodes.FlinkConventions
 import org.apache.flink.table.plan.nodes.dataset.DataSetAggregate
 import org.apache.flink.table.plan.nodes.logical.{FlinkLogicalAggregate, FlinkLogicalUnion, FlinkLogicalValues}
@@ -51,7 +53,12 @@ class DataSetAggregateWithNullValuesRule
     // check if we have distinct aggregates
     val distinctAggs = agg.getAggCallList.exists(_.isDistinct)
 
-    !distinctAggs
+    val supported = agg.getAggCallList.map(_.getAggregation.getKind).forall {
+      case SqlKind.STDDEV_POP | SqlKind.STDDEV_SAMP | SqlKind.VAR_POP | SqlKind.VAR_SAMP => false
+      case _ => true
+    }
+
+    !distinctAggs && supported
   }
 
   override def convert(rel: RelNode): RelNode = {

http://git-wip-us.apache.org/repos/asf/flink/blob/0af57fc1/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala
index 729ad48..cb37ce4 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala
@@ -169,6 +169,11 @@ object FunctionCatalog {
     "max" -> classOf[Max],
     "min" -> classOf[Min],
     "sum" -> classOf[Sum],
+    "sum0" -> classOf[Sum0],
+    "stddev_pop" -> classOf[StddevPop],
+    "stddev_samp" -> classOf[StddevSamp],
+    "var_pop" -> classOf[VarPop],
+    "var_samp" -> classOf[VarSamp],
 
     // string functions
     "charLength" -> classOf[CharLength],
@@ -305,10 +310,15 @@ class BasicOperatorTable extends ReflectiveSqlOperatorTable {
     SqlStdOperatorTable.GROUPING_ID,
     // AGGREGATE OPERATORS
     SqlStdOperatorTable.SUM,
+    SqlStdOperatorTable.SUM0,
     SqlStdOperatorTable.COUNT,
     SqlStdOperatorTable.MIN,
     SqlStdOperatorTable.MAX,
     SqlStdOperatorTable.AVG,
+    SqlStdOperatorTable.STDDEV_POP,
+    SqlStdOperatorTable.STDDEV_SAMP,
+    SqlStdOperatorTable.VAR_POP,
+    SqlStdOperatorTable.VAR_SAMP,
     // ARRAY OPERATORS
     SqlStdOperatorTable.ARRAY_VALUE_CONSTRUCTOR,
     SqlStdOperatorTable.ITEM,

http://git-wip-us.apache.org/repos/asf/flink/blob/0af57fc1/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/sql/SqlITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/sql/SqlITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/sql/SqlITCase.java
index 114226c..a5d2021 100644
--- a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/sql/SqlITCase.java
+++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/sql/SqlITCase.java
@@ -20,6 +20,7 @@ package org.apache.flink.table.api.java.batch.sql;
 
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.typeutils.MapTypeInfo;
@@ -36,9 +37,12 @@ import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
+import scala.collection.JavaConversions;
+import scala.collection.mutable.Buffer;
 
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 
@@ -136,7 +140,7 @@ public class SqlITCase extends TableProgramsCollectionTestBase {
 		DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.get5TupleDataSet(env);
 
 		tableEnv.registerDataSet("t1", ds1, "a, b, c");
-		tableEnv.registerDataSet("t2",ds2, "d, e, f, g, h");
+		tableEnv.registerDataSet("t2", ds2, "d, e, f, g, h");
 
 		String sqlQuery = "SELECT c, g FROM t1, t2 WHERE b = e";
 		Table result = tableEnv.sql(sqlQuery);
@@ -156,9 +160,7 @@ public class SqlITCase extends TableProgramsCollectionTestBase {
 		rows.add(new Tuple2<>(1, Collections.singletonMap("foo", "bar")));
 		rows.add(new Tuple2<>(2, Collections.singletonMap("foo", "spam")));
 
-		TypeInformation<Tuple2<Integer, Map<String, String>>> ty = new TupleTypeInfo<>(
-			BasicTypeInfo.INT_TYPE_INFO,
-			new MapTypeInfo<>(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO));
+		TypeInformation<Tuple2<Integer, Map<String, String>>> ty = new TupleTypeInfo<>(BasicTypeInfo.INT_TYPE_INFO, new MapTypeInfo<>(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO));
 
 		DataSet<Tuple2<Integer, Map<String, String>>> ds1 = env.fromCollection(rows, ty);
 		tableEnv.registerDataSet("t1", ds1, "a, b");
@@ -171,4 +173,47 @@ public class SqlITCase extends TableProgramsCollectionTestBase {
 		String expected = "bar\n" + "spam\n";
 		compareResultAsText(results, expected);
 	}
+
+	/** Checks the built-in AVG/STDDEV/VAR aggregates against hand-written SUM/COUNT queries. */
+	@Test
+	public void testDeviationAggregation() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
+		tableEnv.registerDataSet("AggTable", CollectionDataSets.get3TupleDataSet(env), "x, y, z");
+
+		Buffer<String> aggColumns = JavaConversions.asScalaBuffer(Arrays.asList("x, y".split(",")));
+
+		String builtInQuery = getSelectQueryFromTemplate(
+			"AVG(?),STDDEV_POP(?),STDDEV_SAMP(?),VAR_POP(?),VAR_SAMP(?)", aggColumns, "AggTable");
+		Table actualTable = tableEnv.sql(builtInQuery);
+
+		String manualQuery = getSelectQueryFromTemplate("SUM(?)/COUNT(?), " +
+			"SQRT( (SUM(? * ?) - SUM(?) * SUM(?) / COUNT(?)) / COUNT(?)), " +
+			"SQRT( (SUM(? * ?) - SUM(?) * SUM(?) / COUNT(?)) / CASE COUNT(?) WHEN 1 THEN NULL ELSE COUNT(?) - 1 END), " +
+			"(SUM(? * ?) - SUM(?) * SUM(?) / COUNT(?)) / COUNT(?), " +
+			"(SUM(? * ?) - SUM(?) * SUM(?) / COUNT(?)) / CASE COUNT(?) WHEN 1 THEN NULL ELSE COUNT(?) - 1 END", aggColumns, "AggTable");
+		Table expectedTable = tableEnv.sql(manualQuery);
+
+		List<Row> actualRows = tableEnv.toDataSet(actualTable, Row.class).collect();
+
+		// Every expected field is truncated to an int and the fields are joined with commas.
+		MapFunction<Row, Object> truncateFields = new MapFunction<Row, Object>() {
+			@Override
+			public Object map(Row row) throws Exception {
+				StringBuilder joined = new StringBuilder();
+				for (int pos = 0; pos < row.getArity(); pos++) {
+					Object field = row.getField(pos);
+					joined.append(Double.valueOf(field.toString()).intValue()).append(",");
+				}
+				// drop the trailing comma
+				return joined.substring(0, joined.length() - 1);
+			}
+		};
+		String expectedText = tableEnv.toDataSet(expectedTable, Row.class)
+			.map(truncateFields)
+			.collect().get(0)
+			.toString();
+
+		compareResultAsText(actualRows, expectedText);
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/0af57fc1/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/AggregationsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/AggregationsITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/AggregationsITCase.scala
index 600c15b..4fbd734 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/AggregationsITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/AggregationsITCase.scala
@@ -295,6 +295,206 @@ class AggregationsITCase(
     TestBaseUtils.compareResultAsText(results3.asJava, expected3)
   }
 
+
+
+  // Spells out the population standard deviation with plain SUM, COUNT and SQRT.
+  // For the float values 1.0 and 2.0 this is sqrt((5 - 9 / 2) / 2) = 0.5, which is the
+  // text the single result row is compared against.
+  @Test
+  def testSqrtOfAggregatedSet(): Unit = {
+    val execEnv = ExecutionEnvironment.getExecutionEnvironment
+    val tableEnv = TableEnvironment.getTableEnvironment(execEnv, config)
+
+    val input = execEnv.fromElements((1.0f, 1), (2.0f, 2)).toTable(tableEnv)
+    tableEnv.registerTable("MyTable", input)
+
+    val stddevExpr = "SQRT((SUM(a * a) - SUM(a) * SUM(a) / COUNT(a)) / COUNT(a)) "
+    val query = "SELECT " + stddevExpr + "from (select _1 as a from MyTable)"
+
+    val rows = tableEnv.sql(query).toDataSet[Row].collect()
+    TestBaseUtils.compareResultAsText(rows.asJava, "0.5")
+  }
+
+  // Evaluates STDDEV_POP on each of the six numeric columns (byte, short, int, long,
+  // float, double) of a table holding the values 1 and 2 in every column.
+  @Test
+  def testStddevPopAggregate(): Unit = {
+    val execEnv = ExecutionEnvironment.getExecutionEnvironment
+    val tableEnv = TableEnvironment.getTableEnvironment(execEnv, config)
+
+    val input = execEnv.fromElements(
+      (1: Byte, 1: Short, 1, 1L, 1F, 1D),
+      (2: Byte, 2: Short, 2, 2L, 2F, 2D)).toTable(tableEnv)
+    tableEnv.registerTable("myTable", input)
+
+    val fieldNames = (1 to 6).map("_" + _)
+    val query = getSelectQueryFromTemplate("STDDEV_POP(?)")(fieldNames, "myTable")
+    val rows = tableEnv.sql(query).toDataSet[Row].collect()
+    TestBaseUtils.compareOrderedResultAsText(rows.asJava, "0,0,0,0,0.5,0.5")
+  }
+
+  // Runs STDDEV_POP together with $sum0, avg, max, min and count on each of the six
+  // numeric columns. The select list repeats the template once per column, so the
+  // expected text lists six aggregate values per column, in column order.
+  @Test
+  def testStddevPopAggregateWithOtherAggreagteSUM0(): Unit = {
+    val execEnv = ExecutionEnvironment.getExecutionEnvironment
+    val tableEnv = TableEnvironment.getTableEnvironment(execEnv, config)
+
+    val fieldNames = (1 to 6).map("_" + _)
+    val input = execEnv.fromElements(
+      (1: Byte, 1: Short, 1, 1L, 1F, 1D),
+      (2: Byte, 2: Short, 2, 2L, 2F, 2D)).toTable(tableEnv)
+    tableEnv.registerTable("myTable", input)
+
+    // "$sum0" is a plain (non-interpolated) literal, so the dollar sign is kept as is.
+    val template = Seq("STDDEV_POP", "$sum0", "avg", "max", "min", "count")
+      .map(_ + "(?)")
+      .mkString(", ")
+    val query = getSelectQueryFromTemplate(template)(fieldNames, "myTable")
+    val rows = tableEnv.sql(query).toDataSet[Row].collect()
+
+    // Four integral columns first, then the float and the double column.
+    val integralColumn = "0,3,1,2,1,2"
+    val fractionalColumn = "0.5,3.0,1.5,2.0,1.0,2"
+    val expectedText =
+      (Seq.fill(4)(integralColumn) ++ Seq.fill(2)(fractionalColumn)).mkString(",")
+
+    TestBaseUtils.compareOrderedResultAsText(rows.asJava, expectedText)
+  }
+
+  // Runs STDDEV_POP together with sum, avg, max, min and count on each of the six
+  // numeric columns. The select list repeats the template once per column, so the
+  // expected text lists six aggregate values per column, in column order.
+  @Test
+  def testStddevPopAggregateWithOtherAggreagte(): Unit = {
+    val execEnv = ExecutionEnvironment.getExecutionEnvironment
+    val tableEnv = TableEnvironment.getTableEnvironment(execEnv, config)
+
+    val input = execEnv.fromElements(
+      (1: Byte, 1: Short, 1, 1L, 1F, 1D),
+      (2: Byte, 2: Short, 2, 2L, 2F, 2D)).toTable(tableEnv)
+    tableEnv.registerTable("myTable", input)
+    val fieldNames = (1 to 6).map("_" + _)
+
+    val aggregates = Seq("STDDEV_POP", "sum", "avg", "max", "min", "count")
+    val template = aggregates.map(_ + "(?)").mkString(", ")
+    val query = getSelectQueryFromTemplate(template)(fieldNames, "myTable")
+
+    val rows = tableEnv.sql(query).toDataSet[Row].collect()
+
+    // Four integral columns first, then the float and the double column.
+    val integralColumn = "0,3,1,2,1,2"
+    val fractionalColumn = "0.5,3.0,1.5,2.0,1.0,2"
+    val expectedText =
+      (Seq.fill(4)(integralColumn) ++ Seq.fill(2)(fractionalColumn)).mkString(",")
+
+    // The comparison is order-sensitive, matching the column order of the select list.
+    TestBaseUtils.compareOrderedResultAsText(rows.asJava, expectedText)
+  }
+
+  // Compares STDDEV_SAMP with the equivalent hand-written SUM/COUNT/SQRT expression.
+  // Both result rows are parsed as floats and rendered with five decimals before the
+  // comparison, so only the first five fractional digits have to agree.
+  @Test
+  def testStddevSampAggregate(): Unit = {
+    val execEnv = ExecutionEnvironment.getExecutionEnvironment
+    val tableEnv = TableEnvironment.getTableEnvironment(execEnv, config)
+
+    val input = execEnv.fromElements(
+      (1: Byte, 1: Short, 1, 1L, 1F, 1D),
+      (2: Byte, 2: Short, 2, 2L, 2F, 2D)).toTable(tableEnv)
+    tableEnv.registerTable("myTable", input)
+    val fieldNames = Seq("_1", "_2", "_3", "_4", "_5", "_6")
+
+    // Runs the query and renders the fields of its first row with five decimals.
+    def firstRowRounded(query: String): String =
+      tableEnv.sql(query).toDataSet[Row].collect().head.toString
+        .split(",")
+        .map(field => "%.5f".format(field.toFloat))
+        .mkString(",")
+
+    val sampleDivisor = "CASE COUNT(?) WHEN 1 THEN NULL ELSE COUNT(?) - 1 END"
+    val manualTemplate =
+      "SQRT((SUM(? * ?) - SUM(?) * SUM(?) / COUNT(?)) / " + sampleDivisor + ")"
+
+    val actualText =
+      firstRowRounded(getSelectQueryFromTemplate("STDDEV_SAMP(?)")(fieldNames, "myTable"))
+    val expectedText =
+      firstRowRounded(getSelectQueryFromTemplate(manualTemplate)(fieldNames, "myTable"))
+    Assert.assertEquals(expectedText, actualText)
+  }
+
+  // Checks var_pop against the hand-written population variance formula; the text of
+  // the formula query's first row serves as the expected result.
+  @Test
+  def testVarPopAggregate(): Unit = {
+    val execEnv = ExecutionEnvironment.getExecutionEnvironment
+    val tableEnv = TableEnvironment.getTableEnvironment(execEnv, config)
+
+    val fieldNames = Seq("_1", "_2", "_3", "_4", "_5", "_6")
+    val input = execEnv.fromElements(
+      (1: Byte, 1: Short, 1, 1L, 1F, 1D),
+      (2: Byte, 2: Short, 2, 2L, 2F, 2D)).toTable(tableEnv)
+    tableEnv.registerTable("myTable", input)
+
+    val manualTemplate = "(SUM(? * ?) - SUM(?) * SUM(?) / COUNT(?)) / COUNT(?)"
+    val varPopQuery = getSelectQueryFromTemplate("var_pop(?)")(fieldNames, "myTable")
+    val manualQuery = getSelectQueryFromTemplate(manualTemplate)(fieldNames, "myTable")
+
+    val varPopRows = tableEnv.sql(varPopQuery).toDataSet[Row].collect()
+    val manualRows = tableEnv.sql(manualQuery).toDataSet[Row].collect()
+
+    TestBaseUtils.compareOrderedResultAsText(varPopRows.asJava, manualRows.head.toString)
+  }
+
+  // Checks var_samp against the hand-written sample variance formula, whose divisor
+  // is NULL for a single row and COUNT - 1 otherwise; the text of the formula query's
+  // first row serves as the expected result.
+  @Test
+  def testVarSampAggregate(): Unit = {
+    val execEnv = ExecutionEnvironment.getExecutionEnvironment
+    val tableEnv = TableEnvironment.getTableEnvironment(execEnv, config)
+
+    val fieldNames = Seq("_1", "_2", "_3", "_4", "_5", "_6")
+    val input = execEnv.fromElements(
+      (1: Byte, 1: Short, 1, 1L, 1F, 1D),
+      (2: Byte, 2: Short, 2, 2L, 2F, 2D)).toTable(tableEnv)
+    tableEnv.registerTable("myTable", input)
+
+    val sampleDivisor = "CASE COUNT(?) WHEN 1 THEN NULL ELSE COUNT(?) - 1 END"
+    val manualTemplate = "(SUM(? * ?) - SUM(?) * SUM(?) / COUNT(?)) / " + sampleDivisor
+    val varSampQuery = getSelectQueryFromTemplate("var_samp(?)")(fieldNames, "myTable")
+    val manualQuery = getSelectQueryFromTemplate(manualTemplate)(fieldNames, "myTable")
+
+    val varSampRows = tableEnv.sql(varSampQuery).toDataSet[Row].collect()
+    val manualRows = tableEnv.sql(manualQuery).toDataSet[Row].collect()
+    TestBaseUtils.compareOrderedResultAsText(varSampRows.asJava, manualRows.head.toString)
+  }
+
+  // Applies $sum0 to every column of a sub-query whose filter (_1 = 4) matches none of
+  // the two registered rows (their _1 values are 1 and 2). The expected text is "null"
+  // for each of the six columns.
+  @Test
+  def testSumNullElements(): Unit = {
+    val execEnv = ExecutionEnvironment.getExecutionEnvironment
+    val tableEnv = TableEnvironment.getTableEnvironment(execEnv, config)
+
+    val input = execEnv.fromElements(
+      (1: Byte, 2L, 1D, 1F, 1, 1: Short),
+      (2: Byte, 2L, 1D, 1F, 1, 1: Short))
+    tableEnv.registerDataSet("MyTable", input)
+
+    val fieldNames = (1 to 6).map("_" + _)
+    val emptySource = "(select * from MyTable where _1 = 4)"
+    val query = getSelectQueryFromTemplate("$sum0(?)")(fieldNames, emptySource)
+
+    val rows = tableEnv.sql(query).toDataSet[Row].collect()
+    val expectedText = Seq.fill(6)("null").mkString(",")
+
+    TestBaseUtils.compareResultAsText(rows.asJava, expectedText)
+  }
+
   @Test
   def testTumbleWindowAggregate(): Unit = {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/0af57fc1/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/AggregationsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/AggregationsITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/AggregationsITCase.scala
index 4838747..050f5f4 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/AggregationsITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/AggregationsITCase.scala
@@ -339,6 +339,80 @@ class AggregationsITCase(
     TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
+  // stddev_pop, stddev_samp, var_pop and var_samp on the first field of the 3-tuple set.
+  @Test
+  def testAnalyticAggregation(): Unit = {
+    val execEnv = ExecutionEnvironment.getExecutionEnvironment
+    val tableEnv = TableEnvironment.getTableEnvironment(execEnv)
+    val source = CollectionDataSets.get3TupleDataSet(execEnv).toTable(tableEnv)
+    val stats = source.select('_1.stddev_pop, '_1.stddev_samp, '_1.var_pop, '_1.var_samp)
+    val rows = stats.toDataSet[Row].collect()
+    TestBaseUtils.compareResultAsText(rows.asJava, "6,6,36,38")
+  }
+
+  // The four analytic aggregates as string expressions, in call and in postfix form.
+  @Test
+  def testSQLStyleAnalyticAggregation(): Unit = {
+    val execEnv = ExecutionEnvironment.getExecutionEnvironment
+    val tableEnv = TableEnvironment.getTableEnvironment(execEnv)
+    val source = CollectionDataSets.get3TupleDataSet(execEnv).toTable(tableEnv, 'a, 'b, 'c)
+    val rows = source.select(
+        """stddev_pop(a) as a1, a.stddev_pop as a2,
+          |stddev_samp (a) as b1, a.stddev_samp as b2,
+          |var_pop (a) as c1, a.var_pop as c2,
+          |var_samp (a) as d1, a.var_samp as d2
+        """.stripMargin).toDataSet[Row].collect()
+    val expectedText = "6,6,6,6,36,36,38,38"
+    TestBaseUtils.compareResultAsText(rows.asJava, expectedText)
+  }
+
+  // Runs all four analytic aggregates on byte, short, int, long, float and double
+  // columns; the expected text holds one group of six values per aggregate.
+  @Test
+  def testWorkingAnalyticAggregationDataTypes(): Unit = {
+    val execEnv = ExecutionEnvironment.getExecutionEnvironment
+    val tableEnv = TableEnvironment.getTableEnvironment(execEnv)
+    val input = execEnv.fromElements(
+      (1: Byte, 1: Short, 1, 1L, 1.0f, 1.0d),
+      (2: Byte, 2: Short, 2, 2L, 2.0f, 2.0d)).toTable(tableEnv)
+    val aggregated = input.select(
+      '_1.stddev_pop, '_2.stddev_pop, '_3.stddev_pop,
+      '_4.stddev_pop, '_5.stddev_pop, '_6.stddev_pop,
+      '_1.stddev_samp, '_2.stddev_samp, '_3.stddev_samp,
+      '_4.stddev_samp, '_5.stddev_samp, '_6.stddev_samp,
+      '_1.var_pop, '_2.var_pop, '_3.var_pop,
+      '_4.var_pop, '_5.var_pop, '_6.var_pop,
+      '_1.var_samp, '_2.var_samp, '_3.var_samp,
+      '_4.var_samp, '_5.var_samp, '_6.var_samp)
+    val rows = aggregated.toDataSet[Row].collect()
+    val expectedText = Seq(
+      "0,0,0,0,0.5,0.5", // stddev_pop
+      "1,1,1,1,0.70710677,0.7071067811865476", // stddev_samp
+      "0,0,0,0,0.25,0.25", // var_pop
+      "1,1,1,1,0.5,0.5" // var_samp
+    ).mkString(",")
+    TestBaseUtils.compareResultAsText(rows.asJava, expectedText)
+  }
+
+  // Groups MyWC records by word and takes stddev_pop of the frequency within each group.
+  @Test
+  def testPojoAnalyticAggregation(): Unit = {
+    val execEnv = ExecutionEnvironment.getExecutionEnvironment
+    val tableEnv = TableEnvironment.getTableEnvironment(execEnv)
+
+    val words = execEnv.fromElements(
+      MyWC("hello", 1), MyWC("hello", 8),
+      MyWC("ciao", 3),
+      MyWC("hola", 1), MyWC("hola", 8))
+    val deviations = words.toTable(tableEnv)
+      .groupBy('word)
+      .select('word, 'frequency.stddev_pop)
+      .toDataSet[MyWC]
+
+    val pairs = deviations.map(wc => (wc.word, wc.frequency)).collect()
+    val expectedText = Seq("(hola,3)", "(ciao,0)", "(hello,3)").mkString("\n")
+    TestBaseUtils.compareResultAsText(pairs.asJava, expectedText)
+  }
 }
 
 case class WC(word: String, frequency: Long)

http://git-wip-us.apache.org/repos/asf/flink/blob/0af57fc1/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/utils/TableProgramsTestBase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/utils/TableProgramsTestBase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/utils/TableProgramsTestBase.scala
index cf9d947..ee8e1f5 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/utils/TableProgramsTestBase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/utils/TableProgramsTestBase.scala
@@ -37,6 +37,11 @@ class TableProgramsTestBase(
     }
     conf
   }
+
+  // Builds "SELECT <items> FROM <table>": selectBlock once per column with every '?'
+  // replaced by that column's name; the items are joined with ",".
+  def getSelectQueryFromTemplate(selectBlock: String)(columnsName: Seq[String], table: String): String =
+    columnsName.map(c => selectBlock.replace("?", c)).mkString("SELECT ", ",", " FROM " + table)
 }
 
 object TableProgramsTestBase {