You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2017/02/24 16:31:46 UTC

[1/3] flink git commit: [FLINK-5899] [table] Fix translation of batch event-time tumbling windows with non-partial aggregation functions.

Repository: flink
Updated Branches:
  refs/heads/master 2437da6e5 -> 0a97cd29a


[FLINK-5899] [table] Fix translation of batch event-time tumbling windows with non-partial aggregation functions.

This closes #3405.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0a97cd29
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0a97cd29
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0a97cd29

Branch: refs/heads/master
Commit: 0a97cd29ae461955162944d150dd21d67b4cb0df
Parents: a755de2
Author: shaoxuan-wang <ws...@gmail.com>
Authored: Fri Feb 24 11:57:44 2017 +0800
Committer: Fabian Hueske <fh...@apache.org>
Committed: Fri Feb 24 17:12:42 2017 +0100

----------------------------------------------------------------------
 .../org/apache/flink/table/runtime/aggregate/AggregateUtil.scala | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/0a97cd29/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
index 034ff9e..cd473ee 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
@@ -212,14 +212,14 @@ object AggregateUtil {
         else {
           // for non-incremental aggregations
           new DataSetTumbleTimeWindowAggReduceGroupFunction(
-            intermediateRowArity - 1,
+            intermediateRowArity,
             asLong(size),
             startPos,
             endPos,
             aggregates,
             groupingOffsetMapping,
             aggOffsetMapping,
-            intermediateRowArity,
+            intermediateRowArity + 1, // the additional field is used to store the time attribute
             outputType.getFieldCount)
         }
       case EventTimeTumblingGroupWindow(_, _, size) =>


[3/3] flink git commit: [hotfix] [core] Changing PublicEvolving anntoation of Archiveable to Internal.

Posted by fh...@apache.org.
[hotfix] [core] Changing PublicEvolving anntoation of Archiveable to Internal.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8c78abab
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8c78abab
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8c78abab

Branch: refs/heads/master
Commit: 8c78ababb5e966ab950aa190cf6bb23e53e2f644
Parents: 2437da6
Author: Fabian Hueske <fh...@apache.org>
Authored: Fri Feb 24 12:28:29 2017 +0100
Committer: Fabian Hueske <fh...@apache.org>
Committed: Fri Feb 24 17:12:42 2017 +0100

----------------------------------------------------------------------
 .../src/main/java/org/apache/flink/api/common/Archiveable.java   | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/8c78abab/flink-core/src/main/java/org/apache/flink/api/common/Archiveable.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/Archiveable.java b/flink-core/src/main/java/org/apache/flink/api/common/Archiveable.java
index 69e050d..7881f8e 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/Archiveable.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/Archiveable.java
@@ -17,11 +17,11 @@
  */
 package org.apache.flink.api.common;
 
-import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.annotation.Internal;
 
 import java.io.Serializable;
 
-@PublicEvolving
+@Internal
 public interface Archiveable<T extends Serializable> {
 	T archive();
 }


[2/3] flink git commit: [FLINK-5710] [table] Add proctime() function to indicate processing time in Stream SQL.

Posted by fh...@apache.org.
[FLINK-5710] [table] Add proctime() function to indicate processing time in Stream SQL.

This closes #3370.
This closes #3302. // duplicate of #3370


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a755de27
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a755de27
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a755de27

Branch: refs/heads/master
Commit: a755de27b85fe72be4a6f2063225ddc5c7f69058
Parents: 8c78aba
Author: Haohui Mai <wh...@apache.org>
Authored: Thu Feb 23 13:51:45 2017 -0800
Committer: Fabian Hueske <fh...@apache.org>
Committed: Fri Feb 24 17:12:42 2017 +0100

----------------------------------------------------------------------
 .../functions/TimeModeIndicatorFunctions.scala  | 10 ++++++
 .../datastream/LogicalWindowAggregateRule.scala | 38 +++++++++++++-------
 .../flink/table/validate/FunctionCatalog.scala  | 10 +++---
 .../scala/stream/sql/WindowAggregateTest.scala  | 23 +++++++++++-
 4 files changed, 63 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/a755de27/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/TimeModeIndicatorFunctions.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/TimeModeIndicatorFunctions.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/TimeModeIndicatorFunctions.scala
index 7a7e00f..b9b66ea 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/TimeModeIndicatorFunctions.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/TimeModeIndicatorFunctions.scala
@@ -34,6 +34,15 @@ object EventTimeExtractor extends SqlFunction("ROWTIME", SqlKind.OTHER_FUNCTION,
     SqlMonotonicity.INCREASING
 }
 
+object ProcTimeExtractor extends SqlFunction("PROCTIME", SqlKind.OTHER_FUNCTION,
+  ReturnTypes.explicit(SqlTypeName.TIMESTAMP), null, OperandTypes.NILADIC,
+  SqlFunctionCategory.SYSTEM) {
+  override def getSyntax: SqlSyntax = SqlSyntax.FUNCTION
+
+  override def getMonotonicity(call: SqlOperatorBinding): SqlMonotonicity =
+    SqlMonotonicity.INCREASING
+}
+
 abstract class TimeIndicator extends LeafExpression {
   /**
     * Returns the [[org.apache.flink.api.common.typeinfo.TypeInformation]]
@@ -51,3 +60,4 @@ abstract class TimeIndicator extends LeafExpression {
 }
 
 case class RowTime() extends TimeIndicator
+case class ProcTime() extends TimeIndicator

http://git-wip-us.apache.org/repos/asf/flink/blob/a755de27/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/LogicalWindowAggregateRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/LogicalWindowAggregateRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/LogicalWindowAggregateRule.scala
index 094e47b..f5eb5f9 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/LogicalWindowAggregateRule.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/LogicalWindowAggregateRule.scala
@@ -25,14 +25,16 @@ import org.apache.calcite.plan._
 import org.apache.calcite.plan.hep.HepRelVertex
 import org.apache.calcite.rel.logical.{LogicalAggregate, LogicalProject}
 import org.apache.calcite.rex.{RexCall, RexLiteral, RexNode}
+import org.apache.calcite.sql.SqlOperator
 import org.apache.calcite.sql.fun.SqlFloorFunction
 import org.apache.calcite.util.ImmutableBitSet
 import org.apache.flink.table.api.scala.Tumble
 import org.apache.flink.table.api.{TableException, TumblingWindow, Window}
 import org.apache.flink.table.calcite.FlinkRelBuilder.NamedWindowProperty
 import org.apache.flink.table.expressions._
-import org.apache.flink.table.functions.EventTimeExtractor
+import org.apache.flink.table.functions.{EventTimeExtractor, ProcTimeExtractor}
 import org.apache.flink.table.plan.logical.rel.LogicalWindowAggregate
+import org.apache.flink.table.typeutils.TimeIntervalTypeInfo
 
 import scala.collection.JavaConversions._
 
@@ -104,15 +106,11 @@ class LogicalWindowAggregateRule
     field match {
       case call: RexCall =>
         call.getOperator match {
-          case _: SqlFloorFunction => call.getOperands.get(0) match {
-            case c: RexCall => if (c.getOperator == EventTimeExtractor) {
-              val unit = call.getOperands.get(1)
-                .asInstanceOf[RexLiteral].getValue.asInstanceOf[TimeUnitRange]
-              return Some(LogicalWindowAggregateRule.timeUnitRangeToWindow(unit)
-                .on("rowtime"))
-            }
-            case _ =>
-          }
+          case _: SqlFloorFunction =>
+            val unit: TimeUnitRange = LogicalWindowAggregateRule.getLiteral(call.getOperands.get(1))
+            val w = LogicalWindowAggregateRule.timeUnitRangeToTumbleWindow(unit)
+            return LogicalWindowAggregateRule.decorateTimeIndicator(
+              call.getOperands.get(0).asInstanceOf[RexCall].getOperator, w)
           case _ =>
         }
       case _ =>
@@ -130,10 +128,24 @@ object LogicalWindowAggregateRule {
 
   private[flink] val INSTANCE = new LogicalWindowAggregateRule
 
-  private val EXPR_ONE = ExpressionParser.parseExpression("1")
+  private def decorateTimeIndicator(operator: SqlOperator, window: TumblingWindow) = {
+    operator match {
+      case EventTimeExtractor => Some(window.on("rowtime"))
+      case ProcTimeExtractor => Some(window)
+      case _ => None
+    }
+  }
+
+  private def timeUnitRangeToTumbleWindow(range: TimeUnitRange): TumblingWindow = {
+    intervalToTumbleWindow(range.startUnit.multiplier.longValue())
+  }
+
+  private def intervalToTumbleWindow(size: Long): TumblingWindow = {
+    Tumble over Literal(size, TimeIntervalTypeInfo.INTERVAL_MILLIS)
+  }
 
-  def timeUnitRangeToWindow(range: TimeUnitRange): TumblingWindow = {
-    Tumble over ExpressionUtils.toMilliInterval(EXPR_ONE, range.startUnit.multiplier.longValue())
+  private def getLiteral[T](node: RexNode): T = {
+    node.asInstanceOf[RexLiteral].getValue.asInstanceOf[T]
   }
 }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/a755de27/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala
index 94237f7..3c89ec4 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala
@@ -23,8 +23,8 @@ import org.apache.calcite.sql.util.{ChainedSqlOperatorTable, ListSqlOperatorTabl
 import org.apache.calcite.sql.{SqlFunction, SqlOperator, SqlOperatorTable}
 import org.apache.flink.table.api.ValidationException
 import org.apache.flink.table.expressions._
-import org.apache.flink.table.functions.{EventTimeExtractor, RowTime, ScalarFunction, TableFunction}
-import org.apache.flink.table.functions.utils.{TableSqlFunction, ScalarSqlFunction}
+import org.apache.flink.table.functions.utils.{ScalarSqlFunction, TableSqlFunction}
+import org.apache.flink.table.functions.{EventTimeExtractor, RowTime, ScalarFunction, TableFunction, _}
 
 import scala.collection.JavaConversions._
 import scala.collection.mutable
@@ -197,7 +197,8 @@ object FunctionCatalog {
     // "ceil" -> classOf[TemporalCeil]
 
     // extensions to support streaming query
-    "rowtime" -> classOf[RowTime]
+    "rowtime" -> classOf[RowTime],
+    "proctime" -> classOf[ProcTime]
   )
 
   /**
@@ -322,7 +323,8 @@ class BasicOperatorTable extends ReflectiveSqlOperatorTable {
     SqlStdOperatorTable.SCALAR_QUERY,
     SqlStdOperatorTable.EXISTS,
     // EXTENSIONS
-    EventTimeExtractor
+    EventTimeExtractor,
+    ProcTimeExtractor
   )
 
   builtInSqlOperators.foreach(register)

http://git-wip-us.apache.org/repos/asf/flink/blob/a755de27/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala
index 183b84c..06088ab 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala
@@ -20,7 +20,7 @@ package org.apache.flink.table.api.scala.stream.sql
 import org.apache.flink.api.scala._
 import org.apache.flink.table.api.TableException
 import org.apache.flink.table.api.scala._
-import org.apache.flink.table.plan.logical.EventTimeTumblingGroupWindow
+import org.apache.flink.table.plan.logical.{EventTimeTumblingGroupWindow, ProcessingTimeTumblingGroupWindow}
 import org.apache.flink.table.utils.{StreamTableTestUtil, TableTestBase}
 import org.apache.flink.table.utils.TableTestUtil._
 import org.junit.Test
@@ -94,6 +94,27 @@ class WindowAggregateTest extends TableTestBase {
     streamUtil.verifySql(sql, expected)
   }
 
+  @Test
+  def testProcessingTime() = {
+    val sql = "SELECT COUNT(*) FROM MyTable GROUP BY FLOOR(proctime() TO HOUR)"
+    val expected =
+      unaryNode(
+        "DataStreamCalc",
+        unaryNode(
+          "DataStreamAggregate",
+          unaryNode(
+            "DataStreamCalc",
+            streamTableNode(0),
+            term("select", "1970-01-01 00:00:00 AS $f0")
+          ),
+          term("window", ProcessingTimeTumblingGroupWindow(None, 3600000.millis)),
+          term("select", "COUNT(*) AS EXPR$0")
+        ),
+        term("select", "EXPR$0")
+      )
+    streamUtil.verifySql(sql, expected)
+  }
+
   @Test(expected = classOf[TableException])
   def testMultiWindow() = {
     val sql = "SELECT COUNT(*) FROM MyTable GROUP BY " +