You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2018/12/05 11:02:14 UTC

[GitHub] XuQianJin-Stars closed pull request #7228: [FLINK-9740][Table API &SQL] Support group windows over intervals of months

XuQianJin-Stars closed pull request #7228: [FLINK-9740][Table API &SQL] Support group windows over intervals of months
URL: https://github.com/apache/flink/pull/7228
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/ExpressionUtils.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/ExpressionUtils.scala
index 6080ba49437..96139535c8e 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/ExpressionUtils.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/ExpressionUtils.scala
@@ -44,6 +44,11 @@ object ExpressionUtils {
     case _ => false
   }
 
+  private[flink] def isMonthsIntervalLiteral(expr: Expression): Boolean = expr match {
+    case Literal(_, TimeIntervalTypeInfo.INTERVAL_MONTHS) => true
+    case _ => false
+  }
+
   private[flink] def isRowCountLiteral(expr: Expression): Boolean = expr match {
     case Literal(_, RowIntervalTypeInfo.INTERVAL_ROWS) => true
     case _ => false
@@ -71,6 +76,12 @@ object ExpressionUtils {
     case _ => throw new IllegalArgumentException()
   }
 
+  private[flink] def toMonths(expr: Expression): FlinkTime = expr match {
+    case Literal(value: Long, TimeIntervalTypeInfo.INTERVAL_MONTHS) =>
+      FlinkTime.months(value)
+    case _ => throw new IllegalArgumentException()
+  }
+
   private[flink] def toLong(expr: Expression): Long = expr match {
     case Literal(value: Long, RowIntervalTypeInfo.INTERVAL_ROWS) => value
     case _ => throw new IllegalArgumentException()
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala
index 78a7273f6af..734bb551562 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala
@@ -264,6 +264,10 @@ object DataStreamGroupWindowAggregate {
         if isRowtimeAttribute(timeField) && isTimeIntervalLiteral(size) =>
       stream.window(TumblingEventTimeWindows.of(toTime(size)))
 
+    case TumblingGroupWindow(_, timeField, size)
+      if isRowtimeAttribute(timeField) && isMonthsIntervalLiteral(size) =>
+      stream.window(TumblingEventTimeWindows.of(toMonths(size)))
+
     case TumblingGroupWindow(_, _, size) =>
       // TODO: EventTimeTumblingGroupWindow should sort the stream on event time
       // before applying the  windowing logic. Otherwise, this would be the same as a
@@ -284,6 +288,10 @@ object DataStreamGroupWindowAggregate {
         if isRowtimeAttribute(timeField) && isTimeIntervalLiteral(size)=>
       stream.window(SlidingEventTimeWindows.of(toTime(size), toTime(slide)))
 
+    case SlidingGroupWindow(_, timeField, size, slide)
+      if isRowtimeAttribute(timeField) && isMonthsIntervalLiteral(size) =>
+      stream.window(SlidingEventTimeWindows.of(toMonths(size), toMonths(slide)))
+
     case SlidingGroupWindow(_, _, size, slide) =>
       // TODO: EventTimeTumblingGroupWindow should sort the stream on event time
       // before applying the  windowing logic. Otherwise, this would be the same as a
@@ -292,9 +300,13 @@ object DataStreamGroupWindowAggregate {
         "Event-time grouping windows on row intervals are currently not supported.")
 
     case SessionGroupWindow(_, timeField, gap)
-        if isProctimeAttribute(timeField) =>
+      if isProctimeAttribute(timeField) =>
       stream.window(ProcessingTimeSessionWindows.withGap(toTime(gap)))
 
+    case SessionGroupWindow(_, timeField, gap)
+      if isRowtimeAttribute(timeField) && isMonthsIntervalLiteral(gap) =>
+      stream.window(EventTimeSessionWindows.withGap(toMonths(gap)))
+
     case SessionGroupWindow(_, timeField, gap)
         if isRowtimeAttribute(timeField) =>
       stream.window(EventTimeSessionWindows.withGap(toTime(gap)))
@@ -318,6 +330,9 @@ object DataStreamGroupWindowAggregate {
     case TumblingGroupWindow(_, _, size) if isTimeInterval(size.resultType) =>
       stream.windowAll(TumblingEventTimeWindows.of(toTime(size)))
 
+    case TumblingGroupWindow(_, _, size) if isMonthsIntervalLiteral(size) =>
+      stream.windowAll(TumblingEventTimeWindows.of(toMonths(size)))
+
     case TumblingGroupWindow(_, _, size) =>
       // TODO: EventTimeTumblingGroupWindow should sort the stream on event time
       // before applying the  windowing logic. Otherwise, this would be the same as a
@@ -338,6 +353,10 @@ object DataStreamGroupWindowAggregate {
         if isRowtimeAttribute(timeField) && isTimeIntervalLiteral(size)=>
       stream.windowAll(SlidingEventTimeWindows.of(toTime(size), toTime(slide)))
 
+    case SlidingGroupWindow(_, timeField, size, slide)
+      if isRowtimeAttribute(timeField) && isMonthsIntervalLiteral(size)=>
+      stream.windowAll(SlidingEventTimeWindows.of(toMonths(size), toMonths(slide)))
+
     case SlidingGroupWindow(_, _, size, slide) =>
       // TODO: EventTimeTumblingGroupWindow should sort the stream on event time
       // before applying the  windowing logic. Otherwise, this would be the same as a
@@ -349,6 +368,10 @@ object DataStreamGroupWindowAggregate {
         if isProctimeAttribute(timeField) && isTimeIntervalLiteral(gap) =>
       stream.windowAll(ProcessingTimeSessionWindows.withGap(toTime(gap)))
 
+    case SessionGroupWindow(_, timeField, gap)
+      if isProctimeAttribute(timeField) && isMonthsIntervalLiteral(gap) =>
+      stream.windowAll(ProcessingTimeSessionWindows.withGap(toMonths(gap)))
+
     case SessionGroupWindow(_, timeField, gap)
         if isRowtimeAttribute(timeField) && isTimeIntervalLiteral(gap) =>
       stream.windowAll(EventTimeSessionWindows.withGap(toTime(gap)))
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamLogicalWindowAggregateRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamLogicalWindowAggregateRule.scala
index c8c58adf226..db8a8ebb2ea 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamLogicalWindowAggregateRule.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamLogicalWindowAggregateRule.scala
@@ -70,8 +70,10 @@ class DataStreamLogicalWindowAggregateRule
       call.getOperands.get(idx) match {
         case v: RexLiteral if v.getTypeName.getFamily == SqlTypeFamily.INTERVAL_DAY_TIME =>
           v.getValue.asInstanceOf[JBigDecimal].longValue()
+        case v: RexLiteral if v.getTypeName.getFamily == SqlTypeFamily.INTERVAL_YEAR_MONTH =>
+          v.getValue.asInstanceOf[JBigDecimal].longValue()
         case _ => throw new TableException(
-          "Only constant window intervals with millisecond resolution are supported.")
+          "Only constant window intervals with millisecond and months resolution are supported.")
       }
 
     def getOperandAsTimeIndicator(call: RexCall, idx: Int): ResolvedFieldReference =
@@ -84,27 +86,37 @@ class DataStreamLogicalWindowAggregateRule
           throw new ValidationException("Window can only be defined over a time attribute column.")
       }
 
+    def getOperandAsLiteral(call: RexCall, idx: Int): Literal =
+      call.getOperands.get(idx) match {
+        case v: RexLiteral if v.getTypeName.getFamily == SqlTypeFamily.INTERVAL_DAY_TIME =>
+          Literal(getOperandAsLong(call, idx), TimeIntervalTypeInfo.INTERVAL_MILLIS)
+        case v: RexLiteral if v.getTypeName.getFamily == SqlTypeFamily.INTERVAL_YEAR_MONTH =>
+          Literal(getOperandAsLong(call, idx), TimeIntervalTypeInfo.INTERVAL_MONTHS)
+        case _ => throw new TableException(
+          "Only constant window intervals with millisecond and months resolution are supported.")
+      }
+
     windowExpr.getOperator match {
       case BasicOperatorTable.TUMBLE =>
         val time = getOperandAsTimeIndicator(windowExpr, 0)
-        val interval = getOperandAsLong(windowExpr, 1)
-        val w = Tumble.over(Literal(interval, TimeIntervalTypeInfo.INTERVAL_MILLIS))
-
+        val tumbleLiteral = getOperandAsLiteral(windowExpr, 1)
+        val w = Tumble.over(tumbleLiteral)
         w.on(time).as(WindowReference("w$", Some(time.resultType)))
 
       case BasicOperatorTable.HOP =>
         val time = getOperandAsTimeIndicator(windowExpr, 0)
-        val (slide, size) = (getOperandAsLong(windowExpr, 1), getOperandAsLong(windowExpr, 2))
+        val (slideLiteral, sizeLiteral) = (getOperandAsLiteral(windowExpr, 1),
+          getOperandAsLiteral(windowExpr, 2))
         val w = Slide
-          .over(Literal(size, TimeIntervalTypeInfo.INTERVAL_MILLIS))
-          .every(Literal(slide, TimeIntervalTypeInfo.INTERVAL_MILLIS))
+          .over(sizeLiteral)
+          .every(slideLiteral)
 
         w.on(time).as(WindowReference("w$", Some(time.resultType)))
 
       case BasicOperatorTable.SESSION =>
         val time = getOperandAsTimeIndicator(windowExpr, 0)
-        val gap = getOperandAsLong(windowExpr, 1)
-        val w = Session.withGap(Literal(gap, TimeIntervalTypeInfo.INTERVAL_MILLIS))
+        val gapLiteral = getOperandAsLiteral(windowExpr, 1)
+        val w = Session.withGap(gapLiteral)
 
         w.on(time).as(WindowReference("w$", Some(time.resultType)))
     }
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/GroupWindowTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/GroupWindowTest.scala
index cfbab5c074c..8fe9f17ae83 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/GroupWindowTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/GroupWindowTest.scala
@@ -19,11 +19,18 @@
 package org.apache.flink.table.api.stream.sql
 
 import org.apache.flink.api.scala._
+import org.apache.flink.streaming.api.TimeCharacteristic
+import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
+import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
+import org.apache.flink.streaming.api.watermark.Watermark
+import org.apache.flink.table.api.TableEnvironment
 import org.apache.flink.table.api.scala._
 import org.apache.flink.table.plan.logical._
 import org.apache.flink.table.runtime.utils.JavaUserDefinedAggFunctions.WeightedAvgWithMerge
+import org.apache.flink.table.runtime.utils.StreamITCase
 import org.apache.flink.table.utils.TableTestUtil._
 import org.apache.flink.table.utils.{StreamTableTestUtil, TableTestBase}
+import org.apache.flink.types.Row
 import org.junit.Test
 
 class GroupWindowTest extends TableTestBase {
@@ -306,4 +313,51 @@ class GroupWindowTest extends TableTestBase {
       )
     streamUtil.verifySql(sql, expected)
   }
+
+  def RunWindowForMonthsInterval(groupWindow: String, interval: String): Unit = {
+    val execEnv = StreamExecutionEnvironment.getExecutionEnvironment
+    execEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+    execEnv.setParallelism(1)
+    val tableEnv = TableEnvironment.getTableEnvironment(execEnv)
+    StreamITCase.clear
+
+    val table = tableEnv.sqlQuery("SELECT * FROM (VALUES " +
+      "('Bob', 1543052856000), " +
+      "('Bob', 1543052857000), " +
+      "('Lucy', 1543054856000)" +
+      ") AS NameTable(name,create_time)")
+
+    var inputStream: DataStream[Row] = tableEnv.toAppendStream(table)
+    inputStream.javaStream.getTransformation.setOutputType(table.getSchema.toRowType)
+    inputStream = inputStream.assignTimestampsAndWatermarks(
+      new AssignerWithPeriodicWatermarks[Row]() {
+        override def getCurrentWatermark = new Watermark(System.nanoTime)
+
+        def extractTimestamp(element: Row, previousElementTimestamp: Long): Long =
+          element.getField(1).asInstanceOf[Long]
+      })
+
+    val tb1 = tableEnv.fromDataStream(inputStream, 'name, 'create_time, 'rowtime.rowtime)
+
+    val groupWhereSql = groupWindow match {
+      case "TUMBLE" => s" GROUP BY TUMBLE(rowtime, $interval),name"
+      case "HOP" => s" GROUP BY HOP(rowtime, $interval),name"
+      case "SESSION" => s" GROUP BY SESSION(rowtime, $interval),name"
+    }
+
+    val result = tableEnv.sqlQuery(s"select name,count(1) from $tb1 " + groupWhereSql)
+    result.addSink(new StreamITCase.StringSink[Row])
+
+    execEnv.execute()
+  }
+
+  @Test
+  def TestWindowForMonthsInterval(): Unit = {
+    RunWindowForMonthsInterval("TUMBLE", "interval '10' month")
+    RunWindowForMonthsInterval("TUMBLE", "INTERVAL '2-10' YEAR TO MONTH")
+    RunWindowForMonthsInterval("HOP", "INTERVAL '1' MONTH, INTERVAL '10' MONTH")
+    RunWindowForMonthsInterval("HOP", "INTERVAL '1' MONTH, INTERVAL '2-10' YEAR TO MONTH")
+    RunWindowForMonthsInterval("SESSION", "interval '10' month")
+    RunWindowForMonthsInterval("SESSION", "INTERVAL '2-10' YEAR TO MONTH")
+  }
 }
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/validation/WindowAggregateValidationTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/validation/WindowAggregateValidationTest.scala
index 5c237ffc5ee..f2cd087692d 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/validation/WindowAggregateValidationTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/validation/WindowAggregateValidationTest.scala
@@ -73,7 +73,7 @@ class WindowAggregateValidationTest extends TableTestBase {
   def testVariableWindowSize(): Unit = {
     expectedException.expect(classOf[TableException])
     expectedException.expectMessage(
-      "Only constant window intervals with millisecond resolution are supported")
+      "Only constant window intervals with millisecond and months resolution are supported.")
 
     val sql = "SELECT COUNT(*) FROM MyTable GROUP BY TUMBLE(proctime, c * INTERVAL '1' MINUTE)"
     streamUtil.verifySql(sql, "n/a")
@@ -93,17 +93,4 @@ class WindowAggregateValidationTest extends TableTestBase {
 
     streamUtil.verifySql(sqlQuery, "n/a")
   }
-
-  @Test
-  def testWindowWrongWindowParameter(): Unit = {
-    expectedException.expect(classOf[TableException])
-    expectedException.expectMessage(
-      "Only constant window intervals with millisecond resolution are supported")
-
-    val sqlQuery =
-      "SELECT COUNT(*) FROM MyTable " +
-        "GROUP BY TUMBLE(proctime, INTERVAL '2-10' YEAR TO MONTH)"
-
-    streamUtil.verifySql(sqlQuery, "n/a")
-  }
 }
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SqlITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SqlITCase.scala
index 46dde8e0225..a0aa8b4f247 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SqlITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SqlITCase.scala
@@ -21,12 +21,10 @@ package org.apache.flink.table.runtime.stream.sql
 import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
 import org.apache.flink.api.java.typeutils.RowTypeInfo
 import org.apache.flink.api.scala._
-import org.apache.flink.streaming.api.TimeCharacteristic
 import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks
 import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
-import org.apache.flink.streaming.api.watermark.Watermark
 import org.apache.flink.table.api.scala._
-import org.apache.flink.table.api.{TableEnvironment, Types}
+import org.apache.flink.table.api.Types
 import org.apache.flink.table.descriptors.{Rowtime, Schema}
 import org.apache.flink.table.expressions.utils.Func15
 import org.apache.flink.table.runtime.stream.sql.SqlITCase.TimestampAndWatermarkWithOffset
@@ -34,6 +32,9 @@ import org.apache.flink.table.runtime.utils.TimeTestUtil.EventTimeSourceFunction
 import org.apache.flink.table.runtime.utils.{JavaUserDefinedTableFunctions, StreamITCase, StreamTestData, StreamingWithStateTestBase}
 import org.apache.flink.table.utils.{InMemoryTableFactory, MemoryTableSourceSinkUtil}
 import org.apache.flink.types.Row
+import org.apache.flink.streaming.api.watermark.Watermark
+import org.apache.flink.streaming.api.TimeCharacteristic
+import org.apache.flink.table.api.TableEnvironment
 import org.junit.Assert._
 import org.junit._
 
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/time/Time.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/time/Time.java
index d09928694eb..d8a059133fa 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/time/Time.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/time/Time.java
@@ -124,4 +124,11 @@ public static Time hours(long hours) {
 	public static Time days(long days) {
 		return of(days, TimeUnit.DAYS);
 	}
+
+	/**
+	 * Creates a new {@link Time} that represents the given number of months.
+	 */
+	public static Time months(long months) {
+		return of(months, TimeUnit.DAYS);
+	}
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services