You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2017/05/12 06:11:05 UTC
[4/5] flink git commit: [FLINK-6483] [table] Add materialization of
time indicators.
http://git-wip-us.apache.org/repos/asf/flink/blob/b50ef4b8/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverterTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverterTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverterTest.scala
new file mode 100644
index 0000000..7ac0874
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverterTest.scala
@@ -0,0 +1,351 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.calcite
+
+import java.sql.Timestamp
+
+import org.apache.flink.api.scala._
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.calcite.RelTimeIndicatorConverterTest.TableFunc
+import org.apache.flink.table.expressions.{TimeIntervalUnit, WindowReference}
+import org.apache.flink.table.functions.TableFunction
+import org.apache.flink.table.plan.logical.TumblingGroupWindow
+import org.apache.flink.table.utils.TableTestBase
+import org.apache.flink.table.utils.TableTestUtil._
+import org.junit.Test
+
+/**
+ * Tests for [[RelTimeIndicatorConverter]].
+ */
+class RelTimeIndicatorConverterTest extends TableTestBase {
+
+ @Test
+ def testSimpleMaterialization(): Unit = {
+ val util = streamTestUtil()
+ val t = util.addTable[(Long, Long, Int)]('rowtime.rowtime, 'long, 'int, 'proctime.proctime)
+
+ val result = t
+ .select('rowtime.floor(TimeIntervalUnit.DAY) as 'rowtime, 'long)
+ .filter('long > 0)
+ .select('rowtime)
+
+ val expected = unaryNode(
+ "DataStreamCalc",
+ streamTableNode(0),
+ term("select", "FLOOR(TIME_MATERIALIZATION(rowtime)", "FLAG(DAY)) AS rowtime"),
+ term("where", ">(long, 0)")
+ )
+
+ util.verifyTable(result, expected)
+ }
+
+ @Test
+ def testSelectAll(): Unit = {
+ val util = streamTestUtil()
+ val t = util.addTable[(Long, Long, Int)]('rowtime.rowtime, 'long, 'int, 'proctime.proctime)
+
+ val result = t.select('*)
+
+ val expected = unaryNode(
+ "DataStreamCalc",
+ streamTableNode(0),
+ term("select", "TIME_MATERIALIZATION(rowtime) AS rowtime", "long", "int",
+ "TIME_MATERIALIZATION(proctime) AS proctime")
+ )
+
+ util.verifyTable(result, expected)
+ }
+
+ @Test
+ def testFilteringOnRowtime(): Unit = {
+ val util = streamTestUtil()
+ val t = util.addTable[(Long, Long, Int)]('rowtime.rowtime, 'long, 'int)
+
+ val result = t
+ .filter('rowtime > "1990-12-02 12:11:11".toTimestamp)
+ .select('rowtime)
+
+ val expected = unaryNode(
+ "DataStreamCalc",
+ streamTableNode(0),
+ term("select", "TIME_MATERIALIZATION(rowtime) AS rowtime"),
+ term("where", ">(TIME_MATERIALIZATION(rowtime), 1990-12-02 12:11:11)")
+ )
+
+ util.verifyTable(result, expected)
+ }
+
+ @Test
+ def testGroupingOnRowtime(): Unit = {
+ val util = streamTestUtil()
+ val t = util.addTable[(Long, Long, Int)]('rowtime.rowtime, 'long, 'int, 'proctime.proctime)
+
+ val result = t
+ .groupBy('rowtime)
+ .select('long.count)
+
+ val expected = unaryNode(
+ "DataStreamCalc",
+ unaryNode(
+ "DataStreamGroupAggregate",
+ unaryNode(
+ "DataStreamCalc",
+ streamTableNode(0),
+ term("select", "long", "TIME_MATERIALIZATION(rowtime) AS rowtime")
+ ),
+ term("groupBy", "rowtime"),
+ term("select", "rowtime", "COUNT(long) AS TMP_0")
+ ),
+ term("select", "TMP_0")
+ )
+
+ util.verifyTable(result, expected)
+ }
+
+ @Test
+ def testGroupingOnProctimeSql(): Unit = {
+ val util = streamTestUtil()
+ util.addTable[(Long, Int)]("MyTable" , 'long, 'int, 'proctime.proctime)
+
+ val result = util.tEnv.sql("SELECT COUNT(long) FROM MyTable GROUP BY proctime")
+
+ val expected = unaryNode(
+ "DataStreamCalc",
+ unaryNode(
+ "DataStreamGroupAggregate",
+ unaryNode(
+ "DataStreamCalc",
+ streamTableNode(0),
+ term("select", "TIME_MATERIALIZATION(proctime) AS proctime", "long")
+ ),
+ term("groupBy", "proctime"),
+ term("select", "proctime", "COUNT(long) AS EXPR$0")
+ ),
+ term("select", "EXPR$0")
+ )
+
+ util.verifyTable(result, expected)
+ }
+
+ @Test
+ def testAggregationOnRowtime(): Unit = {
+ val util = streamTestUtil()
+ val t = util.addTable[(Long, Long, Int)]('rowtime.rowtime, 'long, 'int)
+
+ val result = t
+ .groupBy('long)
+ .select('rowtime.min)
+
+ val expected = unaryNode(
+ "DataStreamCalc",
+ unaryNode(
+ "DataStreamGroupAggregate",
+ unaryNode(
+ "DataStreamCalc",
+ streamTableNode(0),
+ term("select", "TIME_MATERIALIZATION(rowtime) AS rowtime", "long")
+ ),
+ term("groupBy", "long"),
+ term("select", "long", "MIN(rowtime) AS TMP_0")
+ ),
+ term("select", "TMP_0")
+ )
+
+ util.verifyTable(result, expected)
+ }
+
+ @Test
+ def testAggregationOnProctimeSql(): Unit = {
+ val util = streamTestUtil()
+ util.addTable[(Long, Int)]("MyTable" , 'long, 'int, 'proctime.proctime)
+
+ val result = util.tEnv.sql("SELECT MIN(proctime) FROM MyTable GROUP BY long")
+
+ val expected = unaryNode(
+ "DataStreamCalc",
+ unaryNode(
+ "DataStreamGroupAggregate",
+ unaryNode(
+ "DataStreamCalc",
+ streamTableNode(0),
+ term("select", "long", "TIME_MATERIALIZATION(proctime) AS proctime")
+ ),
+ term("groupBy", "long"),
+ term("select", "long", "MIN(proctime) AS EXPR$0")
+ ),
+ term("select", "EXPR$0")
+ )
+
+ util.verifyTable(result, expected)
+ }
+
+ @Test
+ def testTableFunction(): Unit = {
+ val util = streamTestUtil()
+ val t = util.addTable[(Long, Long, Int)]('rowtime.rowtime, 'long, 'int, 'proctime.proctime)
+ val func = new TableFunc
+
+ val result = t.join(func('rowtime, 'proctime) as 's).select('rowtime, 'proctime, 's)
+
+ val expected = unaryNode(
+ "DataStreamCalc",
+ unaryNode(
+ "DataStreamCorrelate",
+ streamTableNode(0),
+ term("invocation",
+ s"${func.functionIdentifier}(TIME_MATERIALIZATION($$0), TIME_MATERIALIZATION($$3))"),
+ term("function", func),
+ term("rowType", "RecordType(TIMESTAMP(3) rowtime, BIGINT long, INTEGER int, " +
+ "TIMESTAMP(3) proctime, VARCHAR(2147483647) s)"),
+ term("joinType", "INNER")
+ ),
+ term("select",
+ "TIME_MATERIALIZATION(rowtime) AS rowtime",
+ "TIME_MATERIALIZATION(proctime) AS proctime",
+ "s")
+ )
+
+ util.verifyTable(result, expected)
+ }
+
+ @Test
+ def testWindow(): Unit = {
+ val util = streamTestUtil()
+ val t = util.addTable[(Long, Long, Int)]('rowtime.rowtime, 'long, 'int)
+
+ val result = t
+ .window(Tumble over 100.millis on 'rowtime as 'w)
+ .groupBy('w, 'long)
+ .select('w.end as 'rowtime, 'long, 'int.sum)
+
+ val expected = unaryNode(
+ "DataStreamCalc",
+ unaryNode(
+ "DataStreamGroupWindowAggregate",
+ streamTableNode(0),
+ term("groupBy", "long"),
+ term(
+ "window",
+ TumblingGroupWindow(
+ WindowReference("w"),
+ 'rowtime,
+ 100.millis)),
+ term("select", "long", "SUM(int) AS TMP_1", "end(WindowReference(w)) AS TMP_0")
+ ),
+ term("select", "TMP_0 AS rowtime", "long", "TMP_1")
+ )
+
+ util.verifyTable(result, expected)
+ }
+
+ @Test
+ def testWindowSql(): Unit = {
+ val util = streamTestUtil()
+ util.addTable[(Long, Long, Int)]("MyTable", 'rowtime.rowtime, 'long, 'int)
+
+ val result = util.tEnv.sql(
+ "SELECT TUMBLE_END(rowtime, INTERVAL '0.1' SECOND) AS `rowtime`, `long`, " +
+ "SUM(`int`) FROM MyTable " +
+ "GROUP BY `long`, TUMBLE(rowtime, INTERVAL '0.1' SECOND)")
+
+ val expected = unaryNode(
+ "DataStreamCalc",
+ unaryNode(
+ "DataStreamGroupWindowAggregate",
+ streamTableNode(0),
+ term("groupBy", "long"),
+ term(
+ "window",
+ TumblingGroupWindow(
+ 'w$,
+ 'rowtime,
+ 100.millis)),
+ term("select", "long", "SUM(int) AS EXPR$2", "start('w$) AS w$start", "end('w$) AS w$end")
+ ),
+ term("select", "w$end", "long", "EXPR$2")
+ )
+
+ util.verifyTable(result, expected)
+ }
+
+ @Test
+ def testWindowWithAggregationOnRowtimeSql(): Unit = {
+ val util = streamTestUtil()
+ util.addTable[(Long, Long, Int)]("MyTable", 'rowtime.rowtime, 'long, 'int)
+
+ val result = util.tEnv.sql("SELECT MIN(rowtime), long FROM MyTable " +
+ "GROUP BY long, TUMBLE(rowtime, INTERVAL '0.1' SECOND)")
+
+ val expected = unaryNode(
+ "DataStreamCalc",
+ unaryNode(
+ "DataStreamGroupWindowAggregate",
+ unaryNode(
+ "DataStreamCalc",
+ streamTableNode(0),
+ term("select", "long", "1970-01-01 00:00:00 AS $f1",
+ "TIME_MATERIALIZATION(rowtime) AS $f2")
+ ),
+ term("groupBy", "long"),
+ term(
+ "window",
+ TumblingGroupWindow(
+ 'w$,
+ 'rowtime,
+ 100.millis)),
+ term("select", "long", "MIN($f2) AS EXPR$0")
+ ),
+ term("select", "EXPR$0", "long")
+ )
+
+ util.verifyTable(result, expected)
+ }
+
+ @Test
+ def testUnion(): Unit = {
+ val util = streamTestUtil()
+ val t = util.addTable[(Long, Long, Int)]("MyTable", 'rowtime.rowtime, 'long, 'int)
+
+ val result = t.unionAll(t).select('rowtime)
+
+ val expected = unaryNode(
+ "DataStreamCalc",
+ binaryNode(
+ "DataStreamUnion",
+ streamTableNode(0),
+ streamTableNode(0),
+ term("union all", "rowtime", "long", "int")
+ ),
+ term("select", "TIME_MATERIALIZATION(rowtime) AS rowtime")
+ )
+
+ util.verifyTable(result, expected)
+ }
+
+}
+
+object RelTimeIndicatorConverterTest {
+
+ class TableFunc extends TableFunction[String] {
+ val t = new Timestamp(0L)
+ def eval(time1: Long, time2: Timestamp): Unit = {
+ collect(time1.toString + time2.after(t))
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/b50ef4b8/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/TimeAttributesITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/TimeAttributesITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/TimeAttributesITCase.scala
new file mode 100644
index 0000000..7d7088e
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/TimeAttributesITCase.scala
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.datastream
+
+import java.math.BigDecimal
+
+import org.apache.flink.api.scala._
+import org.apache.flink.streaming.api.TimeCharacteristic
+import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.streaming.api.watermark.Watermark
+import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.api.scala.stream.utils.StreamITCase
+import org.apache.flink.table.api.{TableEnvironment, TableException, Types}
+import org.apache.flink.table.calcite.RelTimeIndicatorConverterTest.TableFunc
+import org.apache.flink.table.expressions.TimeIntervalUnit
+import org.apache.flink.table.runtime.datastream.TimeAttributesITCase.TimestampWithEqualWatermark
+import org.apache.flink.types.Row
+import org.junit.Assert._
+import org.junit.Test
+
+import scala.collection.mutable
+
+/**
+ * Tests for access and materialization of time attributes.
+ */
+class TimeAttributesITCase extends StreamingMultipleProgramsTestBase {
+
+ val data = List(
+ (1L, 1, 1d, 1f, new BigDecimal("1"), "Hi"),
+ (2L, 2, 2d, 2f, new BigDecimal("2"), "Hallo"),
+ (3L, 2, 2d, 2f, new BigDecimal("2"), "Hello"),
+ (4L, 5, 5d, 5f, new BigDecimal("5"), "Hello"),
+ (7L, 3, 3d, 3f, new BigDecimal("3"), "Hello"),
+ (8L, 3, 3d, 3f, new BigDecimal("3"), "Hello world"),
+ (16L, 4, 4d, 4f, new BigDecimal("4"), "Hello world"))
+
+ @Test(expected = classOf[TableException])
+ def testInvalidTimeCharacteristic(): Unit = {
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+ val stream = env
+ .fromCollection(data)
+ .assignTimestampsAndWatermarks(new TimestampWithEqualWatermark())
+ stream.toTable(tEnv, 'rowtime.rowtime, 'int, 'double, 'float, 'bigdec, 'string)
+ }
+
+ @Test
+ def testCalcMaterialization(): Unit = {
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+ StreamITCase.testResults = mutable.MutableList()
+
+ val stream = env
+ .fromCollection(data)
+ .assignTimestampsAndWatermarks(new TimestampWithEqualWatermark())
+ val table = stream.toTable(tEnv, 'rowtime.rowtime, 'int, 'double, 'float, 'bigdec, 'string)
+
+ val t = table.select('rowtime.cast(Types.STRING))
+
+ val results = t.toDataStream[Row]
+ results.addSink(new StreamITCase.StringSink)
+ env.execute()
+
+ val expected = Seq(
+ "1970-01-01 00:00:00.001",
+ "1970-01-01 00:00:00.002",
+ "1970-01-01 00:00:00.003",
+ "1970-01-01 00:00:00.004",
+ "1970-01-01 00:00:00.007",
+ "1970-01-01 00:00:00.008",
+ "1970-01-01 00:00:00.016")
+ assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+ }
+
+ @Test
+ def testCalcMaterialization2(): Unit = {
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+ StreamITCase.testResults = mutable.MutableList()
+
+ val stream = env
+ .fromCollection(data)
+ .assignTimestampsAndWatermarks(new TimestampWithEqualWatermark())
+ val table = stream.toTable(tEnv, 'rowtime.rowtime, 'int, 'double, 'float, 'bigdec, 'string)
+
+ val t = table
+ .filter('rowtime.cast(Types.LONG) > 4)
+ .select('rowtime, 'rowtime.floor(TimeIntervalUnit.DAY), 'rowtime.ceil(TimeIntervalUnit.DAY))
+
+ val results = t.toDataStream[Row]
+ results.addSink(new StreamITCase.StringSink)
+ env.execute()
+
+ val expected = Seq(
+ "1970-01-01 00:00:00.007,1970-01-01 00:00:00.0,1970-01-02 00:00:00.0",
+ "1970-01-01 00:00:00.008,1970-01-01 00:00:00.0,1970-01-02 00:00:00.0",
+ "1970-01-01 00:00:00.016,1970-01-01 00:00:00.0,1970-01-02 00:00:00.0")
+ assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+ }
+
+ @Test
+ def testTableFunction(): Unit = {
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+ StreamITCase.testResults = mutable.MutableList()
+
+ val stream = env
+ .fromCollection(data)
+ .assignTimestampsAndWatermarks(new TimestampWithEqualWatermark())
+ val table = stream.toTable(
+ tEnv, 'rowtime.rowtime, 'int, 'double, 'float, 'bigdec, 'string, 'proctime.proctime)
+ val func = new TableFunc
+
+ val t = table.join(func('rowtime, 'proctime) as 's).select('rowtime, 's)
+
+ val results = t.toDataStream[Row]
+ results.addSink(new StreamITCase.StringSink)
+ env.execute()
+
+ val expected = Seq(
+ "1970-01-01 00:00:00.001,1true",
+ "1970-01-01 00:00:00.002,2true",
+ "1970-01-01 00:00:00.003,3true",
+ "1970-01-01 00:00:00.004,4true",
+ "1970-01-01 00:00:00.007,7true",
+ "1970-01-01 00:00:00.008,8true",
+ "1970-01-01 00:00:00.016,16true")
+ assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+ }
+
+ @Test
+ def testUnion(): Unit = {
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+ StreamITCase.testResults = mutable.MutableList()
+
+ val stream = env
+ .fromCollection(data)
+ .assignTimestampsAndWatermarks(new TimestampWithEqualWatermark())
+ val table = stream.toTable(
+ tEnv, 'rowtime.rowtime, 'int, 'double, 'float, 'bigdec, 'string)
+
+ val t = table.unionAll(table).select('rowtime)
+
+ val results = t.toDataStream[Row]
+ results.addSink(new StreamITCase.StringSink)
+ env.execute()
+
+ val expected = Seq(
+ "1970-01-01 00:00:00.001",
+ "1970-01-01 00:00:00.001",
+ "1970-01-01 00:00:00.002",
+ "1970-01-01 00:00:00.002",
+ "1970-01-01 00:00:00.003",
+ "1970-01-01 00:00:00.003",
+ "1970-01-01 00:00:00.004",
+ "1970-01-01 00:00:00.004",
+ "1970-01-01 00:00:00.007",
+ "1970-01-01 00:00:00.007",
+ "1970-01-01 00:00:00.008",
+ "1970-01-01 00:00:00.008",
+ "1970-01-01 00:00:00.016",
+ "1970-01-01 00:00:00.016")
+ assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+ }
+
+ @Test
+ def testWindowWithAggregationOnRowtimeSql(): Unit = {
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+ StreamITCase.testResults = mutable.MutableList()
+
+ val stream = env
+ .fromCollection(data)
+ .assignTimestampsAndWatermarks(new TimestampWithEqualWatermark())
+ val table = stream.toTable(tEnv, 'rowtime.rowtime, 'int, 'double, 'float, 'bigdec, 'string)
+ tEnv.registerTable("MyTable", table)
+
+ val t = tEnv.sql("SELECT COUNT(`rowtime`) FROM MyTable " +
+ "GROUP BY TUMBLE(rowtime, INTERVAL '0.003' SECOND)")
+
+ val results = t.toDataStream[Row]
+ results.addSink(new StreamITCase.StringSink)
+ env.execute()
+
+ val expected = Seq(
+ "1",
+ "2",
+ "2",
+ "2"
+ )
+ assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+ }
+
+}
+
+object TimeAttributesITCase {
+ class TimestampWithEqualWatermark
+ extends AssignerWithPunctuatedWatermarks[(Long, Int, Double, Float, BigDecimal, String)] {
+
+ override def checkAndGetNextWatermark(
+ lastElement: (Long, Int, Double, Float, BigDecimal, String),
+ extractedTimestamp: Long)
+ : Watermark = {
+ new Watermark(extractedTimestamp)
+ }
+
+ override def extractTimestamp(
+ element: (Long, Int, Double, Float, BigDecimal, String),
+ previousElementTimestamp: Long): Long = {
+ element._1
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/b50ef4b8/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/TableTestBase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/TableTestBase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/TableTestBase.scala
index 0e6d461..65014cd 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/TableTestBase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/TableTestBase.scala
@@ -24,9 +24,11 @@ import org.apache.flink.api.java.{DataSet => JDataSet}
import org.apache.flink.table.api.{Table, TableEnvironment}
import org.apache.flink.table.api.scala._
import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment}
+import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.table.expressions.Expression
import org.apache.flink.table.functions.{ScalarFunction, TableFunction}
import org.apache.flink.streaming.api.datastream.{DataStream => JDataStream}
+import org.apache.flink.streaming.api.environment.{StreamExecutionEnvironment => JStreamExecutionEnvironment}
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.table.api.scala.batch.utils.LogicalPlanFormatUtils
import org.junit.Assert.assertEquals
@@ -174,7 +176,10 @@ case class BatchTableTestUtil() extends TableTestUtil {
case class StreamTableTestUtil() extends TableTestUtil {
+ val javaEnv = mock(classOf[JStreamExecutionEnvironment])
+ when(javaEnv.getStreamTimeCharacteristic).thenReturn(TimeCharacteristic.EventTime)
val env = mock(classOf[StreamExecutionEnvironment])
+ when(env.getWrappedStreamExecutionEnvironment).thenReturn(javaEnv)
val tEnv = TableEnvironment.getTableEnvironment(env)
def addTable[T: TypeInformation](