You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2019/06/17 15:11:56 UTC

[GitHub] [flink] aljoscha commented on a change in pull request #8739: [FLINK-12853][table-api][table-planner] TableSourceUtils#validateTableSource method to common module

aljoscha commented on a change in pull request #8739: [FLINK-12853][table-api][table-planner] TableSourceUtils#validateTableSource method to common module
URL: https://github.com/apache/flink/pull/8739#discussion_r294347662
 
 

 ##########
 File path: flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/validation/TableSourceValidationTest.scala
 ##########
 @@ -128,6 +129,58 @@ class TableSourceValidationTest {
     tEnv.registerTableSource("testTable", ts)
   }
 
+  @Test
+  def testDefinedRowtimeDoesNotExist(): Unit = {
+    expectedException.expect(classOf[ValidationException])
+    expectedException
+      .expectMessage(
+        "Found a rowtime attribute for field 'rowtime' but it does not exist in the Table")
+
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+    val tEnv = StreamTableEnvironment.create(env)
+
+    val schema = new TableSchema(
+      Array("id", "name", "amount"),
+      Array(Types.LONG, Types.STRING, Types.INT))
+
+    val rowType = new RowTypeInfo(
+      Array(Types.LONG, Types.STRING, Types.SQL_TIMESTAMP(), Types.INT)
+        .asInstanceOf[Array[TypeInformation[_]]],
+      Array("id", "name", "rowtime", "amount"))
+
+    val ts = new TestTableSourceWithTime(schema, rowType, Seq[Row](), rowtime = "rowtime")
+
+    // should fail because rowtime field has invalid type
+    tEnv.registerTableSource("testTable", ts)
+  }
+
+  @Test
+  def testDefinedProctimeDoesNotExist(): Unit = {
+    expectedException.expect(classOf[ValidationException])
+    expectedException
+      .expectMessage(
+        "Found a proctime attribute for field 'proctime' but it does not exist in the Table")
+
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+    val tEnv = StreamTableEnvironment.create(env)
+
+    val schema = new TableSchema(
+      Array("id", "name", "amount"),
+      Array(Types.LONG, Types.STRING, Types.INT))
+
+    val rowType = new RowTypeInfo(
+      Array(Types.LONG, Types.STRING, Types.SQL_TIMESTAMP(), Types.INT)
+        .asInstanceOf[Array[TypeInformation[_]]],
+      Array("id", "name", "proctime", "amount"))
+
+    val ts = new TestTableSourceWithTime(schema, rowType, Seq[Row](), proctime = "proctime")
+
+    // should fail because rowtime field has invalid type
 
 Review comment:
   nit: this comment should say "proctime" rather than "rowtime", I guess 😅

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services