You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2017/06/19 22:18:52 UTC

[5/5] flink git commit: [FLINK-6941] [table] Validate that start and end window properties are not accessed on over windows.

[FLINK-6941] [table] Validate that start and end window properties are not accessed on over windows.

This closes #4137.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b6d14b91
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b6d14b91
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b6d14b91

Branch: refs/heads/release-1.3
Commit: b6d14b9147c8966810a184322920dac7e8ec0ee0
Parents: c7f6d02
Author: sunjincheng121 <su...@gmail.com>
Authored: Mon Jun 19 08:06:15 2017 +0800
Committer: Fabian Hueske <fh...@apache.org>
Committed: Mon Jun 19 23:13:26 2017 +0200

----------------------------------------------------------------------
 .../src/main/scala/org/apache/flink/table/api/table.scala |  7 ++++++-
 .../table/api/scala/stream/table/OverWindowTest.scala     | 10 ++++++++++
 .../org/apache/flink/table/utils/TableTestBase.scala      |  8 ++++++++
 3 files changed, 24 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/b6d14b91/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala
index 2bcb3d8..abdfa65 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala
@@ -22,7 +22,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.operators.join.JoinType
 import org.apache.flink.table.calcite.FlinkRelBuilder
 import org.apache.flink.table.calcite.FlinkTypeFactory
-import org.apache.flink.table.expressions.{Alias, Asc, Expression, ExpressionParser, Ordering, UnresolvedAlias, UnresolvedFieldReference}
+import org.apache.flink.table.expressions.{Alias, Asc, Expression, ExpressionParser, Ordering, UnresolvedAlias, UnresolvedFieldReference, WindowProperty}
 import org.apache.flink.table.functions.utils.UserDefinedFunctionUtils
 import org.apache.flink.table.plan.ProjectionTranslator._
 import org.apache.flink.table.plan.logical.{Minus, _}
@@ -1004,6 +1004,11 @@ class OverWindowedTable(
       table.logicalPlan,
       table.tableEnv)
 
+    if(fields.exists(_.isInstanceOf[WindowProperty])){
+      throw ValidationException(
+        "Window start and end properties are not available for Over windows.")
+    }
+
     val expandedOverFields = resolveOverWindows(expandedFields, overWindows, table.tableEnv)
 
     new Table(

http://git-wip-us.apache.org/repos/asf/flink/blob/b6d14b91/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/OverWindowTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/OverWindowTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/OverWindowTest.scala
index 96e5eb5..11c518c 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/OverWindowTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/OverWindowTest.scala
@@ -107,6 +107,16 @@ class OverWindowTest extends TableTestBase {
   }
 
   @Test
+  def testAccessesWindowProperties(): Unit = {
+    thrown.expect(classOf[ValidationException])
+    thrown.expectMessage("Window start and end properties are not available for Over windows.")
+
+    table
+    .window(Over orderBy 'rowtime preceding 1.minutes as 'w)
+    .select('c, 'a.count over 'w, 'w.start, 'w.end)
+  }
+
+  @Test
   def testProcTimeBoundedPartitionedRowsOver() = {
     val weightedAvg = new WeightedAvgWithRetract
 

http://git-wip-us.apache.org/repos/asf/flink/blob/b6d14b91/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/TableTestBase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/TableTestBase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/TableTestBase.scala
index a1b28d3..402a69d 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/TableTestBase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/TableTestBase.scala
@@ -33,6 +33,8 @@ import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironm
 import org.apache.flink.table.api.scala.batch.utils.LogicalPlanFormatUtils
 import org.apache.flink.table.functions.AggregateFunction
 import org.junit.Assert.assertEquals
+import org.junit.Rule
+import org.junit.rules.ExpectedException
 import org.mockito.Mockito.{mock, when}
 
 /**
@@ -40,6 +42,12 @@ import org.mockito.Mockito.{mock, when}
   */
 class TableTestBase {
 
+  // Backing ExpectedException instance; lets tests assert the exception type and message.
+  val expectedException = ExpectedException.none()
+
+  @Rule
+  def thrown = expectedException
+
   def batchTestUtil(): BatchTableTestUtil = {
     BatchTableTestUtil()
   }