Posted to commits@flink.apache.org by fh...@apache.org on 2017/05/12 06:11:05 UTC

[4/5] flink git commit: [FLINK-6483] [table] Add materialization of time indicators.
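
This commit makes the planner materialize time indicator attributes
(rowtime/proctime) into regular TIMESTAMP values wherever they are used as
plain expressions, and adds plan tests plus an integration test for the new
behavior. As orientation for the diffs below, a minimal sketch of the
user-facing pattern, written against the StreamTableTestUtil harness from
these tests (it only compiles inside a TableTestBase subclass):

    import org.apache.flink.api.scala._
    import org.apache.flink.table.api.scala._
    import org.apache.flink.table.expressions.TimeIntervalUnit

    val util = streamTestUtil()
    // 'rowtime.rowtime declares an event-time indicator attribute
    val t = util.addTable[(Long, Long, Int)]('rowtime.rowtime, 'long, 'int)

    // using the indicator inside floor() forces the converter to wrap it in
    // TIME_MATERIALIZATION, so a concrete timestamp is computed at runtime
    val result = t.select('rowtime.floor(TimeIntervalUnit.DAY) as 'day)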

http://git-wip-us.apache.org/repos/asf/flink/blob/b50ef4b8/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverterTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverterTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverterTest.scala
new file mode 100644
index 0000000..7ac0874
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverterTest.scala
@@ -0,0 +1,351 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.calcite
+
+import java.sql.Timestamp
+
+import org.apache.flink.api.scala._
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.calcite.RelTimeIndicatorConverterTest.TableFunc
+import org.apache.flink.table.expressions.{TimeIntervalUnit, WindowReference}
+import org.apache.flink.table.functions.TableFunction
+import org.apache.flink.table.plan.logical.TumblingGroupWindow
+import org.apache.flink.table.utils.TableTestBase
+import org.apache.flink.table.utils.TableTestUtil._
+import org.junit.Test
+
+/**
+  * Tests for [[RelTimeIndicatorConverter]].
+  */
+class RelTimeIndicatorConverterTest extends TableTestBase {
+
+  @Test
+  def testSimpleMaterialization(): Unit = {
+    val util = streamTestUtil()
+    val t = util.addTable[(Long, Long, Int)]('rowtime.rowtime, 'long, 'int, 'proctime.proctime)
+
+    val result = t
+      .select('rowtime.floor(TimeIntervalUnit.DAY) as 'rowtime, 'long)
+      .filter('long > 0)
+      .select('rowtime)
+
+    val expected = unaryNode(
+      "DataStreamCalc",
+      streamTableNode(0),
+      term("select", "FLOOR(TIME_MATERIALIZATION(rowtime)", "FLAG(DAY)) AS rowtime"),
+      term("where", ">(long, 0)")
+    )
+
+    util.verifyTable(result, expected)
+  }
+
+  @Test
+  def testSelectAll(): Unit = {
+    val util = streamTestUtil()
+    val t = util.addTable[(Long, Long, Int)]('rowtime.rowtime, 'long, 'int, 'proctime.proctime)
+
+    val result = t.select('*)
+
+    val expected = unaryNode(
+      "DataStreamCalc",
+      streamTableNode(0),
+      term("select", "TIME_MATERIALIZATION(rowtime) AS rowtime", "long", "int",
+        "TIME_MATERIALIZATION(proctime) AS proctime")
+    )
+
+    util.verifyTable(result, expected)
+  }
+
+  @Test
+  def testFilteringOnRowtime(): Unit = {
+    val util = streamTestUtil()
+    val t = util.addTable[(Long, Long, Int)]('rowtime.rowtime, 'long, 'int)
+
+    val result = t
+      .filter('rowtime > "1990-12-02 12:11:11".toTimestamp)
+      .select('rowtime)
+
+    val expected = unaryNode(
+      "DataStreamCalc",
+      streamTableNode(0),
+      term("select", "TIME_MATERIALIZATION(rowtime) AS rowtime"),
+      term("where", ">(TIME_MATERIALIZATION(rowtime), 1990-12-02 12:11:11)")
+    )
+
+    util.verifyTable(result, expected)
+  }
+
+  @Test
+  def testGroupingOnRowtime(): Unit = {
+    val util = streamTestUtil()
+    val t = util.addTable[(Long, Long, Int)]('rowtime.rowtime, 'long, 'int, 'proctime.proctime)
+
+    val result = t
+      .groupBy('rowtime)
+      .select('long.count)
+
+    val expected = unaryNode(
+      "DataStreamCalc",
+      unaryNode(
+        "DataStreamGroupAggregate",
+        unaryNode(
+          "DataStreamCalc",
+          streamTableNode(0),
+          term("select", "long", "TIME_MATERIALIZATION(rowtime) AS rowtime")
+        ),
+        term("groupBy", "rowtime"),
+        term("select", "rowtime", "COUNT(long) AS TMP_0")
+      ),
+      term("select", "TMP_0")
+    )
+
+    util.verifyTable(result, expected)
+  }
+
+  @Test
+  def testGroupingOnProctimeSql(): Unit = {
+    val util = streamTestUtil()
+    util.addTable[(Long, Int)]("MyTable", 'long, 'int, 'proctime.proctime)
+
+    val result = util.tEnv.sql("SELECT COUNT(long) FROM MyTable GROUP BY proctime")
+
+    val expected = unaryNode(
+      "DataStreamCalc",
+      unaryNode(
+        "DataStreamGroupAggregate",
+        unaryNode(
+          "DataStreamCalc",
+          streamTableNode(0),
+          term("select", "TIME_MATERIALIZATION(proctime) AS proctime", "long")
+        ),
+        term("groupBy", "proctime"),
+        term("select", "proctime", "COUNT(long) AS EXPR$0")
+      ),
+      term("select", "EXPR$0")
+    )
+
+    util.verifyTable(result, expected)
+  }
+
+  @Test
+  def testAggregationOnRowtime(): Unit = {
+    val util = streamTestUtil()
+    val t = util.addTable[(Long, Long, Int)]('rowtime.rowtime, 'long, 'int)
+
+    val result = t
+      .groupBy('long)
+      .select('rowtime.min)
+
+    val expected = unaryNode(
+      "DataStreamCalc",
+      unaryNode(
+        "DataStreamGroupAggregate",
+        unaryNode(
+          "DataStreamCalc",
+          streamTableNode(0),
+          term("select", "TIME_MATERIALIZATION(rowtime) AS rowtime", "long")
+        ),
+        term("groupBy", "long"),
+        term("select", "long", "MIN(rowtime) AS TMP_0")
+      ),
+      term("select", "TMP_0")
+    )
+
+    util.verifyTable(result, expected)
+  }
+
+  @Test
+  def testAggregationOnProctimeSql(): Unit = {
+    val util = streamTestUtil()
+    util.addTable[(Long, Int)]("MyTable", 'long, 'int, 'proctime.proctime)
+
+    val result = util.tEnv.sql("SELECT MIN(proctime) FROM MyTable GROUP BY long")
+
+    val expected = unaryNode(
+      "DataStreamCalc",
+      unaryNode(
+        "DataStreamGroupAggregate",
+        unaryNode(
+          "DataStreamCalc",
+          streamTableNode(0),
+          term("select", "long", "TIME_MATERIALIZATION(proctime) AS proctime")
+        ),
+        term("groupBy", "long"),
+        term("select", "long", "MIN(proctime) AS EXPR$0")
+      ),
+      term("select", "EXPR$0")
+    )
+
+    util.verifyTable(result, expected)
+  }
+
+  @Test
+  def testTableFunction(): Unit = {
+    val util = streamTestUtil()
+    val t = util.addTable[(Long, Long, Int)]('rowtime.rowtime, 'long, 'int, 'proctime.proctime)
+    val func = new TableFunc
+
+    val result = t.join(func('rowtime, 'proctime) as 's).select('rowtime, 'proctime, 's)
+
+    val expected = unaryNode(
+      "DataStreamCalc",
+      unaryNode(
+        "DataStreamCorrelate",
+        streamTableNode(0),
+        term("invocation",
+          s"${func.functionIdentifier}(TIME_MATERIALIZATION($$0), TIME_MATERIALIZATION($$3))"),
+        term("function", func),
+        term("rowType", "RecordType(TIMESTAMP(3) rowtime, BIGINT long, INTEGER int, " +
+          "TIMESTAMP(3) proctime, VARCHAR(2147483647) s)"),
+        term("joinType", "INNER")
+      ),
+      term("select",
+        "TIME_MATERIALIZATION(rowtime) AS rowtime",
+        "TIME_MATERIALIZATION(proctime) AS proctime",
+        "s")
+    )
+
+    util.verifyTable(result, expected)
+  }
+
+  @Test
+  def testWindow(): Unit = {
+    val util = streamTestUtil()
+    val t = util.addTable[(Long, Long, Int)]('rowtime.rowtime, 'long, 'int)
+
+    val result = t
+      .window(Tumble over 100.millis on 'rowtime as 'w)
+      .groupBy('w, 'long)
+      .select('w.end as 'rowtime, 'long, 'int.sum)
+
+    val expected = unaryNode(
+      "DataStreamCalc",
+      unaryNode(
+        "DataStreamGroupWindowAggregate",
+        streamTableNode(0),
+        term("groupBy", "long"),
+        term(
+          "window",
+          TumblingGroupWindow(
+            WindowReference("w"),
+            'rowtime,
+            100.millis)),
+        term("select", "long", "SUM(int) AS TMP_1", "end(WindowReference(w)) AS TMP_0")
+      ),
+      term("select", "TMP_0 AS rowtime", "long", "TMP_1")
+    )
+
+    util.verifyTable(result, expected)
+  }
+
+  @Test
+  def testWindowSql(): Unit = {
+    val util = streamTestUtil()
+    util.addTable[(Long, Long, Int)]("MyTable", 'rowtime.rowtime, 'long, 'int)
+
+    val result = util.tEnv.sql(
+      "SELECT TUMBLE_END(rowtime, INTERVAL '0.1' SECOND) AS `rowtime`, `long`, " +
+        "SUM(`int`) FROM MyTable " +
+        "GROUP BY `long`, TUMBLE(rowtime, INTERVAL '0.1' SECOND)")
+
+    val expected = unaryNode(
+      "DataStreamCalc",
+      unaryNode(
+        "DataStreamGroupWindowAggregate",
+        streamTableNode(0),
+        term("groupBy", "long"),
+        term(
+          "window",
+          TumblingGroupWindow(
+            'w$,
+            'rowtime,
+            100.millis)),
+        term("select", "long", "SUM(int) AS EXPR$2", "start('w$) AS w$start", "end('w$) AS w$end")
+      ),
+      term("select", "w$end", "long", "EXPR$2")
+    )
+
+    util.verifyTable(result, expected)
+  }
+
+  @Test
+  def testWindowWithAggregationOnRowtimeSql(): Unit = {
+    val util = streamTestUtil()
+    util.addTable[(Long, Long, Int)]("MyTable", 'rowtime.rowtime, 'long, 'int)
+
+    val result = util.tEnv.sql("SELECT MIN(rowtime), long FROM MyTable " +
+      "GROUP BY long, TUMBLE(rowtime, INTERVAL '0.1' SECOND)")
+
+    val expected = unaryNode(
+      "DataStreamCalc",
+      unaryNode(
+        "DataStreamGroupWindowAggregate",
+        unaryNode(
+          "DataStreamCalc",
+          streamTableNode(0),
+          term("select", "long", "1970-01-01 00:00:00 AS $f1",
+            "TIME_MATERIALIZATION(rowtime) AS $f2")
+        ),
+        term("groupBy", "long"),
+        term(
+          "window",
+          TumblingGroupWindow(
+            'w$,
+            'rowtime,
+            100.millis)),
+        term("select", "long", "MIN($f2) AS EXPR$0")
+      ),
+      term("select", "EXPR$0", "long")
+    )
+
+    util.verifyTable(result, expected)
+  }
+
+  @Test
+  def testUnion(): Unit = {
+    val util = streamTestUtil()
+    val t = util.addTable[(Long, Long, Int)]("MyTable", 'rowtime.rowtime, 'long, 'int)
+
+    val result = t.unionAll(t).select('rowtime)
+
+    val expected = unaryNode(
+      "DataStreamCalc",
+      binaryNode(
+        "DataStreamUnion",
+        streamTableNode(0),
+        streamTableNode(0),
+        term("union all", "rowtime", "long", "int")
+      ),
+      term("select", "TIME_MATERIALIZATION(rowtime) AS rowtime")
+    )
+
+    util.verifyTable(result, expected)
+  }
+
+}
+
+object RelTimeIndicatorConverterTest {
+
+  class TableFunc extends TableFunction[String] {
+    val t = new Timestamp(0L)
+    def eval(time1: Long, time2: Timestamp): Unit = {
+      collect(time1.toString + time2.after(t))
+    }
+  }
+}
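
The plan tests above compare the optimized logical plan against expected
strings built with the unaryNode/binaryNode/term helpers imported from
TableTestUtil. The general assertion shape, taken from the tests in this
file (the exact plan strings depend on the planner version):

    val expected = unaryNode(
      "DataStreamCalc",
      streamTableNode(0),
      // TIME_MATERIALIZATION marks the point where a time indicator is
      // turned into a regular TIMESTAMP value
      term("select", "TIME_MATERIALIZATION(rowtime) AS rowtime")
    )
    util.verifyTable(result, expected)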

http://git-wip-us.apache.org/repos/asf/flink/blob/b50ef4b8/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/TimeAttributesITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/TimeAttributesITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/TimeAttributesITCase.scala
new file mode 100644
index 0000000..7d7088e
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/TimeAttributesITCase.scala
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.datastream
+
+import java.math.BigDecimal
+
+import org.apache.flink.api.scala._
+import org.apache.flink.streaming.api.TimeCharacteristic
+import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.streaming.api.watermark.Watermark
+import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.api.scala.stream.utils.StreamITCase
+import org.apache.flink.table.api.{TableEnvironment, TableException, Types}
+import org.apache.flink.table.calcite.RelTimeIndicatorConverterTest.TableFunc
+import org.apache.flink.table.expressions.TimeIntervalUnit
+import org.apache.flink.table.runtime.datastream.TimeAttributesITCase.TimestampWithEqualWatermark
+import org.apache.flink.types.Row
+import org.junit.Assert._
+import org.junit.Test
+
+import scala.collection.mutable
+
+/**
+  * Tests for access and materialization of time attributes.
+  */
+class TimeAttributesITCase extends StreamingMultipleProgramsTestBase {
+
+  val data = List(
+    (1L, 1, 1d, 1f, new BigDecimal("1"), "Hi"),
+    (2L, 2, 2d, 2f, new BigDecimal("2"), "Hallo"),
+    (3L, 2, 2d, 2f, new BigDecimal("2"), "Hello"),
+    (4L, 5, 5d, 5f, new BigDecimal("5"), "Hello"),
+    (7L, 3, 3d, 3f, new BigDecimal("3"), "Hello"),
+    (8L, 3, 3d, 3f, new BigDecimal("3"), "Hello world"),
+    (16L, 4, 4d, 4f, new BigDecimal("4"), "Hello world"))
+
+  @Test(expected = classOf[TableException])
+  def testInvalidTimeCharacteristic(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    val stream = env
+      .fromCollection(data)
+      .assignTimestampsAndWatermarks(new TimestampWithEqualWatermark())
+    stream.toTable(tEnv, 'rowtime.rowtime, 'int, 'double, 'float, 'bigdec, 'string)
+  }
+
+  @Test
+  def testCalcMaterialization(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    StreamITCase.testResults = mutable.MutableList()
+
+    val stream = env
+      .fromCollection(data)
+      .assignTimestampsAndWatermarks(new TimestampWithEqualWatermark())
+    val table = stream.toTable(tEnv, 'rowtime.rowtime, 'int, 'double, 'float, 'bigdec, 'string)
+
+    val t = table.select('rowtime.cast(Types.STRING))
+
+    val results = t.toDataStream[Row]
+    results.addSink(new StreamITCase.StringSink)
+    env.execute()
+
+    val expected = Seq(
+      "1970-01-01 00:00:00.001",
+      "1970-01-01 00:00:00.002",
+      "1970-01-01 00:00:00.003",
+      "1970-01-01 00:00:00.004",
+      "1970-01-01 00:00:00.007",
+      "1970-01-01 00:00:00.008",
+      "1970-01-01 00:00:00.016")
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
+  @Test
+  def testCalcMaterialization2(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    StreamITCase.testResults = mutable.MutableList()
+
+    val stream = env
+      .fromCollection(data)
+      .assignTimestampsAndWatermarks(new TimestampWithEqualWatermark())
+    val table = stream.toTable(tEnv, 'rowtime.rowtime, 'int, 'double, 'float, 'bigdec, 'string)
+
+    val t = table
+      .filter('rowtime.cast(Types.LONG) > 4)
+      .select('rowtime, 'rowtime.floor(TimeIntervalUnit.DAY), 'rowtime.ceil(TimeIntervalUnit.DAY))
+
+    val results = t.toDataStream[Row]
+    results.addSink(new StreamITCase.StringSink)
+    env.execute()
+
+    val expected = Seq(
+      "1970-01-01 00:00:00.007,1970-01-01 00:00:00.0,1970-01-02 00:00:00.0",
+      "1970-01-01 00:00:00.008,1970-01-01 00:00:00.0,1970-01-02 00:00:00.0",
+      "1970-01-01 00:00:00.016,1970-01-01 00:00:00.0,1970-01-02 00:00:00.0")
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
+  @Test
+  def testTableFunction(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    StreamITCase.testResults = mutable.MutableList()
+
+    val stream = env
+      .fromCollection(data)
+      .assignTimestampsAndWatermarks(new TimestampWithEqualWatermark())
+    val table = stream.toTable(
+      tEnv, 'rowtime.rowtime, 'int, 'double, 'float, 'bigdec, 'string, 'proctime.proctime)
+    val func = new TableFunc
+
+    val t = table.join(func('rowtime, 'proctime) as 's).select('rowtime, 's)
+
+    val results = t.toDataStream[Row]
+    results.addSink(new StreamITCase.StringSink)
+    env.execute()
+
+    val expected = Seq(
+      "1970-01-01 00:00:00.001,1true",
+      "1970-01-01 00:00:00.002,2true",
+      "1970-01-01 00:00:00.003,3true",
+      "1970-01-01 00:00:00.004,4true",
+      "1970-01-01 00:00:00.007,7true",
+      "1970-01-01 00:00:00.008,8true",
+      "1970-01-01 00:00:00.016,16true")
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
+  @Test
+  def testUnion(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    StreamITCase.testResults = mutable.MutableList()
+
+    val stream = env
+      .fromCollection(data)
+      .assignTimestampsAndWatermarks(new TimestampWithEqualWatermark())
+    val table = stream.toTable(
+      tEnv, 'rowtime.rowtime, 'int, 'double, 'float, 'bigdec, 'string)
+
+    val t = table.unionAll(table).select('rowtime)
+
+    val results = t.toDataStream[Row]
+    results.addSink(new StreamITCase.StringSink)
+    env.execute()
+
+    val expected = Seq(
+      "1970-01-01 00:00:00.001",
+      "1970-01-01 00:00:00.001",
+      "1970-01-01 00:00:00.002",
+      "1970-01-01 00:00:00.002",
+      "1970-01-01 00:00:00.003",
+      "1970-01-01 00:00:00.003",
+      "1970-01-01 00:00:00.004",
+      "1970-01-01 00:00:00.004",
+      "1970-01-01 00:00:00.007",
+      "1970-01-01 00:00:00.007",
+      "1970-01-01 00:00:00.008",
+      "1970-01-01 00:00:00.008",
+      "1970-01-01 00:00:00.016",
+      "1970-01-01 00:00:00.016")
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
+  @Test
+  def testWindowWithAggregationOnRowtimeSql(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    StreamITCase.testResults = mutable.MutableList()
+
+    val stream = env
+      .fromCollection(data)
+      .assignTimestampsAndWatermarks(new TimestampWithEqualWatermark())
+    val table = stream.toTable(tEnv, 'rowtime.rowtime, 'int, 'double, 'float, 'bigdec, 'string)
+    tEnv.registerTable("MyTable", table)
+
+    val t = tEnv.sql("SELECT COUNT(`rowtime`) FROM MyTable " +
+      "GROUP BY TUMBLE(rowtime, INTERVAL '0.003' SECOND)")
+
+    val results = t.toDataStream[Row]
+    results.addSink(new StreamITCase.StringSink)
+    env.execute()
+
+    val expected = Seq(
+      "1",
+      "2",
+      "2",
+      "2"
+    )
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
+}
+
+object TimeAttributesITCase {
+  class TimestampWithEqualWatermark
+    extends AssignerWithPunctuatedWatermarks[(Long, Int, Double, Float, BigDecimal, String)] {
+
+    override def checkAndGetNextWatermark(
+        lastElement: (Long, Int, Double, Float, BigDecimal, String),
+        extractedTimestamp: Long)
+      : Watermark = {
+      new Watermark(extractedTimestamp)
+    }
+
+    override def extractTimestamp(
+        element: (Long, Int, Double, Float, BigDecimal, String),
+        previousElementTimestamp: Long): Long = {
+      element._1
+    }
+  }
+}
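
The ITCase drives event time with TimestampWithEqualWatermark, a punctuated
assigner that takes the first tuple field as the timestamp and emits a
watermark for every element, so window results become final as soon as the
input is consumed. Minimal usage, as in the tests above (assuming an
event-time StreamExecutionEnvironment and the `data` collection from this
file):

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    val tEnv = TableEnvironment.getTableEnvironment(env)

    // every element advances the watermark to its own timestamp
    val stream = env
      .fromCollection(data)
      .assignTimestampsAndWatermarks(new TimestampWithEqualWatermark())

    // expose the stream's timestamps as the event-time attribute 'rowtime
    val table = stream.toTable(
      tEnv, 'rowtime.rowtime, 'int, 'double, 'float, 'bigdec, 'string)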

http://git-wip-us.apache.org/repos/asf/flink/blob/b50ef4b8/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/TableTestBase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/TableTestBase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/TableTestBase.scala
index 0e6d461..65014cd 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/TableTestBase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/TableTestBase.scala
@@ -24,9 +24,11 @@ import org.apache.flink.api.java.{DataSet => JDataSet}
 import org.apache.flink.table.api.{Table, TableEnvironment}
 import org.apache.flink.table.api.scala._
 import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment}
+import org.apache.flink.streaming.api.TimeCharacteristic
 import org.apache.flink.table.expressions.Expression
 import org.apache.flink.table.functions.{ScalarFunction, TableFunction}
 import org.apache.flink.streaming.api.datastream.{DataStream => JDataStream}
+import org.apache.flink.streaming.api.environment.{StreamExecutionEnvironment => JStreamExecutionEnvironment}
 import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
 import org.apache.flink.table.api.scala.batch.utils.LogicalPlanFormatUtils
 import org.junit.Assert.assertEquals
@@ -174,7 +176,10 @@ case class BatchTableTestUtil() extends TableTestUtil {
 
 case class StreamTableTestUtil() extends TableTestUtil {
 
+  val javaEnv = mock(classOf[JStreamExecutionEnvironment])
+  when(javaEnv.getStreamTimeCharacteristic).thenReturn(TimeCharacteristic.EventTime)
   val env = mock(classOf[StreamExecutionEnvironment])
+  when(env.getWrappedStreamExecutionEnvironment).thenReturn(javaEnv)
   val tEnv = TableEnvironment.getTableEnvironment(env)
 
   def addTable[T: TypeInformation](
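
The TableTestBase change above is what lets StreamTableTestUtil plan queries
over rowtime attributes at all: the mocked Scala environment must hand out a
wrapped Java environment whose time characteristic is EventTime, since
declaring a rowtime attribute in any other characteristic fails (see
testInvalidTimeCharacteristic in TimeAttributesITCase). The wiring as a
self-contained sketch, matching the diff above with the Mockito imports
spelled out:

    import org.apache.flink.streaming.api.TimeCharacteristic
    import org.apache.flink.streaming.api.environment.{StreamExecutionEnvironment => JStreamExecutionEnvironment}
    import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
    import org.apache.flink.table.api.TableEnvironment
    import org.mockito.Mockito.{mock, when}

    // the planner consults the wrapped Java environment's time
    // characteristic when validating rowtime attributes
    val javaEnv = mock(classOf[JStreamExecutionEnvironment])
    when(javaEnv.getStreamTimeCharacteristic).thenReturn(TimeCharacteristic.EventTime)
    val env = mock(classOf[StreamExecutionEnvironment])
    when(env.getWrappedStreamExecutionEnvironment).thenReturn(javaEnv)
    val tEnv = TableEnvironment.getTableEnvironment(env)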