You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2017/06/19 22:18:52 UTC
[5/5] flink git commit: [FLINK-6941] [table] Validate that start and
end window properties are not accessed on over windows.
[FLINK-6941] [table] Validate that start and end window properties are not accessed on over windows.
This closes #4137.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b6d14b91
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b6d14b91
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b6d14b91
Branch: refs/heads/release-1.3
Commit: b6d14b9147c8966810a184322920dac7e8ec0ee0
Parents: c7f6d02
Author: sunjincheng121 <su...@gmail.com>
Authored: Mon Jun 19 08:06:15 2017 +0800
Committer: Fabian Hueske <fh...@apache.org>
Committed: Mon Jun 19 23:13:26 2017 +0200
----------------------------------------------------------------------
.../src/main/scala/org/apache/flink/table/api/table.scala | 7 ++++++-
.../table/api/scala/stream/table/OverWindowTest.scala | 10 ++++++++++
.../org/apache/flink/table/utils/TableTestBase.scala | 8 ++++++++
3 files changed, 24 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/b6d14b91/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala
index 2bcb3d8..abdfa65 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala
@@ -22,7 +22,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.operators.join.JoinType
import org.apache.flink.table.calcite.FlinkRelBuilder
import org.apache.flink.table.calcite.FlinkTypeFactory
-import org.apache.flink.table.expressions.{Alias, Asc, Expression, ExpressionParser, Ordering, UnresolvedAlias, UnresolvedFieldReference}
+import org.apache.flink.table.expressions.{Alias, Asc, Expression, ExpressionParser, Ordering, UnresolvedAlias, UnresolvedFieldReference, WindowProperty}
import org.apache.flink.table.functions.utils.UserDefinedFunctionUtils
import org.apache.flink.table.plan.ProjectionTranslator._
import org.apache.flink.table.plan.logical.{Minus, _}
@@ -1004,6 +1004,11 @@ class OverWindowedTable(
table.logicalPlan,
table.tableEnv)
+ if(fields.exists(_.isInstanceOf[WindowProperty])){
+ throw ValidationException(
+ "Window start and end properties are not available for Over windows.")
+ }
+
val expandedOverFields = resolveOverWindows(expandedFields, overWindows, table.tableEnv)
new Table(
http://git-wip-us.apache.org/repos/asf/flink/blob/b6d14b91/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/OverWindowTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/OverWindowTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/OverWindowTest.scala
index 96e5eb5..11c518c 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/OverWindowTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/OverWindowTest.scala
@@ -107,6 +107,16 @@ class OverWindowTest extends TableTestBase {
}
@Test
+ def testAccessesWindowProperties(): Unit = {
+ thrown.expect(classOf[ValidationException])
+ thrown.expectMessage("Window start and end properties are not available for Over windows.")
+
+ table
+ .window(Over orderBy 'rowtime preceding 1.minutes as 'w)
+ .select('c, 'a.count over 'w, 'w.start, 'w.end)
+ }
+
+ @Test
def testProcTimeBoundedPartitionedRowsOver() = {
val weightedAvg = new WeightedAvgWithRetract
http://git-wip-us.apache.org/repos/asf/flink/blob/b6d14b91/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/TableTestBase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/TableTestBase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/TableTestBase.scala
index a1b28d3..402a69d 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/TableTestBase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/TableTestBase.scala
@@ -33,6 +33,8 @@ import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironm
import org.apache.flink.table.api.scala.batch.utils.LogicalPlanFormatUtils
import org.apache.flink.table.functions.AggregateFunction
import org.junit.Assert.assertEquals
+import org.junit.Rule
+import org.junit.rules.ExpectedException
import org.mockito.Mockito.{mock, when}
/**
@@ -40,6 +42,12 @@ import org.mockito.Mockito.{mock, when}
*/
class TableTestBase {
+ // used for accurate exception information checking.
+ val expectedException = ExpectedException.none()
+
+ @Rule
+ def thrown = expectedException
+
def batchTestUtil(): BatchTableTestUtil = {
BatchTableTestUtil()
}