Posted to commits@flink.apache.org by tw...@apache.org on 2018/07/15 09:24:56 UTC

[11/13] flink git commit: [FLINK-8866] [table] Create unified interfaces to configure and instantiate TableSinks

http://git-wip-us.apache.org/repos/asf/flink/blob/9597248a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/SchemaValidatorTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/SchemaValidatorTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/SchemaValidatorTest.scala
index bf7b84b..c558057 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/SchemaValidatorTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/SchemaValidatorTest.scala
@@ -20,7 +20,7 @@ package org.apache.flink.table.descriptors
 
 import java.util.Optional
 
-import org.apache.flink.table.api.{TableSchema, Types}
+import org.apache.flink.table.api.{TableException, TableSchema, Types}
 import org.apache.flink.table.sources.tsextractors.{ExistingField, StreamRecordTimestamp}
 import org.apache.flink.table.sources.wmstrategies.PreserveWatermarks
 import org.junit.Assert.{assertEquals, assertTrue}
@@ -78,6 +78,40 @@ class SchemaValidatorTest {
     assertEquals(expectedFormatSchema, formatSchema)
   }
 
+  @Test(expected = classOf[TableException])
+  def testDeriveTableSinkSchemaWithRowtimeFromSource(): Unit = {
+    val desc1 = Schema()
+      .field("otherField", Types.STRING).from("csvField")
+      .field("abcField", Types.STRING)
+      .field("p", Types.SQL_TIMESTAMP).proctime()
+      .field("r", Types.SQL_TIMESTAMP).rowtime(
+        Rowtime().timestampsFromSource().watermarksFromSource())
+    val props = new DescriptorProperties()
+    desc1.addProperties(props)
+
+    SchemaValidator.deriveTableSinkSchema(props)
+  }
+
+  @Test
+  def testDeriveTableSinkSchemaWithRowtimeFromField(): Unit = {
+    val desc1 = Schema()
+      .field("otherField", Types.STRING).from("csvField")
+      .field("abcField", Types.STRING)
+      .field("p", Types.SQL_TIMESTAMP).proctime()
+      .field("r", Types.SQL_TIMESTAMP).rowtime(
+        Rowtime().timestampsFromField("myTime").watermarksFromSource())
+    val props = new DescriptorProperties()
+    desc1.addProperties(props)
+
+    val expectedTableSinkSchema = TableSchema.builder()
+      .field("csvField", Types.STRING) // aliased
+      .field("abcField", Types.STRING)
+      .field("myTime", Types.SQL_TIMESTAMP)
+      .build()
+
+    assertEquals(expectedTableSinkSchema, SchemaValidator.deriveTableSinkSchema(props))
+  }
+
   @Test
   def testSchemaWithRowtimeFromField(): Unit = {
      val desc1 = Schema()

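For context, the tests above rely on Schema/Rowtime descriptors being flattened into
string properties before SchemaValidator derives a sink schema. Below is a minimal,
standalone sketch of that translation, assuming the 1.6-era descriptor API used
throughout this patch; the printed keys follow the SCHEMA_* and ROWTIME_* constants
that InMemoryTableFactory lists further below, and exact key names are illustrative.

    import scala.collection.JavaConverters._

    import org.apache.flink.table.api.Types
    import org.apache.flink.table.descriptors.{DescriptorProperties, Rowtime, Schema}

    object SchemaPropertiesDemo {
      def main(args: Array[String]): Unit = {
        // Same descriptor shape as testDeriveTableSinkSchemaWithRowtimeFromField.
        val schema = Schema()
          .field("otherField", Types.STRING).from("csvField")
          .field("r", Types.SQL_TIMESTAMP).rowtime(
            Rowtime().timestampsFromField("myTime").watermarksFromSource())
        val props = new DescriptorProperties()
        schema.addProperties(props)
        // Dump the flattened key space, e.g. "schema.0.name" -> "otherField".
        props.asMap.asScala.toSeq.sortBy(_._1).foreach { case (k, v) =>
          println(s"$k=$v")
        }
      }
    }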
http://git-wip-us.apache.org/repos/asf/flink/blob/9597248a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/TableSourceDescriptorTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/TableSourceDescriptorTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/TableSourceDescriptorTest.scala
index 91f3aac..a7dd644 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/TableSourceDescriptorTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/TableSourceDescriptorTest.scala
@@ -25,6 +25,9 @@ import org.junit.Test
 
 import scala.collection.JavaConverters._
 
+/**
+  * Tests for [[TableSourceDescriptor]].
+  */
 class TableSourceDescriptorTest extends TableTestBase {
 
   @Test

http://git-wip-us.apache.org/repos/asf/flink/blob/9597248a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/TableEnvironmentITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/TableEnvironmentITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/TableEnvironmentITCase.scala
index be8278f..07839e6 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/TableEnvironmentITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/TableEnvironmentITCase.scala
@@ -24,7 +24,7 @@ import org.apache.flink.table.api.TableEnvironment
 import org.apache.flink.table.api.scala._
 import org.apache.flink.table.runtime.utils.TableProgramsCollectionTestBase
 import org.apache.flink.table.runtime.utils.TableProgramsTestBase.TableConfigMode
-import org.apache.flink.table.utils.MemoryTableSinkUtil
+import org.apache.flink.table.utils.MemoryTableSourceSinkUtil
 import org.apache.flink.test.util.TestBaseUtils
 import org.apache.flink.types.Row
 import org.junit.Assert.assertEquals
@@ -120,14 +120,14 @@ class TableEnvironmentITCase(
   def testInsertIntoMemoryTable(): Unit = {
     val env = ExecutionEnvironment.getExecutionEnvironment
     val tEnv = TableEnvironment.getTableEnvironment(env)
-    MemoryTableSinkUtil.clear
+    MemoryTableSourceSinkUtil.clear()
 
     val t = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c)
     tEnv.registerTable("sourceTable", t)
 
     val fieldNames = Array("d", "e", "f")
     val fieldTypes = tEnv.scan("sourceTable").getSchema.getTypes
-    val sink = new MemoryTableSinkUtil.UnsafeMemoryAppendTableSink
+    val sink = new MemoryTableSourceSinkUtil.UnsafeMemoryAppendTableSink
     tEnv.registerTableSink("targetTable", fieldNames, fieldTypes, sink)
 
     val sql = "INSERT INTO targetTable SELECT a, b, c FROM sourceTable"
@@ -135,7 +135,7 @@ class TableEnvironmentITCase(
     env.execute()
 
     val expected = List("1,1,Hi", "2,2,Hello", "3,2,Hello world")
-    assertEquals(expected.sorted, MemoryTableSinkUtil.results.sorted)
+    assertEquals(expected.sorted, MemoryTableSourceSinkUtil.tableDataStrings.sorted)
   }
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/9597248a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/TableEnvironmentITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/TableEnvironmentITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/TableEnvironmentITCase.scala
index 5295e7c..67fde38 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/TableEnvironmentITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/TableEnvironmentITCase.scala
@@ -26,7 +26,7 @@ import org.apache.flink.table.api.TableEnvironment
 import org.apache.flink.table.api.scala._
 import org.apache.flink.table.runtime.utils.TableProgramsTestBase.TableConfigMode
 import org.apache.flink.table.runtime.utils.{TableProgramsCollectionTestBase, TableProgramsTestBase}
-import org.apache.flink.table.utils.MemoryTableSinkUtil
+import org.apache.flink.table.utils.MemoryTableSourceSinkUtil
 import org.apache.flink.test.util.TestBaseUtils
 import org.apache.flink.types.Row
 import org.junit.Assert.assertEquals
@@ -186,14 +186,14 @@ class TableEnvironmentITCase(
   def testInsertIntoMemoryTable(): Unit = {
     val env = ExecutionEnvironment.getExecutionEnvironment
     val tEnv = TableEnvironment.getTableEnvironment(env)
-    MemoryTableSinkUtil.clear
+    MemoryTableSourceSinkUtil.clear()
 
     val t = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c)
     tEnv.registerTable("sourceTable", t)
 
     val fieldNames = Array("d", "e", "f")
     val fieldTypes = tEnv.scan("sourceTable").getSchema.getTypes
-    val sink = new MemoryTableSinkUtil.UnsafeMemoryAppendTableSink
+    val sink = new MemoryTableSourceSinkUtil.UnsafeMemoryAppendTableSink
     tEnv.registerTableSink("targetTable", fieldNames, fieldTypes, sink)
 
     tEnv.scan("sourceTable")
@@ -202,7 +202,7 @@ class TableEnvironmentITCase(
     env.execute()
 
     val expected = List("1,1,Hi", "2,2,Hello", "3,2,Hello world")
-    assertEquals(expected.sorted, MemoryTableSinkUtil.results.sorted)
+    assertEquals(expected.sorted, MemoryTableSourceSinkUtil.tableDataStrings.sorted)
   }
 }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/9597248a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/TimeAttributesITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/TimeAttributesITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/TimeAttributesITCase.scala
index c553ee6..85a9647 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/TimeAttributesITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/TimeAttributesITCase.scala
@@ -35,7 +35,7 @@ import org.apache.flink.table.plan.TimeIndicatorConversionTest.TableFunc
 import org.apache.flink.table.runtime.stream.TimeAttributesITCase.{AtomicTimestampWithEqualWatermark, TestPojo, TimestampWithEqualWatermark, TimestampWithEqualWatermarkPojo}
 import org.apache.flink.table.runtime.utils.JavaPojos.Pojo1
 import org.apache.flink.table.runtime.utils.StreamITCase
-import org.apache.flink.table.utils.{MemoryTableSinkUtil, TestTableSourceWithTime}
+import org.apache.flink.table.utils.{MemoryTableSourceSinkUtil, TestTableSourceWithTime}
 import org.apache.flink.test.util.AbstractTestBase
 import org.apache.flink.types.Row
 import org.junit.Assert._
@@ -183,7 +183,7 @@ class TimeAttributesITCase extends AbstractTestBase {
     val env = StreamExecutionEnvironment.getExecutionEnvironment
     env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
     val tEnv = TableEnvironment.getTableEnvironment(env)
-    MemoryTableSinkUtil.clear
+    MemoryTableSourceSinkUtil.clear()
 
     val stream = env
       .fromCollection(data)
@@ -191,7 +191,7 @@ class TimeAttributesITCase extends AbstractTestBase {
     stream.toTable(tEnv, 'rowtime.rowtime, 'int, 'double, 'float, 'bigdec, 'string)
       .filter('rowtime.cast(Types.LONG) > 4)
       .select('rowtime, 'rowtime.floor(TimeIntervalUnit.DAY), 'rowtime.ceil(TimeIntervalUnit.DAY))
-      .writeToSink(new MemoryTableSinkUtil.UnsafeMemoryAppendTableSink)
+      .writeToSink(new MemoryTableSourceSinkUtil.UnsafeMemoryAppendTableSink)
 
     env.execute()
 
@@ -199,7 +199,7 @@ class TimeAttributesITCase extends AbstractTestBase {
       "1970-01-01 00:00:00.007,1970-01-01 00:00:00.0,1970-01-02 00:00:00.0",
       "1970-01-01 00:00:00.008,1970-01-01 00:00:00.0,1970-01-02 00:00:00.0",
       "1970-01-01 00:00:00.016,1970-01-01 00:00:00.0,1970-01-02 00:00:00.0")
-    assertEquals(expected.sorted, MemoryTableSinkUtil.results.sorted)
+    assertEquals(expected.sorted, MemoryTableSourceSinkUtil.tableDataStrings.sorted)
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/flink/blob/9597248a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SqlITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SqlITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SqlITCase.scala
index e132349..2228bdd 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SqlITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SqlITCase.scala
@@ -27,13 +27,14 @@ import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
 import org.apache.flink.streaming.api.watermark.Watermark
 import org.apache.flink.table.api.{TableEnvironment, Types}
 import org.apache.flink.table.api.scala._
+import org.apache.flink.table.descriptors.{DescriptorProperties, Rowtime, Schema}
 import org.apache.flink.table.expressions.utils.SplitUDF
 import org.apache.flink.table.expressions.utils.Func15
 import org.apache.flink.table.runtime.stream.sql.SqlITCase.TimestampAndWatermarkWithOffset
 import org.apache.flink.table.runtime.utils.TimeTestUtil.EventTimeSourceFunction
 import org.apache.flink.table.runtime.utils.{JavaUserDefinedTableFunctions, StreamITCase, StreamTestData, StreamingWithStateTestBase}
 import org.apache.flink.types.Row
-import org.apache.flink.table.utils.MemoryTableSinkUtil
+import org.apache.flink.table.utils.{InMemoryTableFactory, MemoryTableSourceSinkUtil}
 import org.junit.Assert._
 import org.junit._
 
@@ -691,7 +692,7 @@ class SqlITCase extends StreamingWithStateTestBase {
     val env = StreamExecutionEnvironment.getExecutionEnvironment
     env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
     val tEnv = TableEnvironment.getTableEnvironment(env)
-    MemoryTableSinkUtil.clear
+    MemoryTableSourceSinkUtil.clear()
 
     val t = StreamTestData.getSmall3TupleDataStream(env)
         .assignAscendingTimestamps(x => x._2)
@@ -701,7 +702,7 @@ class SqlITCase extends StreamingWithStateTestBase {
     val fieldNames = Array("d", "e", "f", "t")
     val fieldTypes = Array(Types.INT, Types.LONG, Types.STRING, Types.SQL_TIMESTAMP)
       .asInstanceOf[Array[TypeInformation[_]]]
-    val sink = new MemoryTableSinkUtil.UnsafeMemoryAppendTableSink
+    val sink = new MemoryTableSourceSinkUtil.UnsafeMemoryAppendTableSink
     tEnv.registerTableSink("targetTable", fieldNames, fieldTypes, sink)
 
     val sql = "INSERT INTO targetTable SELECT a, b, c, rowtime FROM sourceTable"
@@ -712,7 +713,46 @@ class SqlITCase extends StreamingWithStateTestBase {
       "1,1,Hi,1970-01-01 00:00:00.001",
       "2,2,Hello,1970-01-01 00:00:00.002",
       "3,2,Hello world,1970-01-01 00:00:00.002")
-    assertEquals(expected.sorted, MemoryTableSinkUtil.results.sorted)
+    assertEquals(expected.sorted, MemoryTableSourceSinkUtil.tableDataStrings.sorted)
+  }
+
+  @Test
+  def testWriteReadTableSourceSink(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    MemoryTableSourceSinkUtil.clear()
+
+    val desc = Schema()
+      .field("a", Types.INT)
+      .field("e", Types.LONG)
+      .field("f", Types.STRING)
+      .field("t", Types.SQL_TIMESTAMP)
+        .rowtime(Rowtime().timestampsFromField("t").watermarksPeriodicAscending())
+      .field("proctime", Types.SQL_TIMESTAMP).proctime()
+    val props = new DescriptorProperties()
+    desc.addProperties(props)
+
+    val t = StreamTestData.getSmall3TupleDataStream(env)
+      .assignAscendingTimestamps(x => x._2)
+      .toTable(tEnv, 'a, 'b, 'c, 'rowtime.rowtime)
+    tEnv.registerTable("sourceTable", t)
+
+    tEnv.registerTableSource("targetTable",
+      new InMemoryTableFactory(3).createTableSource(props.asMap))
+    tEnv.registerTableSink("targetTable",
+      new InMemoryTableFactory(3).createTableSink(props.asMap))
+
+    tEnv.sqlUpdate("INSERT INTO targetTable SELECT a, b, c, rowtime FROM sourceTable")
+    tEnv.sqlQuery("SELECT a, e, f, t from targetTable")
+      .addSink(new StreamITCase.StringSink[Row])
+    env.execute()
+
+    val expected = List(
+      "1,1,Hi,1970-01-01 00:00:00.001",
+      "2,2,Hello,1970-01-01 00:00:00.002",
+      "3,2,Hello world,1970-01-01 00:00:00.002")
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/flink/blob/9597248a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSinkITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSinkITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSinkITCase.scala
index 467d9d3..70e59f3 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSinkITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSinkITCase.scala
@@ -35,7 +35,7 @@ import org.apache.flink.table.api.scala._
 import org.apache.flink.table.api.{TableEnvironment, TableException, Types}
 import org.apache.flink.table.runtime.utils.{StreamITCase, StreamTestData}
 import org.apache.flink.table.sinks._
-import org.apache.flink.table.utils.MemoryTableSinkUtil
+import org.apache.flink.table.utils.MemoryTableSourceSinkUtil
 import org.apache.flink.test.util.{AbstractTestBase, TestBaseUtils}
 import org.apache.flink.types.Row
 import org.apache.flink.util.Collector
@@ -54,13 +54,13 @@ class TableSinkITCase extends AbstractTestBase {
     env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
 
     val tEnv = TableEnvironment.getTableEnvironment(env)
-    MemoryTableSinkUtil.clear
+    MemoryTableSourceSinkUtil.clear()
 
     val input = StreamTestData.get3TupleDataStream(env)
       .assignAscendingTimestamps(r => r._2)
     val fieldNames = Array("d", "e", "t")
     val fieldTypes: Array[TypeInformation[_]] = Array(Types.STRING, Types.SQL_TIMESTAMP, Types.LONG)
-    val sink = new MemoryTableSinkUtil.UnsafeMemoryAppendTableSink
+    val sink = new MemoryTableSourceSinkUtil.UnsafeMemoryAppendTableSink
     tEnv.registerTableSink("targetTable", fieldNames, fieldTypes, sink)
 
     input.toTable(tEnv, 'a, 'b, 'c, 't.rowtime)
@@ -75,7 +75,7 @@ class TableSinkITCase extends AbstractTestBase {
       "Comment#14,1970-01-01 00:00:00.006,6",
       "Comment#15,1970-01-01 00:00:00.006,6").mkString("\n")
 
-    TestBaseUtils.compareResultAsText(MemoryTableSinkUtil.results.asJava, expected)
+    TestBaseUtils.compareResultAsText(MemoryTableSourceSinkUtil.tableData.asJava, expected)
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/flink/blob/9597248a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/sources/TableSourceFactoryServiceTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/sources/TableSourceFactoryServiceTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/sources/TableSourceFactoryServiceTest.scala
deleted file mode 100644
index e70c4f1..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/sources/TableSourceFactoryServiceTest.scala
+++ /dev/null
@@ -1,100 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.sources
-
-import org.apache.flink.table.api.{NoMatchingTableSourceException, TableException, ValidationException}
-import org.apache.flink.table.descriptors.ConnectorDescriptorValidator.{CONNECTOR_PROPERTY_VERSION, CONNECTOR_TYPE}
-import org.apache.flink.table.descriptors.FormatDescriptorValidator.{FORMAT_PROPERTY_VERSION, FORMAT_TYPE}
-import org.junit.Assert.assertTrue
-import org.junit.Test
-
-import scala.collection.mutable
-
-class TableSourceFactoryServiceTest {
-
-  @Test
-  def testValidProperties(): Unit = {
-    val props = properties()
-    props.put(CONNECTOR_TYPE, "fixed")
-    props.put(FORMAT_TYPE, "test")
-    assertTrue(TableSourceFactoryService.findAndCreateTableSource(props.toMap) != null)
-  }
-
-  @Test(expected = classOf[NoMatchingTableSourceException])
-  def testInvalidContext(): Unit = {
-    val props = properties()
-    props.put(CONNECTOR_TYPE, "FAIL")
-    props.put(FORMAT_TYPE, "test")
-    TableSourceFactoryService.findAndCreateTableSource(props.toMap)
-  }
-
-  @Test
-  def testDifferentContextVersion(): Unit = {
-    val props = properties()
-    props.put(CONNECTOR_TYPE, "fixed")
-    props.put(FORMAT_TYPE, "test")
-    props.put(CONNECTOR_PROPERTY_VERSION, "2")
-    // the table source should still be found
-    assertTrue(TableSourceFactoryService.findAndCreateTableSource(props.toMap) != null)
-  }
-
-  @Test(expected = classOf[ValidationException])
-  def testUnsupportedProperty(): Unit = {
-    val props = properties()
-    props.put(CONNECTOR_TYPE, "fixed")
-    props.put(FORMAT_TYPE, "test")
-    props.put("format.path_new", "/new/path")
-    TableSourceFactoryService.findAndCreateTableSource(props.toMap)
-  }
-
-  @Test(expected = classOf[TableException])
-  def testFailingFactory(): Unit = {
-    val props = properties()
-    props.put(CONNECTOR_TYPE, "fixed")
-    props.put(FORMAT_TYPE, "test")
-    props.put("failing", "true")
-    TableSourceFactoryService.findAndCreateTableSource(props.toMap)
-  }
-
-  @Test
-  def testWildcardFormat(): Unit = {
-    val props = properties()
-    props.put(CONNECTOR_TYPE, "wildcard")
-    props.put(FORMAT_TYPE, "test")
-    props.put("format.type", "not-test")
-    props.put("format.not-test-property", "wildcard-property")
-    val actualTableSource = TableSourceFactoryService.findAndCreateTableSource(props.toMap)
-    assertTrue(actualTableSource.isInstanceOf[TestWildcardFormatTableSourceFactory])
-  }
-
-  private def properties(): mutable.Map[String, String] = {
-    val properties = mutable.Map[String, String]()
-    properties.put(CONNECTOR_PROPERTY_VERSION, "1")
-    properties.put(FORMAT_PROPERTY_VERSION, "1")
-    properties.put("format.path", "/path/to/target")
-    properties.put("schema.0.name", "a")
-    properties.put("schema.1.name", "b")
-    properties.put("schema.2.name", "c")
-    properties.put("schema.0.field.0.name", "a")
-    properties.put("schema.0.field.1.name", "b")
-    properties.put("schema.0.field.2.name", "c")
-    properties.put("failing", "false")
-    properties
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/9597248a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/sources/TestFixedFormatTableFactory.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/sources/TestFixedFormatTableFactory.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/sources/TestFixedFormatTableFactory.scala
deleted file mode 100644
index 6a8041b..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/sources/TestFixedFormatTableFactory.scala
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.sources
-
-import java.util
-
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.table.api.TableSchema
-import org.apache.flink.table.descriptors.ConnectorDescriptorValidator.{CONNECTOR_PROPERTY_VERSION, CONNECTOR_TYPE}
-import org.apache.flink.table.descriptors.FormatDescriptorValidator.{FORMAT_PROPERTY_VERSION, FORMAT_TYPE}
-import org.apache.flink.types.Row
-
-/**
-  * Table source factory for testing with a fixed format.
-  */
-class TestFixedFormatTableFactory extends TableSourceFactory[Row] {
-
-  override def requiredContext(): util.Map[String, String] = {
-    val context = new util.HashMap[String, String]()
-    context.put(CONNECTOR_TYPE, "fixed")
-    context.put(FORMAT_TYPE, "test")
-    context.put(CONNECTOR_PROPERTY_VERSION, "1")
-    context.put(FORMAT_PROPERTY_VERSION, "1")
-    context
-  }
-
-  override def supportedProperties(): util.List[String] = {
-    val properties = new util.ArrayList[String]()
-    properties.add("format.path")
-    properties.add("schema.#.name")
-    properties.add("schema.#.field.#.name")
-    properties.add("failing")
-    properties
-  }
-
-  override def create(properties: util.Map[String, String]): TableSource[Row] = {
-    if (properties.get("failing") == "true") {
-      throw new IllegalArgumentException("Error in this factory.")
-    }
-    new TableSource[Row] {
-      override def getTableSchema: TableSchema = throw new UnsupportedOperationException()
-
-      override def getReturnType: TypeInformation[Row] = throw new UnsupportedOperationException()
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/9597248a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/sources/TestWildcardFormatTableSourceFactory.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/sources/TestWildcardFormatTableSourceFactory.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/sources/TestWildcardFormatTableSourceFactory.scala
deleted file mode 100644
index 91598a6..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/sources/TestWildcardFormatTableSourceFactory.scala
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.sources
-
-import java.util
-
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.table.api.TableSchema
-import org.apache.flink.table.descriptors.ConnectorDescriptorValidator.{CONNECTOR_PROPERTY_VERSION, CONNECTOR_TYPE}
-import org.apache.flink.types.Row
-
-/**
-  * Table source factory for testing with a wildcard format ("format.*").
-  */
-class TestWildcardFormatTableSourceFactory extends TableSourceFactory[Row] with TableSource[Row] {
-
-  override def requiredContext(): util.Map[String, String] = {
-    val context = new util.HashMap[String, String]()
-    context.put(CONNECTOR_TYPE, "wildcard")
-    context.put(CONNECTOR_PROPERTY_VERSION, "1")
-    context
-  }
-
-  override def supportedProperties(): util.List[String] = {
-    val properties = new util.ArrayList[String]()
-    properties.add("format.*")
-    properties.add("schema.#.name")
-    properties.add("schema.#.field.#.name")
-    properties.add("failing")
-    properties
-  }
-
-  override def create(properties: util.Map[String, String]): TableSource[Row] = {
-    this
-  }
-
-  override def getTableSchema: TableSchema = throw new UnsupportedOperationException()
-
-  override def getReturnType: TypeInformation[Row] = throw new UnsupportedOperationException()
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/9597248a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/InMemoryTableFactory.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/InMemoryTableFactory.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/InMemoryTableFactory.scala
new file mode 100644
index 0000000..3689965
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/InMemoryTableFactory.scala
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.utils
+
+import java.util
+
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.table.connectors.{DiscoverableTableFactory, TableSinkFactory, TableSourceFactory}
+import org.apache.flink.table.descriptors.ConnectorDescriptorValidator.{CONNECTOR_PROPERTY_VERSION, CONNECTOR_TYPE}
+import org.apache.flink.table.descriptors.RowtimeValidator._
+import org.apache.flink.table.descriptors.SchemaValidator._
+import org.apache.flink.table.descriptors.{DescriptorProperties, SchemaValidator}
+import org.apache.flink.table.sinks.TableSink
+import org.apache.flink.table.sources.TableSource
+import org.apache.flink.types.Row
+
+/**
+  * Factory for creating stream table sources and sinks.
+  *
+  * See [[MemoryTableSourceSinkUtil.UnsafeMemoryTableSource]] and
+  * [[MemoryTableSourceSinkUtil.UnsafeMemoryAppendTableSink]].
+  *
+  * @param terminationCount determines when to shut down the streaming source function
+  */
+class InMemoryTableFactory(terminationCount: Int) extends TableSourceFactory[Row]
+  with TableSinkFactory[Row] with DiscoverableTableFactory {
+  override def createTableSink(properties: util.Map[String, String]): TableSink[Row] = {
+    val params: DescriptorProperties = new DescriptorProperties(true)
+    params.putProperties(properties)
+
+    // validate
+    new SchemaValidator(true, true, true).validate(params)
+
+    val tableSchema = SchemaValidator.deriveTableSinkSchema(params)
+
+    new MemoryTableSourceSinkUtil.UnsafeMemoryAppendTableSink()
+      .configure(tableSchema.getColumnNames, tableSchema.getTypes)
+  }
+
+  override def createTableSource(properties: util.Map[String, String]): TableSource[Row] = {
+    val params: DescriptorProperties = new DescriptorProperties(true)
+    params.putProperties(properties)
+
+    // validate
+    new SchemaValidator(true, true, true).validate(params)
+
+    val tableSchema = SchemaValidator.deriveTableSourceSchema(params)
+
+    // proctime
+    val proctimeAttributeOpt = SchemaValidator.deriveProctimeAttribute(params)
+
+    val (names, types) = tableSchema.getColumnNames.zip(tableSchema.getTypes)
+      .filter(_._1 != proctimeAttributeOpt.get()).unzip
+    // rowtime
+    val rowtimeDescriptors = SchemaValidator.deriveRowtimeAttributes(params)
+    new MemoryTableSourceSinkUtil.UnsafeMemoryTableSource(
+      tableSchema,
+      new RowTypeInfo(types, names),
+      rowtimeDescriptors,
+      proctimeAttributeOpt.get(),
+      terminationCount)
+  }
+
+  override def requiredContext(): util.Map[String, String] = {
+    val context: util.Map[String, String] = new util.HashMap[String, String]
+    context.put(CONNECTOR_TYPE, "memory")
+    context.put(CONNECTOR_PROPERTY_VERSION, "1") // backwards compatibility
+
+    context
+  }
+
+  override def supportedProperties(): util.List[String] = {
+    val properties = new util.ArrayList[String]()
+
+    // schema
+    properties.add(SCHEMA + ".#." + SCHEMA_TYPE)
+    properties.add(SCHEMA + ".#." + SCHEMA_NAME)
+    properties.add(SCHEMA + ".#." + SCHEMA_FROM)
+
+    // time attributes
+    properties.add(SCHEMA + ".#." + SCHEMA_PROCTIME)
+    properties.add(SCHEMA + ".#." + ROWTIME_TIMESTAMPS_TYPE)
+    properties.add(SCHEMA + ".#." + ROWTIME_TIMESTAMPS_FROM)
+    properties.add(SCHEMA + ".#." + ROWTIME_TIMESTAMPS_CLASS)
+    properties.add(SCHEMA + ".#." + ROWTIME_TIMESTAMPS_SERIALIZED)
+    properties.add(SCHEMA + ".#." + ROWTIME_WATERMARKS_TYPE)
+    properties.add(SCHEMA + ".#." + ROWTIME_WATERMARKS_CLASS)
+    properties.add(SCHEMA + ".#." + ROWTIME_WATERMARKS_SERIALIZED)
+    properties.add(SCHEMA + ".#." + ROWTIME_WATERMARKS_DELAY)
+
+    properties
+  }
+}
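
Since this factory is exercised only through SqlITCase above, a direct-instantiation
sketch may help. It is a minimal sketch, assuming the property keys produced by
Schema().addProperties match supportedProperties() above; the object name
InMemoryTableFactoryDemo is illustrative only.

    import org.apache.flink.table.api.Types
    import org.apache.flink.table.descriptors.{DescriptorProperties, Schema}
    import org.apache.flink.table.utils.InMemoryTableFactory

    object InMemoryTableFactoryDemo {
      def main(args: Array[String]): Unit = {
        val props = new DescriptorProperties()
        Schema()
          .field("a", Types.INT)
          // A proctime field is needed here: createTableSource() above calls
          // proctimeAttributeOpt.get() unconditionally.
          .field("proctime", Types.SQL_TIMESTAMP).proctime()
          .addProperties(props)

        val factory = new InMemoryTableFactory(terminationCount = 3)
        val source = factory.createTableSource(props.asMap)
        // The derived sink schema is expected to drop the proctime column.
        val sink = factory.createTableSink(props.asMap)
        println(source.getTableSchema)
        println(sink.getFieldNames.mkString(","))
      }
    }

The terminationCount guard exists because the streaming side of
UnsafeMemoryTableSource is unbounded: its InMemorySourceFunction keeps polling
tableData, so InMemoryTableFactory(3) in testWriteReadTableSourceSink lets the job
finish once the three inserted rows have been re-emitted.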

http://git-wip-us.apache.org/repos/asf/flink/blob/9597248a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/MemoryTableSinkUtil.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/MemoryTableSinkUtil.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/MemoryTableSinkUtil.scala
deleted file mode 100644
index 6d6307b..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/MemoryTableSinkUtil.scala
+++ /dev/null
@@ -1,89 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.utils
-
-import org.apache.flink.api.common.io.RichOutputFormat
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.java.DataSet
-import org.apache.flink.api.java.typeutils.RowTypeInfo
-import org.apache.flink.configuration.Configuration
-import org.apache.flink.streaming.api.datastream.DataStream
-import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
-import org.apache.flink.table.sinks.{AppendStreamTableSink, BatchTableSink, TableSinkBase}
-import org.apache.flink.table.util.TableConnectorUtil
-import org.apache.flink.types.Row
-
-import scala.collection.mutable
-
-object MemoryTableSinkUtil {
-  var results: mutable.MutableList[String] = mutable.MutableList.empty[String]
-
-  def clear = {
-    MemoryTableSinkUtil.results.clear()
-  }
-
-  final class UnsafeMemoryAppendTableSink
-    extends TableSinkBase[Row] with BatchTableSink[Row]
-    with AppendStreamTableSink[Row] {
-
-    override def getOutputType: TypeInformation[Row] = {
-      new RowTypeInfo(getFieldTypes, getFieldNames)
-    }
-
-    override protected def copy: TableSinkBase[Row] = {
-      new UnsafeMemoryAppendTableSink
-    }
-
-    override def emitDataSet(dataSet: DataSet[Row]): Unit = {
-      dataSet
-        .output(new MemoryCollectionOutputFormat)
-        .name(TableConnectorUtil.generateRuntimeName(this.getClass, getFieldNames))
-    }
-
-    override def emitDataStream(dataStream: DataStream[Row]): Unit = {
-      dataStream
-        .addSink(new MemoryAppendSink)
-        .name(TableConnectorUtil.generateRuntimeName(this.getClass, getFieldNames))
-    }
-  }
-
-  private class MemoryAppendSink extends RichSinkFunction[Row]() {
-
-    override def invoke(value: Row): Unit = {
-      results.synchronized {
-        results += value.toString
-      }
-    }
-  }
-
-  private class MemoryCollectionOutputFormat extends RichOutputFormat[Row] {
-
-    override def configure(parameters: Configuration): Unit = {}
-
-    override def open(taskNumber: Int, numTasks: Int): Unit = {}
-
-    override def writeRecord(record: Row): Unit = {
-      results.synchronized {
-        results += record.toString
-      }
-    }
-
-    override def close(): Unit = {}
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/9597248a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/MemoryTableSourceSinkUtil.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/MemoryTableSourceSinkUtil.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/MemoryTableSourceSinkUtil.scala
new file mode 100644
index 0000000..cb0ad43
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/MemoryTableSourceSinkUtil.scala
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.utils
+
+import java.util
+
+import org.apache.flink.api.common.io.RichOutputFormat
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.api.java.{DataSet, ExecutionEnvironment}
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
+import org.apache.flink.streaming.api.functions.source.SourceFunction
+import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
+import org.apache.flink.table.api.TableSchema
+import org.apache.flink.table.sinks.{AppendStreamTableSink, BatchTableSink, TableSinkBase}
+import org.apache.flink.table.sources._
+import org.apache.flink.table.util.TableConnectorUtil
+import org.apache.flink.types.Row
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+/**
+  * Utilities to ingest data into and retrieve results from a table program.
+  */
+object MemoryTableSourceSinkUtil {
+
+  val tableData: mutable.ListBuffer[Row] = mutable.ListBuffer[Row]()
+
+  def tableDataStrings: Seq[String] = tableData.map(_.toString)
+
+  def clear(): Unit = {
+    MemoryTableSourceSinkUtil.tableData.clear()
+  }
+
+  class UnsafeMemoryTableSource(
+      tableSchema: TableSchema,
+      returnType: TypeInformation[Row],
+      rowtimeAttributeDescriptor: util.List[RowtimeAttributeDescriptor],
+      proctime: String,
+      val terminationCount: Int)
+    extends BatchTableSource[Row]
+    with StreamTableSource[Row]
+    with DefinedProctimeAttribute
+    with DefinedRowtimeAttributes {
+
+    override def getReturnType: TypeInformation[Row] = returnType
+
+    override def getTableSchema: TableSchema = tableSchema
+
+    override def getDataSet(execEnv: ExecutionEnvironment): DataSet[Row] = {
+      execEnv.fromCollection(tableData.asJava, returnType)
+    }
+
+    final class InMemorySourceFunction(var count: Int = terminationCount)
+      extends SourceFunction[Row] {
+
+      override def cancel(): Unit = throw new UnsupportedOperationException()
+
+      override def run(ctx: SourceContext[Row]): Unit = {
+        while (count > 0) {
+          tableData.synchronized {
+            if (tableData.nonEmpty) {
+              val r = tableData.remove(0)
+              ctx.collect(r)
+              count -= 1
+            }
+          }
+        }
+      }
+    }
+
+    override def getDataStream(execEnv: StreamExecutionEnvironment): DataStream[Row] = {
+      execEnv.addSource(new InMemorySourceFunction, returnType)
+    }
+
+    override def getProctimeAttribute: String = proctime
+
+    override def getRowtimeAttributeDescriptors: util.List[RowtimeAttributeDescriptor] = {
+      rowtimeAttributeDescriptor
+    }
+  }
+
+  final class UnsafeMemoryAppendTableSink
+    extends TableSinkBase[Row] with BatchTableSink[Row]
+    with AppendStreamTableSink[Row] {
+
+    override def getOutputType: TypeInformation[Row] = {
+      new RowTypeInfo(getFieldTypes, getFieldNames)
+    }
+
+    override protected def copy: TableSinkBase[Row] = {
+      new UnsafeMemoryAppendTableSink
+    }
+
+    override def emitDataSet(dataSet: DataSet[Row]): Unit = {
+      dataSet
+        .output(new MemoryCollectionOutputFormat)
+        .name(TableConnectorUtil.generateRuntimeName(this.getClass, getFieldNames))
+    }
+
+    override def emitDataStream(dataStream: DataStream[Row]): Unit = {
+      dataStream
+        .addSink(new MemoryAppendSink)
+        .name(TableConnectorUtil.generateRuntimeName(this.getClass, getFieldNames))
+    }
+  }
+
+  private class MemoryAppendSink extends RichSinkFunction[Row]() {
+
+    override def invoke(value: Row): Unit = {
+      tableData.synchronized {
+        tableData += Row.copy(value)
+      }
+    }
+  }
+
+  private class MemoryCollectionOutputFormat extends RichOutputFormat[Row] {
+
+    override def configure(parameters: Configuration): Unit = {}
+
+    override def open(taskNumber: Int, numTasks: Int): Unit = {}
+
+    override def writeRecord(record: Row): Unit = {
+      tableData.synchronized {
+        tableData += Row.copy(record)
+      }
+    }
+
+    override def close(): Unit = {}
+  }
+}
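
To round off the sink half of the util, here is a minimal end-to-end sketch,
assuming the same Scala stream/table APIs the ITCases above use; the object and
table names are illustrative only.

    import org.apache.flink.api.common.typeinfo.TypeInformation
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.table.api.{TableEnvironment, Types}
    import org.apache.flink.table.api.scala._
    import org.apache.flink.table.utils.MemoryTableSourceSinkUtil

    object MemorySinkDemo {
      def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        val tEnv = TableEnvironment.getTableEnvironment(env)
        MemoryTableSourceSinkUtil.clear()

        val t = env.fromElements((1, "Hi"), (2, "Hello")).toTable(tEnv, 'a, 'c)
        tEnv.registerTable("sourceTable", t)

        val fieldNames = Array("d", "f")
        val fieldTypes = Array(Types.INT, Types.STRING)
          .asInstanceOf[Array[TypeInformation[_]]]
        val sink = new MemoryTableSourceSinkUtil.UnsafeMemoryAppendTableSink
        tEnv.registerTableSink("demoSink", fieldNames, fieldTypes, sink)

        tEnv.sqlUpdate("INSERT INTO demoSink SELECT a, c FROM sourceTable")
        env.execute()

        // Rows accumulate in tableData; the ITCases assert on their string form.
        MemoryTableSourceSinkUtil.tableDataStrings.foreach(println)
      }
    }

Because tableData is a shared object-level buffer, calling clear() up front keeps
one run from seeing another's rows, which is why every ITCase above starts with
MemoryTableSourceSinkUtil.clear().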

http://git-wip-us.apache.org/repos/asf/flink/blob/9597248a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/MockTableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/MockTableEnvironment.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/MockTableEnvironment.scala
index 308ead2..c25f30f 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/MockTableEnvironment.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/MockTableEnvironment.scala
@@ -46,6 +46,8 @@ class MockTableEnvironment extends TableEnvironment(new TableConfig) {
       fieldTypes: Array[TypeInformation[_]],
       tableSink: TableSink[_]): Unit = ???
 
+  override def registerTableSink(name: String, tableSink: TableSink[_]): Unit = ???
+
   override protected def createUniqueTableName(): String = ???
 
   override protected def registerTableSourceInternal(name: String, tableSource: TableSource[_])