Posted to commits@flink.apache.org by tw...@apache.org on 2018/01/31 11:47:32 UTC

[1/3] flink git commit: [FLINK-8240] [table] Create unified interfaces to configure and instantiate TableSources

Repository: flink
Updated Branches:
  refs/heads/master a5476cdcd -> 2cb58960e


http://git-wip-us.apache.org/repos/asf/flink/blob/2cb58960/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/TableSourceTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/TableSourceTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/TableSourceTest.scala
index 42f0769..050f1a1 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/TableSourceTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/TableSourceTest.scala
@@ -380,6 +380,44 @@ class TableSourceTest extends TableTestBase {
     Assert.assertEquals(source1, source2)
   }
 
+// TODO enable this test once we expose the feature through the table environment
+//  @Test
+//  def testCsvTableSourceDescriptor(): Unit = {
+//    val util = streamTestUtil()
+//    val source1 = util.tableEnv
+//      .from(
+//        FileSystem()
+//          .path("/path/to/csv"))
+//      .withFormat(
+//        Csv()
+//          .field("myfield", Types.STRING)
+//          .field("myfield2", Types.INT)
+//          .quoteCharacter(';')
+//          .fieldDelimiter("#")
+//          .lineDelimiter("\r\n")
+//          .commentPrefix("%%")
+//          .ignoreFirstLine()
+//          .ignoreParseErrors())
+//        .withSchema(
+//          Schema()
+//          .field("myfield", Types.STRING)
+//          .field("myfield2", Types.INT))
+//      .toTableSource
+//
+//    val source2 = new CsvTableSource(
+//      "/path/to/csv",
+//      Array("myfield", "myfield2"),
+//      Array(Types.STRING, Types.INT),
+//      "#",
+//      "\r\n",
+//      ';',
+//      true,
+//      "%%",
+//      true)
+//
+//    Assert.assertEquals(source1, source2)
+//  }
+
   @Test
   def testTimeLiteralExpressionPushdown(): Unit = {
     val (tableSource, tableName) = filterableTableSourceTimeTypes

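Note: the disabled test above compares the descriptor-based definition against the positional CsvTableSource constructor. For readability, here is a minimal sketch of the same source built through the CsvTableSource builder (assuming the builder API of this Flink version; the mapping comments are illustrative):

    // equivalent to the constructor call in the disabled test
    val source = CsvTableSource.builder()
      .path("/path/to/csv")
      .field("myfield", Types.STRING)
      .field("myfield2", Types.INT)
      .fieldDelimiter("#")          // field delimiter "#"
      .lineDelimiter("\r\n")        // row delimiter "\r\n"
      .quoteCharacter(';')          // quote character ';'
      .ignoreFirstLine()            // skip the header line
      .commentPrefix("%%")          // lines starting with "%%" are comments
      .ignoreParseErrors()          // lenient parsing
      .build()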
http://git-wip-us.apache.org/repos/asf/flink/blob/2cb58960/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/catalog/ExternalCatalogSchemaTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/catalog/ExternalCatalogSchemaTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/catalog/ExternalCatalogSchemaTest.scala
index f9920e7..e6d2458 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/catalog/ExternalCatalogSchemaTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/catalog/ExternalCatalogSchemaTest.scala
@@ -31,6 +31,7 @@ import org.apache.flink.table.calcite.{CalciteConfig, FlinkTypeFactory, FlinkTyp
 import org.apache.flink.table.plan.schema.TableSourceTable
 import org.apache.flink.table.runtime.utils.CommonTestData
 import org.apache.flink.table.sources.CsvTableSource
+import org.apache.flink.table.utils.MockTableEnvironment
 import org.junit.Assert._
 import org.junit.{Before, Test}
 
@@ -48,7 +49,8 @@ class ExternalCatalogSchemaTest {
   def setUp(): Unit = {
     val rootSchemaPlus: SchemaPlus = CalciteSchema.createRootSchema(true, false).plus()
     val catalog = CommonTestData.getInMemoryTestCatalog
-    ExternalCatalogSchema.registerCatalog(rootSchemaPlus, schemaName, catalog)
+    ExternalCatalogSchema.registerCatalog(
+      new MockTableEnvironment, rootSchemaPlus, schemaName, catalog)
     externalCatalogSchema = rootSchemaPlus.getSubSchema("schemaName")
     val typeFactory = new FlinkTypeFactory(new FlinkTypeSystem())
     val prop = new Properties()

http://git-wip-us.apache.org/repos/asf/flink/blob/2cb58960/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/catalog/ExternalTableSourceUtilTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/catalog/ExternalTableSourceUtilTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/catalog/ExternalTableSourceUtilTest.scala
index 0744de9..cdacce1 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/catalog/ExternalTableSourceUtilTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/catalog/ExternalTableSourceUtilTest.scala
@@ -24,8 +24,9 @@ import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
 import org.apache.flink.streaming.api.datastream.DataStream
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
 import org.apache.flink.table.api.{TableSchema, Types}
-import org.apache.flink.table.plan.schema.{StreamTableSourceTable}
+import org.apache.flink.table.plan.schema.StreamTableSourceTable
 import org.apache.flink.table.sources.StreamTableSource
+import org.apache.flink.table.utils.MockTableEnvironment
 import org.apache.flink.types.Row
 import org.junit.Assert.assertTrue
 import org.junit.{Before, Test}
@@ -41,7 +42,8 @@ class ExternalTableSourceUtilTest {
   def testExternalStreamTable() = {
     val schema = new TableSchema(Array("foo"), Array(BasicTypeInfo.INT_TYPE_INFO))
     val table = ExternalCatalogTable("mock", schema)
-    val tableSource = ExternalTableSourceUtil.fromExternalCatalogTable(table)
+    val tableSource = ExternalTableSourceUtil.fromExternalCatalogTable(
+      new MockTableEnvironment, table)
     assertTrue(tableSource.isInstanceOf[StreamTableSourceTable[_]])
   }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/2cb58960/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/CsvTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/CsvTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/CsvTest.scala
new file mode 100644
index 0000000..15cf13b
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/CsvTest.scala
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.TypeExtractor
+import org.apache.flink.table.api.{TableSchema, Types, ValidationException}
+import org.junit.Test
+
+class CsvTest extends DescriptorTestBase {
+
+  @Test
+  def testCsv(): Unit = {
+    val desc = Csv()
+      .field("field1", "STRING")
+      .field("field2", Types.SQL_TIMESTAMP)
+      .field("field3", TypeExtractor.createTypeInfo(classOf[Class[_]]))
+      .field("field4", Types.ROW(
+        Array[String]("test", "row"),
+        Array[TypeInformation[_]](Types.INT, Types.STRING)))
+      .lineDelimiter("^")
+    val expected = Seq(
+      "format.type" -> "csv",
+      "format.version" -> "1",
+      "format.fields.0.name" -> "field1",
+      "format.fields.0.type" -> "STRING",
+      "format.fields.1.name" -> "field2",
+      "format.fields.1.type" -> "TIMESTAMP",
+      "format.fields.2.name" -> "field3",
+      "format.fields.2.type" -> "ANY(java.lang.Class)",
+      "format.fields.3.name" -> "field4",
+      "format.fields.3.type" -> "ROW(test INT, row VARCHAR)",
+      "format.line-delimiter" -> "^")
+    verifyProperties(desc, expected)
+  }
+
+  @Test
+  def testCsvTableSchema(): Unit = {
+    val desc = Csv()
+      .schema(new TableSchema(
+        Array[String]("test", "row"),
+        Array[TypeInformation[_]](Types.INT, Types.STRING)))
+      .quoteCharacter('#')
+      .ignoreFirstLine()
+    val expected = Seq(
+      "format.type" -> "csv",
+      "format.version" -> "1",
+      "format.fields.0.name" -> "test",
+      "format.fields.0.type" -> "INT",
+      "format.fields.1.name" -> "row",
+      "format.fields.1.type" -> "VARCHAR",
+      "format.quote-character" -> "#",
+      "format.ignore-first-line" -> "true")
+    verifyProperties(desc, expected)
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testInvalidType(): Unit = {
+    verifyInvalidProperty("format.fields.0.type", "WHATEVER")
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testInvalidField(): Unit = {
+    verifyInvalidProperty("format.fields.10.name", "WHATEVER")
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testInvalidQuoteCharacter(): Unit = {
+    verifyInvalidProperty("format.quote-character", "qq")
+  }
+
+  override def descriptor(): Descriptor = {
+    Csv()
+      .field("field1", "STRING")
+      .field("field2", Types.SQL_TIMESTAMP)
+      .field("field3", TypeExtractor.createTypeInfo(classOf[Class[_]]))
+      .field("field4", Types.ROW(
+        Array[String]("test", "row"),
+        Array[TypeInformation[_]](Types.INT, Types.STRING)))
+      .lineDelimiter("^")
+  }
+
+  override def validator(): DescriptorValidator = {
+    new CsvValidator()
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2cb58960/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/DescriptorTestBase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/DescriptorTestBase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/DescriptorTestBase.scala
new file mode 100644
index 0000000..3a59c9b
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/DescriptorTestBase.scala
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+import org.junit.Assert.assertEquals
+
+abstract class DescriptorTestBase {
+
+  /**
+    * Returns a valid descriptor.
+    */
+  def descriptor(): Descriptor
+
+  /**
+    * Returns a validator that can validate this descriptor.
+    */
+  def validator(): DescriptorValidator
+
+  def verifyProperties(descriptor: Descriptor, expected: Seq[(String, String)]): Unit = {
+    val normProps = new DescriptorProperties
+    descriptor.addProperties(normProps)
+    assertEquals(expected.toMap, normProps.asMap)
+  }
+
+  def verifyInvalidProperty(property: String, invalidValue: String): Unit = {
+    val properties = new DescriptorProperties
+    descriptor().addProperties(properties)
+    properties.unsafePut(property, invalidValue)
+    validator().validate(properties)
+  }
+
+  def verifyMissingProperty(removeProperty: String): Unit = {
+    val properties = new DescriptorProperties
+    descriptor().addProperties(properties)
+    properties.unsafeRemove(removeProperty)
+    validator().validate(properties)
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2cb58960/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/FileSystemTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/FileSystemTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/FileSystemTest.scala
new file mode 100644
index 0000000..3452e8d
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/FileSystemTest.scala
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+import org.apache.flink.table.api.ValidationException
+import org.junit.Test
+
+class FileSystemTest extends DescriptorTestBase {
+
+  @Test
+  def testFileSystem(): Unit = {
+    val desc = FileSystem().path("/myfile")
+    val expected = Seq(
+      "connector.type" -> "filesystem",
+      "connector.version" -> "1",
+      "connector.path" -> "/myfile")
+    verifyProperties(desc, expected)
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testInvalidPath(): Unit = {
+    verifyInvalidProperty("connector.path", "")
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testMissingPath(): Unit = {
+    verifyMissingProperty("connector.path")
+  }
+
+  override def descriptor(): Descriptor = {
+    FileSystem().path("/myfile")
+  }
+
+  override def validator(): DescriptorValidator = {
+    new FileSystemValidator()
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2cb58960/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/JsonTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/JsonTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/JsonTest.scala
new file mode 100644
index 0000000..756ca23
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/JsonTest.scala
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+import org.apache.flink.table.api.ValidationException
+import org.junit.Test
+
+class JsonTest extends DescriptorTestBase {
+
+  @Test
+  def testJson(): Unit = {
+    val schema =
+      """
+        |{
+        |    "title": "Person",
+        |    "type": "object",
+        |    "properties": {
+        |        "firstName": {
+        |            "type": "string"
+        |        },
+        |        "lastName": {
+        |            "type": "string"
+        |        },
+        |        "age": {
+        |            "description": "Age in years",
+        |            "type": "integer",
+        |            "minimum": 0
+        |        }
+        |    },
+        |    "required": ["firstName", "lastName"]
+        |}
+        |""".stripMargin
+    val desc = Json()
+      .schema(schema)
+      .failOnMissingField(true)
+    val expected = Seq(
+      "format.type" -> "json",
+      "format.version" -> "1",
+      "format.schema-string" -> schema,
+      "format.fail-on-missing-field" -> "true")
+    verifyProperties(desc, expected)
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testInvalidMissingField(): Unit = {
+    verifyInvalidProperty("format.fail-on-missing-field", "DDD")
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testMissingSchema(): Unit = {
+    verifyMissingProperty("format.schema-string")
+  }
+
+  override def descriptor(): Descriptor = {
+    Json().schema("test")
+  }
+
+  override def validator(): DescriptorValidator = {
+    new JsonValidator()
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2cb58960/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/MetadataTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/MetadataTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/MetadataTest.scala
new file mode 100644
index 0000000..a1854ce
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/MetadataTest.scala
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+import org.apache.flink.table.api.ValidationException
+import org.junit.Test
+
+class MetadataTest extends DescriptorTestBase {
+
+  @Test
+  def testMetadata(): Unit = {
+    val desc = Metadata()
+      .comment("Some additional comment")
+      .creationTime(123L)
+      .lastAccessTime(12020202L)
+    val expected = Seq(
+      "metadata.comment" -> "Some additional comment",
+      "metadata.creation-time" -> "123",
+      "metadata.last-access-time" -> "12020202"
+    )
+    verifyProperties(desc, expected)
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testInvalidCreationTime(): Unit = {
+    verifyInvalidProperty("metadata.creation-time", "dfghj")
+  }
+
+  override def descriptor(): Descriptor = {
+    Metadata()
+      .comment("Some additional comment")
+      .creationTime(123L)
+      .lastAccessTime(12020202L)
+  }
+
+  override def validator(): DescriptorValidator = {
+    new MetadataValidator()
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2cb58960/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/RowtimeTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/RowtimeTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/RowtimeTest.scala
new file mode 100644
index 0000000..80050fc
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/RowtimeTest.scala
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+import org.apache.flink.streaming.api.watermark.Watermark
+import org.apache.flink.table.api.ValidationException
+import org.apache.flink.table.descriptors.RowtimeTest.CustomAssigner
+import org.apache.flink.table.sources.wmstrategies.PunctuatedWatermarkAssigner
+import org.apache.flink.types.Row
+import org.junit.Test
+
+class RowtimeTest extends DescriptorTestBase {
+
+  @Test
+  def testRowtime(): Unit = {
+    val desc = Rowtime()
+      .timestampsFromField("otherField")
+      .watermarksPeriodicBounding(1000L)
+    val expected = Seq(
+      "rowtime.0.version" -> "1",
+      "rowtime.0.timestamps.type" -> "from-field",
+      "rowtime.0.timestamps.from" -> "otherField",
+      "rowtime.0.watermarks.type" -> "periodic-bounding",
+      "rowtime.0.watermarks.delay" -> "1000"
+    )
+    verifyProperties(desc, expected)
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testInvalidWatermarkType(): Unit = {
+    verifyInvalidProperty("rowtime.0.watermarks.type", "xxx")
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testMissingWatermarkClass(): Unit = {
+    verifyMissingProperty("rowtime.0.watermarks.class")
+  }
+
+  override def descriptor(): Descriptor = {
+    Rowtime()
+      .timestampsFromSource()
+      .watermarksFromStrategy(new CustomAssigner())
+  }
+
+  override def validator(): DescriptorValidator = {
+    new RowtimeValidator("rowtime.0.")
+  }
+}
+
+object RowtimeTest {
+
+  class CustomAssigner extends PunctuatedWatermarkAssigner() {
+    override def getWatermark(row: Row, timestamp: Long): Watermark =
+      throw new UnsupportedOperationException()
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2cb58960/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/SchemaTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/SchemaTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/SchemaTest.scala
new file mode 100644
index 0000000..f663a96
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/SchemaTest.scala
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+import org.apache.flink.table.api.{Types, ValidationException}
+import org.junit.Test
+
+class SchemaTest extends DescriptorTestBase {
+
+  @Test
+  def testSchema(): Unit = {
+    val desc = Schema()
+      .field("myField", Types.BOOLEAN)
+      .field("otherField", "VARCHAR").from("csvField")
+      .field("p", Types.SQL_TIMESTAMP).proctime()
+      .field("r", Types.SQL_TIMESTAMP).rowtime(
+        Rowtime().timestampsFromSource().watermarksFromSource())
+    val expected = Seq(
+      "schema.version" -> "1",
+      "schema.0.name" -> "myField",
+      "schema.0.type" -> "BOOLEAN",
+      "schema.1.name" -> "otherField",
+      "schema.1.type" -> "VARCHAR",
+      "schema.1.from" -> "csvField",
+      "schema.2.name" -> "p",
+      "schema.2.type" -> "TIMESTAMP",
+      "schema.2.proctime" -> "true",
+      "schema.3.name" -> "r",
+      "schema.3.type" -> "TIMESTAMP",
+      "schema.3.rowtime.0.version" -> "1",
+      "schema.3.rowtime.0.watermarks.type" -> "from-source",
+      "schema.3.rowtime.0.timestamps.type" -> "from-source"
+    )
+    verifyProperties(desc, expected)
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testInvalidType(): Unit = {
+    verifyInvalidProperty("schema.1.type", "dfghj")
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testBothRowtimeAndProctime(): Unit = {
+    verifyInvalidProperty("schema.2.rowtime.0.version", "1")
+    verifyInvalidProperty("schema.2.rowtime.0.watermarks.type", "from-source")
+    verifyInvalidProperty("schema.2.rowtime.0.timestamps.type", "from-source")
+  }
+
+  override def descriptor(): Descriptor = {
+    Schema()
+      .field("myField", Types.BOOLEAN)
+      .field("otherField", "VARCHAR").from("csvField")
+      .field("p", Types.SQL_TIMESTAMP).proctime()
+      .field("r", Types.SQL_TIMESTAMP)
+  }
+
+  override def validator(): DescriptorValidator = {
+    new SchemaValidator(isStreamEnvironment = true)
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2cb58960/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/StatisticsTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/StatisticsTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/StatisticsTest.scala
new file mode 100644
index 0000000..3b248b4
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/StatisticsTest.scala
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+import _root_.java.util
+
+import org.apache.flink.table.api.ValidationException
+import org.apache.flink.table.plan.stats.{ColumnStats, TableStats}
+import org.junit.Test
+
+class StatisticsTest extends DescriptorTestBase {
+
+  @Test
+  def testStatistics(): Unit = {
+    val desc = Statistics()
+      .rowCount(1000L)
+      .columnStats("a", ColumnStats(1L, 2L, 3.0, 4, 5, 6))
+      .columnAvgLength("b", 42.0)
+      .columnNullCount("a", 300)
+    val expected = Seq(
+      "statistics.version" -> "1",
+      "statistics.row-count" -> "1000",
+      "statistics.columns.0.name" -> "a",
+      "statistics.columns.0.distinct-count" -> "1",
+      "statistics.columns.0.null-count" -> "300",
+      "statistics.columns.0.avg-length" -> "3.0",
+      "statistics.columns.0.max-length" -> "4",
+      "statistics.columns.0.max-value" -> "5",
+      "statistics.columns.0.min-value" -> "6",
+      "statistics.columns.1.name" -> "b",
+      "statistics.columns.1.avg-length" -> "42.0"
+    )
+    verifyProperties(desc, expected)
+  }
+
+  @Test
+  def testStatisticsTableStats(): Unit = {
+    val map = new util.HashMap[String, ColumnStats]()
+    map.put("a", ColumnStats(null, 2L, 3.0, null, 5, 6))
+    val desc = Statistics()
+      .tableStats(TableStats(32L, map))
+    val expected = Seq(
+      "statistics.version" -> "1",
+      "statistics.row-count" -> "32",
+      "statistics.columns.0.name" -> "a",
+      "statistics.columns.0.null-count" -> "2",
+      "statistics.columns.0.avg-length" -> "3.0",
+      "statistics.columns.0.max-value" -> "5",
+      "statistics.columns.0.min-value" -> "6"
+    )
+    verifyProperties(desc, expected)
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testInvalidRowCount(): Unit = {
+    verifyInvalidProperty("statistics.row-count", "abx")
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testMissingName(): Unit = {
+    verifyMissingProperty("statistics.columns.0.name")
+  }
+
+  override def descriptor(): Descriptor = {
+    Statistics()
+      .rowCount(1000L)
+      .columnStats("a", ColumnStats(1L, 2L, 3.0, 4, 5, 6))
+      .columnAvgLength("b", 42.0)
+      .columnNullCount("a", 300)
+  }
+
+  override def validator(): DescriptorValidator = {
+    new StatisticsValidator()
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2cb58960/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/StreamTableSourceDescriptorTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/StreamTableSourceDescriptorTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/StreamTableSourceDescriptorTest.scala
new file mode 100644
index 0000000..2c9a89c
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/StreamTableSourceDescriptorTest.scala
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+import org.apache.flink.table.utils.TableTestBase
+
+class StreamTableSourceDescriptorTest extends TableTestBase {
+
+// TODO enable this test once we expose the feature through the table environment
+//  @Test
+//  def testStreamTableSourceDescriptor(): Unit = {
+//    val util = streamTestUtil()
+//    val desc = util.tableEnv
+//      .from(
+//        FileSystem()
+//          .path("/path/to/csv"))
+//      .withFormat(
+//        Csv()
+//          .field("myfield", Types.STRING)
+//          .field("myfield2", Types.INT)
+//          .quoteCharacter(';')
+//          .fieldDelimiter("#")
+//          .lineDelimiter("\r\n")
+//          .commentPrefix("%%")
+//          .ignoreFirstLine()
+//          .ignoreParseErrors())
+//        .withSchema(
+//          Schema()
+//            .field("myfield", Types.STRING)
+//            .field("myfield2", Types.INT)
+//            .field("proctime", Types.SQL_TIMESTAMP).proctime()
+//            .field("rowtime", Types.SQL_TIMESTAMP).rowtime(
+//              Rowtime().timestampsFromSource().watermarksFromSource())
+//        )
+//    val expected = Seq(
+//      "connector.type" -> "filesystem",
+//      "connector.path" -> "/path/to/csv",
+//      "format.type" -> "csv",
+//      "format.fields.0.name" -> "myfield",
+//      "format.fields.0.type" -> "VARCHAR",
+//      "format.fields.1.name" -> "myfield2",
+//      "format.fields.1.type" -> "INT",
+//      "format.quote-character" -> ";",
+//      "format.field-delimiter" -> "#",
+//      "format.line-delimiter" -> "\r\n",
+//      "format.comment-prefix" -> "%%",
+//      "format.ignore-first-line" -> "true",
+//      "format.ignore-parse-errors" -> "true",
+//      "schema.0.name" -> "myfield",
+//      "schema.0.type" -> "VARCHAR",
+//      "schema.1.name" -> "myfield2",
+//      "schema.1.type" -> "INT",
+//      "schema.2.name" -> "proctime",
+//      "schema.2.type" -> "TIMESTAMP",
+//      "schema.2.proctime" -> "true",
+//      "schema.3.name" -> "rowtime",
+//      "schema.3.type" -> "TIMESTAMP",
+//      "schema.3.rowtime.0.timestamps.type" -> "from-source",
+//      "schema.3.rowtime.0.watermarks.type" -> "from-source"
+//    )
+//    verifyProperties(desc, expected)
+//  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2cb58960/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/sources/TableSourceFactoryServiceTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/sources/TableSourceFactoryServiceTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/sources/TableSourceFactoryServiceTest.scala
new file mode 100644
index 0000000..5e9b5a2
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/sources/TableSourceFactoryServiceTest.scala
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sources
+
+import org.apache.flink.table.api.{NoMatchingTableSourceException, TableException, ValidationException}
+import org.apache.flink.table.descriptors.ConnectorDescriptorValidator.{CONNECTOR_TYPE, CONNECTOR_VERSION}
+import org.apache.flink.table.descriptors.FormatDescriptorValidator.{FORMAT_TYPE, FORMAT_VERSION}
+import org.junit.Assert.assertTrue
+import org.junit.Test
+
+import scala.collection.mutable
+
+class TableSourceFactoryServiceTest {
+
+  @Test
+  def testValidProperties(): Unit = {
+    val props = properties()
+    assertTrue(TableSourceFactoryService.findTableSourceFactory(props.toMap) != null)
+  }
+
+  @Test(expected = classOf[NoMatchingTableSourceException])
+  def testInvalidContext(): Unit = {
+    val props = properties()
+    props.put(CONNECTOR_TYPE, "FAIL")
+    TableSourceFactoryService.findTableSourceFactory(props.toMap)
+  }
+
+  @Test
+  def testDifferentContextVersion(): Unit = {
+    val props = properties()
+    props.put(CONNECTOR_VERSION, "2")
+    // the table source should still be found
+    assertTrue(TableSourceFactoryService.findTableSourceFactory(props.toMap) != null)
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testUnsupportedProperty(): Unit = {
+    val props = properties()
+    props.put("format.path_new", "/new/path")
+    TableSourceFactoryService.findTableSourceFactory(props.toMap)
+  }
+
+  @Test(expected = classOf[TableException])
+  def testFailingFactory(): Unit = {
+    val props = properties()
+    props.put("failing", "true")
+    TableSourceFactoryService.findTableSourceFactory(props.toMap)
+  }
+
+  private def properties(): mutable.Map[String, String] = {
+    val properties = mutable.Map[String, String]()
+    properties.put(CONNECTOR_TYPE, "test")
+    properties.put(FORMAT_TYPE, "test")
+    properties.put(CONNECTOR_VERSION, "1")
+    properties.put(FORMAT_VERSION, "1")
+    properties.put("format.path", "/path/to/target")
+    properties.put("schema.0.name", "a")
+    properties.put("schema.1.name", "b")
+    properties.put("schema.2.name", "c")
+    properties.put("schema.0.field.0.name", "a")
+    properties.put("schema.0.field.1.name", "b")
+    properties.put("schema.0.field.2.name", "c")
+    properties.put("failing", "false")
+    properties
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2cb58960/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/sources/TestTableSourceFactory.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/sources/TestTableSourceFactory.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/sources/TestTableSourceFactory.scala
new file mode 100644
index 0000000..ae75f99
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/sources/TestTableSourceFactory.scala
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sources
+
+import java.util
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.table.api.TableSchema
+import org.apache.flink.table.descriptors.ConnectorDescriptorValidator.{CONNECTOR_TYPE, CONNECTOR_VERSION}
+import org.apache.flink.table.descriptors.FormatDescriptorValidator.{FORMAT_TYPE, FORMAT_VERSION}
+import org.apache.flink.types.Row
+
+class TestTableSourceFactory extends TableSourceFactory[Row] {
+
+  override def requiredContext(): util.Map[String, String] = {
+    val context = new util.HashMap[String, String]()
+    context.put(CONNECTOR_TYPE, "test")
+    context.put(FORMAT_TYPE, "test")
+    context.put(CONNECTOR_VERSION, "1")
+    context.put(FORMAT_VERSION, "1")
+    context
+  }
+
+  override def supportedProperties(): util.List[String] = {
+    val properties = new util.ArrayList[String]()
+    // connector
+    properties.add("format.path")
+    properties.add("schema.#.name")
+    properties.add("schema.#.field.#.name")
+    properties.add("failing")
+    properties
+  }
+
+  override def create(properties: util.Map[String, String]): TableSource[Row] = {
+    if (properties.get("failing") == "true") {
+      throw new IllegalArgumentException("Error in this factory.")
+    }
+    new TableSource[Row] {
+      override def getTableSchema: TableSchema = throw new UnsupportedOperationException()
+
+      override def getReturnType: TypeInformation[Row] = throw new UnsupportedOperationException()
+    }
+  }
+}

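For TableSourceFactoryServiceTest above to find this factory, it has to be discoverable at runtime. Assuming the lookup goes through the standard java.util.ServiceLoader mechanism (which the naming of TableSourceFactoryService suggests), the test resources would carry a provider configuration file along these lines (path and content shown as an assumption, not taken from this commit):

    # src/test/resources/META-INF/services/org.apache.flink.table.sources.TableSourceFactory
    org.apache.flink.table.sources.TestTableSourceFactory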
http://git-wip-us.apache.org/repos/asf/flink/blob/2cb58960/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/typeutils/TypeStringUtilsTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/typeutils/TypeStringUtilsTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/typeutils/TypeStringUtilsTest.scala
new file mode 100644
index 0000000..29d647c
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/typeutils/TypeStringUtilsTest.scala
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.typeutils
+
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.api.java.typeutils.TypeExtractor
+import org.apache.flink.table.api.Types
+import org.apache.flink.table.runtime.utils.CommonTestData.{NonPojo, Person}
+import org.junit.Assert.assertEquals
+import org.junit.Test
+
+/**
+  * Tests for string-based representation of [[TypeInformation]].
+  */
+class TypeStringUtilsTest {
+
+  @Test
+  def testPrimitiveTypes(): Unit = {
+    testReadAndWrite("VARCHAR", Types.STRING)
+    testReadAndWrite("BOOLEAN", Types.BOOLEAN)
+    testReadAndWrite("TINYINT", Types.BYTE)
+    testReadAndWrite("SMALLINT", Types.SHORT)
+    testReadAndWrite("INT", Types.INT)
+    testReadAndWrite("BIGINT", Types.LONG)
+    testReadAndWrite("FLOAT", Types.FLOAT)
+    testReadAndWrite("DOUBLE", Types.DOUBLE)
+    testReadAndWrite("DECIMAL", Types.DECIMAL)
+    testReadAndWrite("DATE", Types.SQL_DATE)
+    testReadAndWrite("TIME", Types.SQL_TIME)
+    testReadAndWrite("TIMESTAMP", Types.SQL_TIMESTAMP)
+
+    // unsupported type information
+    testReadAndWrite(
+      "ANY(java.lang.Void, " +
+        "rO0ABXNyADJvcmcuYXBhY2hlLmZsaW5rLmFwaS5jb21tb24udHlwZWluZm8uQmFzaWNUeXBlSW5mb_oE8IKl" +
+        "ad0GAgAETAAFY2xhenp0ABFMamF2YS9sYW5nL0NsYXNzO0wAD2NvbXBhcmF0b3JDbGFzc3EAfgABWwAXcG9z" +
+        "c2libGVDYXN0VGFyZ2V0VHlwZXN0ABJbTGphdmEvbGFuZy9DbGFzcztMAApzZXJpYWxpemVydAA2TG9yZy9h" +
+        "cGFjaGUvZmxpbmsvYXBpL2NvbW1vbi90eXBldXRpbHMvVHlwZVNlcmlhbGl6ZXI7eHIANG9yZy5hcGFjaGUu" +
+        "ZmxpbmsuYXBpLmNvbW1vbi50eXBlaW5mby5UeXBlSW5mb3JtYXRpb26UjchIurN66wIAAHhwdnIADmphdmEu" +
+        "bGFuZy5Wb2lkAAAAAAAAAAAAAAB4cHB1cgASW0xqYXZhLmxhbmcuQ2xhc3M7qxbXrsvNWpkCAAB4cAAAAABz" +
+        "cgA5b3JnLmFwYWNoZS5mbGluay5hcGkuY29tbW9uLnR5cGV1dGlscy5iYXNlLlZvaWRTZXJpYWxpemVyAAAA" +
+        "AAAAAAECAAB4cgBCb3JnLmFwYWNoZS5mbGluay5hcGkuY29tbW9uLnR5cGV1dGlscy5iYXNlLlR5cGVTZXJp" +
+        "YWxpemVyU2luZ2xldG9ueamHqscud0UCAAB4cgA0b3JnLmFwYWNoZS5mbGluay5hcGkuY29tbW9uLnR5cGV1" +
+        "dGlscy5UeXBlU2VyaWFsaXplcgAAAAAAAAABAgAAeHA)",
+      BasicTypeInfo.VOID_TYPE_INFO)
+  }
+
+  @Test
+  def testWriteComplexTypes(): Unit = {
+    testReadAndWrite(
+      "ROW(f0 DECIMAL, f1 TINYINT)",
+      Types.ROW(Types.DECIMAL, Types.BYTE))
+
+    testReadAndWrite(
+      "ROW(hello DECIMAL, world TINYINT)",
+      Types.ROW(
+        Array[String]("hello", "world"),
+        Array[TypeInformation[_]](Types.DECIMAL, Types.BYTE)))
+
+    testReadAndWrite(
+      "ROW(\"he llo\" DECIMAL, world TINYINT)",
+      Types.ROW(
+        Array[String]("he llo", "world"),
+        Array[TypeInformation[_]](Types.DECIMAL, Types.BYTE)))
+
+    testReadAndWrite(
+      "ROW(\"he         \\nllo\" DECIMAL, world TINYINT)",
+      Types.ROW(
+        Array[String]("he         \nllo", "world"),
+        Array[TypeInformation[_]](Types.DECIMAL, Types.BYTE)))
+
+    testReadAndWrite(
+      "POJO(org.apache.flink.table.runtime.utils.CommonTestData$Person)",
+      TypeExtractor.createTypeInfo(classOf[Person]))
+
+    testReadAndWrite(
+      "ANY(org.apache.flink.table.runtime.utils.CommonTestData$NonPojo)",
+      TypeExtractor.createTypeInfo(classOf[NonPojo]))
+  }
+
+  private def testReadAndWrite(expected: String, tpe: TypeInformation[_]): Unit = {
+    // test write to string
+    assertEquals(expected, TypeStringUtils.writeTypeInfo(tpe))
+
+    // test read from string
+    assertEquals(tpe, TypeStringUtils.readTypeInfo(expected))
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2cb58960/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/MockTableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/MockTableEnvironment.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/MockTableEnvironment.scala
index ff7c79d..f43bcc6 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/MockTableEnvironment.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/MockTableEnvironment.scala
@@ -35,8 +35,6 @@ class MockTableEnvironment extends TableEnvironment(new TableConfig) {
 
   override def sql(query: String): Table = ???
 
-  override def registerTableSource(name: String, tableSource: TableSource[_]): Unit = ???
-
   override protected def getBuiltInNormRuleSet: RuleSet = ???
 
   override protected def getBuiltInPhysicalOptRuleSet: RuleSet = ???
@@ -46,4 +44,9 @@ class MockTableEnvironment extends TableEnvironment(new TableConfig) {
       fieldNames: Array[String],
       fieldTypes: Array[TypeInformation[_]],
       tableSink: TableSink[_]): Unit = ???
+
+  override protected def createUniqueTableName(): String = ???
+
+  override protected def registerTableSourceInternal(name: String, tableSource: TableSource[_])
+    : Unit = ???
 }


[2/3] flink git commit: [FLINK-8240] [table] Create unified interfaces to configure and instantiate TableSources

Posted by tw...@apache.org.
http://git-wip-us.apache.org/repos/asf/flink/blob/2cb58960/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/FormatDescriptorValidator.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/FormatDescriptorValidator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/FormatDescriptorValidator.scala
new file mode 100644
index 0000000..1aaa399
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/FormatDescriptorValidator.scala
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+import org.apache.flink.table.descriptors.FormatDescriptorValidator.{FORMAT_TYPE, FORMAT_VERSION}
+
+/**
+  * Validator for [[FormatDescriptor]].
+  */
+class FormatDescriptorValidator extends DescriptorValidator {
+
+  override def validate(properties: DescriptorProperties): Unit = {
+    properties.validateString(FORMAT_TYPE, isOptional = false, minLen = 1)
+    properties.validateInt(FORMAT_VERSION, isOptional = true, 0, Integer.MAX_VALUE)
+  }
+}
+
+object FormatDescriptorValidator {
+
+  val FORMAT_TYPE = "format.type"
+  val FORMAT_VERSION = "format.version"
+
+}

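A format-specific validator extends this base class and adds its own checks on top of the common format.type/format.version validation, as JsonValidator below does. A minimal sketch for a hypothetical format (the property name "format.my-option" is illustrative only):

    class MyFormatValidator extends FormatDescriptorValidator {
      override def validate(properties: DescriptorProperties): Unit = {
        super.validate(properties)  // validates format.type and format.version
        properties.validateString("format.my-option", isOptional = true, minLen = 1)
      }
    }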
http://git-wip-us.apache.org/repos/asf/flink/blob/2cb58960/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/Json.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/Json.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/Json.scala
new file mode 100644
index 0000000..cc46d9c
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/Json.scala
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+import org.apache.flink.table.descriptors.JsonValidator.{FORMAT_FAIL_ON_MISSING_FIELD, FORMAT_SCHEMA_STRING, FORMAT_TYPE_VALUE}
+
+/**
+  * Encoding descriptor for JSON.
+  */
+class Json extends FormatDescriptor(FORMAT_TYPE_VALUE, version = 1) {
+
+  private var failOnMissingField: Option[Boolean] = None
+
+  private var schema: Option[String] = None
+
+  /**
+    * Sets flag whether to fail if a field is missing or not.
+    *
+    * @param failOnMissingField If set to true, the operation fails if there is a missing field.
+    *                           If set to false, a missing field is set to null.
+    * @return The builder.
+    */
+  def failOnMissingField(failOnMissingField: Boolean): Json = {
+    this.failOnMissingField = Some(failOnMissingField)
+    this
+  }
+
+  /**
+    * Sets the JSON schema string with field names and the types according to the JSON schema
+    * specification [[http://json-schema.org/specification.html]]. Required.
+    *
+    * The schema might be nested.
+    *
+    * @param schema JSON schema
+    */
+  def schema(schema: String): Json = {
+    this.schema = Some(schema)
+    this
+  }
+
+  /**
+    * Internal method for format properties conversion.
+    */
+  override protected def addFormatProperties(properties: DescriptorProperties): Unit = {
+    // we distinguish between "schema string" and "schema" to allow parsing of a
+    // schema object in the future (such that the entire JSON schema can be defined in a YAML
+    // file instead of one large string)
+    schema.foreach(properties.putString(FORMAT_SCHEMA_STRING, _))
+    failOnMissingField.foreach(properties.putBoolean(FORMAT_FAIL_ON_MISSING_FIELD, _))
+  }
+}
+
+/**
+  * Encoding descriptor for JSON.
+  */
+object Json {
+
+  /**
+    * Encoding descriptor for JSON.
+    */
+  def apply(): Json = new Json()
+}

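As exercised by JsonTest earlier in this patch, the descriptor translates into flat key/value properties. A minimal usage sketch (schema string shortened here):

    val json = Json()
      .schema("{ \"type\": \"object\" }")
      .failOnMissingField(true)
    // addProperties(...) then yields roughly:
    //   format.type                  -> json
    //   format.version               -> 1
    //   format.schema-string         -> { "type": "object" }
    //   format.fail-on-missing-field -> true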
http://git-wip-us.apache.org/repos/asf/flink/blob/2cb58960/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/JsonValidator.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/JsonValidator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/JsonValidator.scala
new file mode 100644
index 0000000..9f11caf
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/JsonValidator.scala
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+import org.apache.flink.table.descriptors.JsonValidator.{FORMAT_FAIL_ON_MISSING_FIELD, FORMAT_SCHEMA_STRING}
+
+/**
+  * Validator for [[Json]].
+  */
+class JsonValidator extends FormatDescriptorValidator {
+
+  override def validate(properties: DescriptorProperties): Unit = {
+    super.validate(properties)
+    properties.validateString(FORMAT_SCHEMA_STRING, isOptional = false, minLen = 1)
+    properties.validateBoolean(FORMAT_FAIL_ON_MISSING_FIELD, isOptional = true)
+  }
+}
+
+object JsonValidator {
+
+  val FORMAT_TYPE_VALUE = "json"
+  val FORMAT_SCHEMA_STRING = "format.schema-string"
+  val FORMAT_FAIL_ON_MISSING_FIELD = "format.fail-on-missing-field"
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2cb58960/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/Metadata.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/Metadata.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/Metadata.scala
new file mode 100644
index 0000000..211d786
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/Metadata.scala
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+import org.apache.flink.table.descriptors.MetadataValidator.{METADATA_COMMENT, METADATA_CREATION_TIME, METADATA_LAST_ACCESS_TIME}
+
+/**
+  * Metadata descriptor for adding additional, useful information.
+  */
+class Metadata extends Descriptor {
+
+  protected var comment: Option[String] = None
+  protected var creationTime: Option[Long] = None
+  protected var lastAccessTime: Option[Long] = None
+
+  /**
+    * Sets a comment.
+    *
+    * @param comment the description
+    */
+  def comment(comment: String): Metadata = {
+    this.comment = Some(comment)
+    this
+  }
+
+  /**
+    * Sets a creation time.
+    *
+    * @param time UTC milliseconds timestamp
+    */
+  def creationTime(time: Long): Metadata = {
+    this.creationTime = Some(time)
+    this
+  }
+
+  /**
+    * Sets a last access time.
+    *
+    * @param time UTC milliseconds timestamp
+    */
+  def lastAccessTime(time: Long): Metadata = {
+    this.lastAccessTime = Some(time)
+    this
+  }
+
+  /**
+    * Internal method for properties conversion.
+    */
+  final override def addProperties(properties: DescriptorProperties): Unit = {
+    comment.foreach(c => properties.putString(METADATA_COMMENT, c))
+    creationTime.foreach(t => properties.putLong(METADATA_CREATION_TIME, t))
+    lastAccessTime.foreach(t => properties.putLong(METADATA_LAST_ACCESS_TIME, t))
+  }
+}
+
+/**
+  * Metadata descriptor for adding additional, useful information.
+  */
+object Metadata {
+
+  /**
+    * Metadata descriptor for adding additional, useful information.
+    */
+  def apply(): Metadata = new Metadata()
+}
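
For illustration, a minimal sketch of the fluent Metadata API above (the comment text and timestamps are made up; the descriptors package is assumed to be imported):

    val meta = Metadata()
      .comment("click events imported from the legacy warehouse")
      .creationTime(1517395200000L)    // UTC milliseconds
      .lastAccessTime(1517398800000L)
    // internally converted into the metadata.comment, metadata.creation-time and
    // metadata.last-access-time properties that MetadataValidator checks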

http://git-wip-us.apache.org/repos/asf/flink/blob/2cb58960/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/MetadataValidator.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/MetadataValidator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/MetadataValidator.scala
new file mode 100644
index 0000000..a8d580c
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/MetadataValidator.scala
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+import org.apache.flink.table.descriptors.MetadataValidator.{METADATA_COMMENT, METADATA_CREATION_TIME, METADATA_LAST_ACCESS_TIME, METADATA_VERSION}
+
+/**
+  * Validator for [[Metadata]].
+  */
+class MetadataValidator extends DescriptorValidator {
+
+  override def validate(properties: DescriptorProperties): Unit = {
+    properties.validateInt(METADATA_VERSION, isOptional = true, 0, Integer.MAX_VALUE)
+    properties.validateString(METADATA_COMMENT, isOptional = true)
+    properties.validateLong(METADATA_CREATION_TIME, isOptional = true)
+    properties.validateLong(METADATA_LAST_ACCESS_TIME, isOptional = true)
+  }
+}
+
+object MetadataValidator {
+
+  val METADATA_VERSION = "metadata.version"
+  val METADATA_COMMENT = "metadata.comment"
+  val METADATA_CREATION_TIME = "metadata.creation-time"
+  val METADATA_LAST_ACCESS_TIME = "metadata.last-access-time"
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2cb58960/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/Rowtime.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/Rowtime.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/Rowtime.scala
new file mode 100644
index 0000000..a1c80f5
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/Rowtime.scala
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+import org.apache.flink.table.api.Types
+import org.apache.flink.table.descriptors.RowtimeValidator.{ROWTIME, ROWTIME_VERSION, normalizeTimestampExtractor, normalizeWatermarkStrategy}
+import org.apache.flink.table.sources.tsextractors.{ExistingField, StreamRecordTimestamp, TimestampExtractor}
+import org.apache.flink.table.sources.wmstrategies.{AscendingTimestamps, BoundedOutOfOrderTimestamps, PreserveWatermarks, WatermarkStrategy}
+
+import scala.collection.mutable
+
+/**
+  * Rowtime descriptor for describing an event time attribute in the schema.
+  */
+class Rowtime extends Descriptor {
+
+  private var timestampExtractor: Option[TimestampExtractor] = None
+  private var watermarkStrategy: Option[WatermarkStrategy] = None
+
+  /**
+    * Sets a built-in timestamp extractor that converts an existing [[Long]] or
+    * [[Types.SQL_TIMESTAMP]] field into the rowtime attribute.
+    *
+    * @param fieldName The field to convert into a rowtime attribute.
+    */
+  def timestampsFromField(fieldName: String): Rowtime = {
+    timestampExtractor = Some(new ExistingField(fieldName))
+    this
+  }
+
+  /**
+    * Sets a built-in timestamp extractor that converts the assigned timestamps from
+    * a DataStream API record into the rowtime attribute and thus preserves the assigned
+    * timestamps from the source.
+    *
+    * Note: This extractor only works in streaming environments.
+    */
+  def timestampsFromSource(): Rowtime = {
+    timestampExtractor = Some(new StreamRecordTimestamp)
+    this
+  }
+
+  /**
+    * Sets a custom timestamp extractor to be used for the rowtime attribute.
+    *
+    * @param extractor The [[TimestampExtractor]] to extract the rowtime attribute
+    *                  from the physical type.
+    */
+  def timestampsFromExtractor(extractor: TimestampExtractor): Rowtime = {
+    timestampExtractor = Some(extractor)
+    this
+  }
+
+  /**
+    * Sets a built-in watermark strategy for ascending rowtime attributes.
+    *
+    * Emits a watermark of the maximum observed timestamp so far minus 1.
+    * Rows that have a timestamp equal to the max timestamp are not late.
+    */
+  def watermarksPeriodicAscending(): Rowtime = {
+    watermarkStrategy = Some(new AscendingTimestamps)
+    this
+  }
+
+  /**
+    * Sets a built-in watermark strategy for rowtime attributes which are out-of-order by a bounded
+    * time interval.
+    *
+    * Emits watermarks which are the maximum observed timestamp minus the specified delay.
+    */
+  def watermarksPeriodicBounding(delay: Long): Rowtime = {
+    watermarkStrategy = Some(new BoundedOutOfOrderTimestamps(delay))
+    this
+  }
+
+  /**
+    * Sets a built-in watermark strategy which indicates that watermarks should be preserved from
+    * the underlying DataStream API and thus forwards the watermarks assigned in the source.
+    */
+  def watermarksFromSource(): Rowtime = {
+    watermarkStrategy = Some(PreserveWatermarks.INSTANCE)
+    this
+  }
+
+  /**
+    * Sets a custom watermark strategy to be used for the rowtime attribute.
+    */
+  def watermarksFromStrategy(strategy: WatermarkStrategy): Rowtime = {
+    watermarkStrategy = Some(strategy)
+    this
+  }
+
+  /**
+    * Internal method for properties conversion.
+    */
+  final override def addProperties(properties: DescriptorProperties): Unit = {
+    val props = mutable.HashMap[String, String]()
+    props.put(ROWTIME_VERSION, "1")
+    timestampExtractor.foreach(normalizeTimestampExtractor(_).foreach(e => props.put(e._1, e._2)))
+    watermarkStrategy.foreach(normalizeWatermarkStrategy(_).foreach(e => props.put(e._1, e._2)))
+
+    // use a list for the rowtime to support multiple rowtime attributes in the future
+    properties.putIndexedVariableProperties(ROWTIME, Seq(props.toMap))
+  }
+}
+
+/**
+  * Rowtime descriptor for describing an event time attribute in the schema.
+  */
+object Rowtime {
+
+  /**
+    * Rowtime descriptor for describing an event time attribute in the schema.
+    */
+  def apply(): Rowtime = new Rowtime()
+
+}
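
A minimal usage sketch of the descriptor above, assuming the physical schema contains a "ts" field and one minute of allowed out-of-orderness:

    val rowtime = Rowtime()
      .timestampsFromField("ts")             // derive the rowtime attribute from the existing "ts" field
      .watermarksPeriodicBounding(60000L)    // watermarks trail the maximum timestamp by 60 seconds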

http://git-wip-us.apache.org/repos/asf/flink/blob/2cb58960/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/RowtimeValidator.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/RowtimeValidator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/RowtimeValidator.scala
new file mode 100644
index 0000000..74e49f1
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/RowtimeValidator.scala
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+import org.apache.flink.table.descriptors.DescriptorProperties.serialize
+import org.apache.flink.table.descriptors.RowtimeValidator._
+import org.apache.flink.table.sources.tsextractors.{ExistingField, StreamRecordTimestamp, TimestampExtractor}
+import org.apache.flink.table.sources.wmstrategies.{AscendingTimestamps, BoundedOutOfOrderTimestamps, PreserveWatermarks, WatermarkStrategy}
+
+/**
+  * Validator for [[Rowtime]].
+  */
+class RowtimeValidator(val prefix: String = "") extends DescriptorValidator {
+
+  override def validate(properties: DescriptorProperties): Unit = {
+    properties.validateInt(prefix + ROWTIME_VERSION, isOptional = true, 0, Integer.MAX_VALUE)
+
+    val noValidation = () => {}
+
+    val timestampExistingField = () => {
+      properties.validateString(prefix + TIMESTAMPS_FROM, isOptional = false, minLen = 1)
+    }
+
+    val timestampCustom = () => {
+      properties.validateString(prefix + TIMESTAMPS_CLASS, isOptional = false, minLen = 1)
+      properties.validateString(prefix + TIMESTAMPS_SERIALIZED, isOptional = false, minLen = 1)
+    }
+
+    properties.validateEnum(
+      prefix + TIMESTAMPS_TYPE,
+      isOptional = false,
+      Map(
+        TIMESTAMPS_TYPE_VALUE_FROM_FIELD -> timestampExistingField,
+        TIMESTAMPS_TYPE_VALUE_FROM_SOURCE -> noValidation,
+        TIMESTAMPS_TYPE_VALUE_CUSTOM -> timestampCustom
+      )
+    )
+
+    val watermarkPeriodicBounding = () => {
+      properties.validateLong(prefix + WATERMARKS_DELAY, isOptional = false, min = 0)
+    }
+
+    val watermarkCustom = () => {
+      properties.validateString(prefix + WATERMARKS_CLASS, isOptional = false, minLen = 1)
+      properties.validateString(prefix + WATERMARKS_SERIALIZED, isOptional = false, minLen = 1)
+    }
+
+    properties.validateEnum(
+      prefix + WATERMARKS_TYPE,
+      isOptional = false,
+      Map(
+        WATERMARKS_TYPE_VALUE_PERIODIC_ASCENDING -> noValidation,
+        WATERMARKS_TYPE_VALUE_PERIODIC_BOUNDING -> watermarkPeriodicBounding,
+        WATERMARKS_TYPE_VALUE_FROM_SOURCE -> noValidation,
+        WATERMARKS_TYPE_VALUE_CUSTOM -> watermarkCustom
+      )
+    )
+  }
+}
+
+object RowtimeValidator {
+
+  val ROWTIME = "rowtime"
+
+  // per rowtime properties
+
+  val ROWTIME_VERSION = "version"
+  val TIMESTAMPS_TYPE = "timestamps.type"
+  val TIMESTAMPS_TYPE_VALUE_FROM_FIELD = "from-field"
+  val TIMESTAMPS_TYPE_VALUE_FROM_SOURCE = "from-source"
+  val TIMESTAMPS_TYPE_VALUE_CUSTOM = "custom"
+  val TIMESTAMPS_FROM = "timestamps.from"
+  val TIMESTAMPS_CLASS = "timestamps.class"
+  val TIMESTAMPS_SERIALIZED = "timestamps.serialized"
+
+  val WATERMARKS_TYPE = "watermarks.type"
+  val WATERMARKS_TYPE_VALUE_PERIODIC_ASCENDING = "periodic-ascending"
+  val WATERMARKS_TYPE_VALUE_PERIODIC_BOUNDING = "periodic-bounding"
+  val WATERMARKS_TYPE_VALUE_FROM_SOURCE = "from-source"
+  val WATERMARKS_TYPE_VALUE_CUSTOM = "custom"
+  val WATERMARKS_CLASS = "watermarks.class"
+  val WATERMARKS_SERIALIZED = "watermarks.serialized"
+  val WATERMARKS_DELAY = "watermarks.delay"
+
+  // utilities
+
+  def normalizeTimestampExtractor(extractor: TimestampExtractor): Map[String, String] =
+    extractor match {
+        case existing: ExistingField =>
+          Map(
+            TIMESTAMPS_TYPE -> TIMESTAMPS_TYPE_VALUE_FROM_FIELD,
+            TIMESTAMPS_FROM -> existing.getArgumentFields.apply(0))
+        case _: StreamRecordTimestamp =>
+          Map(TIMESTAMPS_TYPE -> TIMESTAMPS_TYPE_VALUE_FROM_SOURCE)
+        case _: TimestampExtractor =>
+          Map(
+            TIMESTAMPS_TYPE -> TIMESTAMPS_TYPE_VALUE_CUSTOM,
+            TIMESTAMPS_CLASS -> extractor.getClass.getName,
+            TIMESTAMPS_SERIALIZED -> serialize(extractor))
+    }
+
+  def normalizeWatermarkStrategy(strategy: WatermarkStrategy): Map[String, String] =
+    strategy match {
+      case _: AscendingTimestamps =>
+        Map(WATERMARKS_TYPE -> WATERMARKS_TYPE_VALUE_PERIODIC_ASCENDING)
+      case bounding: BoundedOutOfOrderTimestamps =>
+        Map(
+          WATERMARKS_TYPE -> WATERMARKS_TYPE_VALUE_PERIODIC_BOUNDING,
+          WATERMARKS_DELAY -> bounding.delay.toString)
+      case _: PreserveWatermarks =>
+        Map(WATERMARKS_TYPE -> WATERMARKS_TYPE_VALUE_FROM_SOURCE)
+      case _: WatermarkStrategy =>
+        Map(
+          WATERMARKS_TYPE -> WATERMARKS_TYPE_VALUE_CUSTOM,
+          WATERMARKS_CLASS -> strategy.getClass.getName,
+          WATERMARKS_SERIALIZED -> serialize(strategy))
+    }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2cb58960/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/Schema.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/Schema.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/Schema.scala
new file mode 100644
index 0000000..2f3a389
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/Schema.scala
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.table.api.{TableSchema, ValidationException}
+import org.apache.flink.table.descriptors.DescriptorProperties.{normalizeTableSchema, normalizeTypeInfo}
+import org.apache.flink.table.descriptors.SchemaValidator._
+
+import scala.collection.mutable
+
+/**
+  * Describes a schema of a table.
+  *
+  * Note: Field names are matched by the exact name by default (case sensitive).
+  */
+class Schema extends Descriptor {
+
+  // maps a field name to the properties that describe its type, origin, and time attribute
+  private val tableSchema = mutable.LinkedHashMap[String, mutable.LinkedHashMap[String, String]]()
+
+  private var lastField: Option[String] = None
+
+  /**
+    * Sets the schema with field names and the types. Required.
+    *
+    * This method overwrites existing fields added with [[field()]].
+    *
+    * @param schema the table schema
+    */
+  def schema(schema: TableSchema): Schema = {
+    tableSchema.clear()
+    lastField = None
+    normalizeTableSchema(schema).foreach {
+      case (n, t) => field(n, t)
+    }
+    this
+  }
+
+  /**
+    * Adds a field with the field name and the type information. Required.
+    * This method can be called multiple times. The call order of this method also
+    * defines the order of the fields in a row.
+    *
+    * @param fieldName the field name
+    * @param fieldType the type information of the field
+    */
+  def field(fieldName: String, fieldType: TypeInformation[_]): Schema = {
+    field(fieldName, normalizeTypeInfo(fieldType))
+    this
+  }
+
+  /**
+    * Adds a field with the field name and the type string. Required.
+    * This method can be called multiple times. The call order of this method also
+    * defines the order of the fields in a row.
+    *
+    * @param fieldName the field name
+    * @param fieldType the type string of the field
+    */
+  def field(fieldName: String, fieldType: String): Schema = {
+    if (tableSchema.contains(fieldName)) {
+      throw new ValidationException(s"Duplicate field name $fieldName.")
+    }
+
+    val fieldProperties = mutable.LinkedHashMap[String, String]()
+    fieldProperties += (TYPE -> fieldType)
+
+    tableSchema += (fieldName -> fieldProperties)
+
+    lastField = Some(fieldName)
+    this
+  }
+
+  /**
+    * Specifies the origin of the previously defined field. The origin field is defined by a
+    * connector or format.
+    *
+    * E.g. field("myString", Types.STRING).from("CSV_MY_STRING")
+    *
+    * Note: Field names are matched by the exact name by default (case sensitive).
+    */
+  def from(originFieldName: String): Schema = {
+    lastField match {
+      case None => throw new ValidationException("No field previously defined. Use field() before.")
+      case Some(f) =>
+        tableSchema(f) += (FROM -> originFieldName)
+        lastField = None
+    }
+    this
+  }
+
+  /**
+    * Specifies the previously defined field as a processing-time attribute.
+    *
+    * E.g. field("proctime", Types.SQL_TIMESTAMP).proctime()
+    */
+  def proctime(): Schema = {
+    lastField match {
+      case None => throw new ValidationException("No field defined previously. Use field() before.")
+      case Some(f) =>
+        tableSchema(f) += (PROCTIME -> PROCTIME_VALUE_TRUE)
+        lastField = None
+    }
+    this
+  }
+
+  /**
+    * Specifies the previously defined field as an event-time attribute.
+    *
+    * E.g. field("rowtime", Types.SQL_TIMESTAMP).rowtime(...)
+    */
+  def rowtime(rowtime: Rowtime): Schema = {
+    lastField match {
+      case None => throw new ValidationException("No field defined previously. Use field() before.")
+      case Some(f) =>
+        val fieldProperties = new DescriptorProperties()
+        rowtime.addProperties(fieldProperties)
+        tableSchema(f) ++= fieldProperties.asMap
+        lastField = None
+    }
+    this
+  }
+
+  /**
+    * Internal method for properties conversion.
+    */
+  final override private[flink] def addProperties(properties: DescriptorProperties): Unit = {
+    properties.putInt(SCHEMA_VERSION, 1)
+    properties.putIndexedVariableProperties(
+      SCHEMA,
+      tableSchema.toSeq.map { case (name, props) =>
+        Map(NAME -> name) ++ props
+      }
+    )
+  }
+}
+
+/**
+  * Describes a schema of a table.
+  */
+object Schema {
+
+  /**
+    * Describes a schema of a table.
+    */
+  def apply(): Schema = new Schema()
+}
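
A minimal sketch of the fluent Schema API above (field names are made up; Types refers to org.apache.flink.table.api.Types):

    val schema = Schema()
      .field("user", Types.STRING).from("u_name")           // renamed from the physical field "u_name"
      .field("clicks", Types.LONG)
      .field("proctime", Types.SQL_TIMESTAMP).proctime()    // processing-time attribute
      .field("rowtime", Types.SQL_TIMESTAMP).rowtime(
        Rowtime().timestampsFromField("ts").watermarksPeriodicAscending())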

http://git-wip-us.apache.org/repos/asf/flink/blob/2cb58960/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/SchemaValidator.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/SchemaValidator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/SchemaValidator.scala
new file mode 100644
index 0000000..19c0e41
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/SchemaValidator.scala
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+import org.apache.flink.table.api.ValidationException
+import org.apache.flink.table.descriptors.RowtimeValidator.ROWTIME
+import org.apache.flink.table.descriptors.SchemaValidator._
+
+/**
+  * Validator for [[Schema]].
+  */
+class SchemaValidator(isStreamEnvironment: Boolean = true) extends DescriptorValidator {
+
+  override def validate(properties: DescriptorProperties): Unit = {
+    properties.validateInt(SCHEMA_VERSION, isOptional = true, 0, Integer.MAX_VALUE)
+
+    val names = properties.getIndexedProperty(SCHEMA, NAME)
+    val types = properties.getIndexedProperty(SCHEMA, TYPE)
+
+    if (names.isEmpty && types.isEmpty) {
+      throw new ValidationException(s"Could not find the required schema for property '$SCHEMA'.")
+    }
+
+    for (i <- 0 until Math.max(names.size, types.size)) {
+      properties.validateString(s"$SCHEMA.$i.$NAME", isOptional = false, minLen = 1)
+      properties.validateType(s"$SCHEMA.$i.$TYPE", isOptional = false)
+      properties.validateString(s"$SCHEMA.$i.$FROM", isOptional = true, minLen = 1)
+      // either proctime or rowtime
+      val proctime = s"$SCHEMA.$i.$PROCTIME"
+      val rowtime = s"$SCHEMA.$i.$ROWTIME"
+      if (properties.contains(proctime)) {
+        if (!isStreamEnvironment) {
+          throw new ValidationException(
+            s"Property '$proctime' is not allowed in a batch environment.")
+        }
+        // check proctime
+        properties.validateBoolean(proctime, isOptional = false)
+        // no rowtime
+        properties.validatePrefixExclusion(rowtime)
+      } else if (properties.hasPrefix(rowtime)) {
+        // check rowtime
+        val rowtimeValidator = new RowtimeValidator(s"$SCHEMA.$i.")
+        rowtimeValidator.validate(properties)
+        // no proctime
+        properties.validateExclusion(proctime)
+      }
+    }
+  }
+}
+
+object SchemaValidator {
+
+  val SCHEMA = "schema"
+  val SCHEMA_VERSION = "schema.version"
+
+  // per column properties
+
+  val NAME = "name"
+  val TYPE = "type"
+  val PROCTIME = "proctime"
+  val PROCTIME_VALUE_TRUE = "true"
+  val FROM = "from"
+
+}
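
For reference, a property set that should pass the validator above in a streaming environment; the key layout follows the SCHEMA, NAME, TYPE, and PROCTIME constants from the companion object, while the type strings are assumed normalized type names:

    val props = new DescriptorProperties()
    props.putString("schema.0.name", "user")
    props.putString("schema.0.type", "VARCHAR")      // assumed type string
    props.putString("schema.1.name", "proctime")
    props.putString("schema.1.type", "TIMESTAMP")    // assumed type string
    props.putString("schema.1.proctime", "true")     // marks the processing-time attribute
    new SchemaValidator(isStreamEnvironment = true).validate(props)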

http://git-wip-us.apache.org/repos/asf/flink/blob/2cb58960/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/Statistics.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/Statistics.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/Statistics.scala
new file mode 100644
index 0000000..3037286
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/Statistics.scala
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+import org.apache.flink.table.descriptors.StatisticsValidator._
+import org.apache.flink.table.plan.stats.{ColumnStats, TableStats}
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+/**
+  * Statistics descriptor for describing table stats.
+  */
+class Statistics extends Descriptor {
+
+  private var rowCount: Option[Long] = None
+  private val columnStats: mutable.LinkedHashMap[String, mutable.Map[String, String]] =
+    mutable.LinkedHashMap[String, mutable.Map[String, String]]()
+
+  /**
+    * Sets the statistics from a [[TableStats]] instance.
+    *
+    * This method overwrites all existing statistics.
+    *
+    * @param tableStats the table statistics
+    */
+  def tableStats(tableStats: TableStats): Statistics = {
+    rowCount(tableStats.rowCount)
+    columnStats.clear()
+    tableStats.colStats.asScala.foreach { case (col, stats) =>
+        columnStats(col, stats)
+    }
+    this
+  }
+
+  /**
+    * Sets statistics for the overall row count. Required.
+    *
+    * @param rowCount the expected number of rows
+    */
+  def rowCount(rowCount: Long): Statistics = {
+    this.rowCount = Some(rowCount)
+    this
+  }
+
+  /**
+    * Sets statistics for a column. Overwrites all existing statistics for this column.
+    *
+    * @param columnName the column name
+    * @param columnStats expected statistics for the column
+    */
+  def columnStats(columnName: String, columnStats: ColumnStats): Statistics = {
+    val map = mutable.Map(normalizeColumnStats(columnStats).toSeq: _*)
+    this.columnStats.put(columnName, map)
+    this
+  }
+
+  /**
+    * Sets the number of distinct values statistic for the given column.
+    */
+  def columnDistinctCount(columnName: String, ndv: Long): Statistics = {
+    this.columnStats
+      .getOrElseUpdate(columnName, mutable.HashMap())
+      .put(DISTINCT_COUNT, ndv.toString)
+    this
+  }
+
+  /**
+    * Sets the number of null values statistic for the given column.
+    */
+  def columnNullCount(columnName: String, nullCount: Long): Statistics = {
+    this.columnStats
+      .getOrElseUpdate(columnName, mutable.HashMap())
+      .put(NULL_COUNT, nullCount.toString)
+    this
+  }
+
+  /**
+    * Sets the average length statistic for the given column.
+    */
+  def columnAvgLength(columnName: String, avgLen: Double): Statistics = {
+    this.columnStats
+      .getOrElseUpdate(columnName, mutable.HashMap())
+      .put(AVG_LENGTH, avgLen.toString)
+    this
+  }
+
+  /**
+    * Sets the maximum length statistic for the given column.
+    */
+  def columnMaxLength(columnName: String, maxLen: Integer): Statistics = {
+    this.columnStats
+      .getOrElseUpdate(columnName, mutable.HashMap())
+      .put(MAX_LENGTH, maxLen.toString)
+    this
+  }
+
+  /**
+    * Sets the maximum value statistic for the given column.
+    */
+  def columnMaxValue(columnName: String, max: Number): Statistics = {
+    this.columnStats
+      .getOrElseUpdate(columnName, mutable.HashMap())
+      .put(MAX_VALUE, max.toString)
+    this
+  }
+
+  /**
+    * Sets the minimum value statistic for the given column.
+    */
+  def columnMinValue(columnName: String, min: Number): Statistics = {
+    this.columnStats
+      .getOrElseUpdate(columnName, mutable.HashMap())
+      .put(MIN_VALUE, min.toString)
+    this
+  }
+
+  /**
+    * Internal method for properties conversion.
+    */
+  final override def addProperties(properties: DescriptorProperties): Unit = {
+    properties.putInt(STATISTICS_VERSION, 1)
+    rowCount.foreach(rc => properties.putLong(STATISTICS_ROW_COUNT, rc))
+    val namedStats = columnStats.map { case (name, stats) =>
+      // name should not be part of the properties key
+      (stats + (NAME -> name)).toMap
+    }.toSeq
+    properties.putIndexedVariableProperties(STATISTICS_COLUMNS, namedStats)
+  }
+}
+
+/**
+  * Statistics descriptor for describing table stats.
+  */
+object Statistics {
+
+  /**
+    * Statistics descriptor for describing table stats.
+    */
+  def apply(): Statistics = new Statistics()
+}
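
A minimal sketch of the Statistics descriptor above (all numbers are made up):

    val stats = Statistics()
      .rowCount(1000000L)
      .columnDistinctCount("user", 50000L)
      .columnNullCount("user", 0L)
      .columnAvgLength("user", 12.5)
      .columnMaxLength("user", 64)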

http://git-wip-us.apache.org/repos/asf/flink/blob/2cb58960/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/StatisticsValidator.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/StatisticsValidator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/StatisticsValidator.scala
new file mode 100644
index 0000000..a78e422
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/StatisticsValidator.scala
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+import org.apache.flink.table.api.ValidationException
+import org.apache.flink.table.descriptors.StatisticsValidator.{STATISTICS_COLUMNS, STATISTICS_ROW_COUNT, STATISTICS_VERSION, validateColumnStats}
+import org.apache.flink.table.plan.stats.ColumnStats
+
+import scala.collection.mutable
+
+/**
+  * Validator for [[Statistics]].
+  */
+class StatisticsValidator extends DescriptorValidator {
+
+  override def validate(properties: DescriptorProperties): Unit = {
+    properties.validateInt(STATISTICS_VERSION, isOptional = true, 0, Integer.MAX_VALUE)
+    properties.validateLong(STATISTICS_ROW_COUNT, isOptional = true, min = 0)
+    validateColumnStats(properties, STATISTICS_COLUMNS)
+  }
+}
+
+object StatisticsValidator {
+
+  val STATISTICS_VERSION = "statistics.version"
+  val STATISTICS_ROW_COUNT = "statistics.row-count"
+  val STATISTICS_COLUMNS = "statistics.columns"
+
+  // per column properties
+
+  val NAME = "name"
+  val DISTINCT_COUNT = "distinct-count"
+  val NULL_COUNT = "null-count"
+  val AVG_LENGTH = "avg-length"
+  val MAX_LENGTH = "max-length"
+  val MAX_VALUE = "max-value"
+  val MIN_VALUE = "min-value"
+
+  // utilities
+
+  def normalizeColumnStats(columnStats: ColumnStats): Map[String, String] = {
+    val stats = mutable.HashMap[String, String]()
+    if (columnStats.ndv != null) {
+      stats += DISTINCT_COUNT -> columnStats.ndv.toString
+    }
+    if (columnStats.nullCount != null) {
+      stats += NULL_COUNT -> columnStats.nullCount.toString
+    }
+    if (columnStats.avgLen != null) {
+      stats += AVG_LENGTH -> columnStats.avgLen.toString
+    }
+    if (columnStats.maxLen != null) {
+      stats += MAX_LENGTH -> columnStats.maxLen.toString
+    }
+    if (columnStats.max != null) {
+      stats += MAX_VALUE -> columnStats.max.toString
+    }
+    if (columnStats.min != null) {
+      stats += MIN_VALUE -> columnStats.min.toString
+    }
+    stats.toMap
+  }
+
+  def validateColumnStats(properties: DescriptorProperties, key: String): Unit = {
+
+    // filter for number of columns
+    val columnCount = properties.getIndexedProperty(key, NAME).size
+
+    for (i <- 0 until columnCount) {
+      properties.validateString(s"$key.$i.$NAME", isOptional = false, minLen = 1)
+      properties.validateLong(s"$key.$i.$DISTINCT_COUNT", isOptional = true, min = 0L)
+      properties.validateLong(s"$key.$i.$NULL_COUNT", isOptional = true, min = 0L)
+      properties.validateDouble(s"$key.$i.$AVG_LENGTH", isOptional = true, min = 0.0)
+      properties.validateInt(s"$key.$i.$MAX_LENGTH", isOptional = true, min = 0)
+      properties.validateDouble(s"$key.$i.$MAX_VALUE", isOptional = true, min = 0.0)
+      properties.validateDouble(s"$key.$i.$MIN_VALUE", isOptional = true, min = 0.0)
+    }
+  }
+
+  def readColumnStats(properties: DescriptorProperties, key: String): Map[String, ColumnStats] = {
+
+    // filter for number of columns
+    val columnCount = properties.getIndexedProperty(key, NAME).size
+
+    val stats = for (i <- 0 until columnCount) yield {
+      val name = properties.getString(s"$key.$i.$NAME").getOrElse(
+        throw new ValidationException(s"Could not find name of property '$key.$i.$NAME'."))
+
+      val stats = ColumnStats(
+        properties.getLong(s"$key.$i.$DISTINCT_COUNT").map(v => Long.box(v)).orNull,
+        properties.getLong(s"$key.$i.$NULL_COUNT").map(v => Long.box(v)).orNull,
+        properties.getDouble(s"$key.$i.$AVG_LENGTH").map(v => Double.box(v)).orNull,
+        properties.getInt(s"$key.$i.$MAX_LENGTH").map(v => Int.box(v)).orNull,
+        properties.getDouble(s"$key.$i.$MAX_VALUE").map(v => Double.box(v)).orNull,
+        properties.getDouble(s"$key.$i.$MIN_VALUE").map(v => Double.box(v)).orNull
+      )
+
+      name -> stats
+    }
+
+    stats.toMap
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2cb58960/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/StreamTableSourceDescriptor.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/StreamTableSourceDescriptor.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/StreamTableSourceDescriptor.scala
new file mode 100644
index 0000000..ae88236
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/StreamTableSourceDescriptor.scala
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+import org.apache.flink.table.api.{StreamTableEnvironment, Table, TableException}
+import org.apache.flink.table.sources.{StreamTableSource, TableSource, TableSourceFactoryService}
+
+/**
+  * Descriptor for specifying a table source in a streaming environment.
+  */
+class StreamTableSourceDescriptor(tableEnv: StreamTableEnvironment, connector: ConnectorDescriptor)
+  extends TableSourceDescriptor(connector) {
+
+  /**
+    * Searches for the specified table source, configures it accordingly, and returns it.
+    */
+  def toTableSource: TableSource[_] = {
+    val source = TableSourceFactoryService.findTableSourceFactory(this)
+    source match {
+      case _: StreamTableSource[_] => source
+      case _ => throw new TableException(
+        s"The found table source '${source.getClass.getCanonicalName}' cannot be used " +
+          s"in a streaming environment.")
+    }
+  }
+
+  /**
+    * Searches for the specified table source, configures it accordingly, and returns it as a table.
+    */
+  def toTable: Table = {
+    tableEnv.fromTableSource(toTableSource)
+  }
+
+  /**
+    * Searches for the specified table source, configures it accordingly, and registers it as
+    * a table under the given name.
+    *
+    * @param name table name to be registered in the table environment
+    */
+  def register(name: String): Unit = {
+    tableEnv.registerTableSource(name, toTableSource)
+  }
+
+  /**
+    * Specifies the format that defines how to read data from a connector.
+    */
+  def withFormat(format: FormatDescriptor): StreamTableSourceDescriptor = {
+    formatDescriptor = Some(format)
+    this
+  }
+
+  /**
+    * Specifies the resulting table schema.
+    */
+  def withSchema(schema: Schema): StreamTableSourceDescriptor = {
+    schemaDescriptor = Some(schema)
+    this
+  }
+}
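
Putting the pieces together, a hedged sketch of how the descriptor above could be used; it assumes a StreamTableEnvironment named tableEnv as well as the FileSystem connector and Csv format descriptors of this change, whose builder methods are not shown in this excerpt and are therefore assumptions:

    new StreamTableSourceDescriptor(tableEnv, FileSystem().path("/path/to/clicks.csv"))
      .withFormat(
        Csv()
          .field("user", Types.STRING)
          .field("clicks", Types.LONG))
      .withSchema(
        Schema()
          .field("user", Types.STRING)
          .field("clicks", Types.LONG))
      .register("Clicks")    // or .toTable / .toTableSource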

http://git-wip-us.apache.org/repos/asf/flink/blob/2cb58960/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/TableSourceDescriptor.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/TableSourceDescriptor.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/TableSourceDescriptor.scala
new file mode 100644
index 0000000..918b618
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/TableSourceDescriptor.scala
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+import org.apache.flink.table.api.ValidationException
+import org.apache.flink.table.descriptors.StatisticsValidator.{STATISTICS_COLUMNS, STATISTICS_ROW_COUNT, readColumnStats}
+import org.apache.flink.table.plan.stats.TableStats
+
+import scala.collection.JavaConverters._
+
+/**
+  * Common class for all descriptors describing a table source.
+  */
+abstract class TableSourceDescriptor(connector: ConnectorDescriptor) extends Descriptor {
+
+  protected val connectorDescriptor: ConnectorDescriptor = connector
+
+  protected var formatDescriptor: Option[FormatDescriptor] = None
+  protected var schemaDescriptor: Option[Schema] = None
+  protected var statisticsDescriptor: Option[Statistics] = None
+  protected var metaDescriptor: Option[Metadata] = None
+
+  /**
+    * Internal method for properties conversion.
+    */
+  override private[flink] def addProperties(properties: DescriptorProperties): Unit = {
+    connectorDescriptor.addProperties(properties)
+
+    // check for a format
+    if (connectorDescriptor.needsFormat() && formatDescriptor.isEmpty) {
+      throw new ValidationException(
+        s"The connector '$connectorDescriptor' requires a format description.")
+    } else if (!connectorDescriptor.needsFormat() && formatDescriptor.isDefined) {
+      throw new ValidationException(
+        s"The connector '$connectorDescriptor' does not require a format description " +
+          s"but '${formatDescriptor.get}' was found.")
+    }
+
+    formatDescriptor.foreach(_.addProperties(properties))
+    schemaDescriptor.foreach(_.addProperties(properties))
+    metaDescriptor.foreach(_.addProperties(properties))
+  }
+
+  /**
+    * Reads table statistics from the descriptors properties.
+    */
+  protected def getTableStats: Option[TableStats] = {
+      val normalizedProps = new DescriptorProperties()
+      addProperties(normalizedProps)
+      val rowCount = normalizedProps.getLong(STATISTICS_ROW_COUNT).map(v => Long.box(v))
+      rowCount match {
+        case Some(cnt) =>
+          val columnStats = readColumnStats(normalizedProps, STATISTICS_COLUMNS)
+          Some(TableStats(cnt, columnStats.asJava))
+        case None =>
+          None
+      }
+    }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2cb58960/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/stats/FlinkStatistic.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/stats/FlinkStatistic.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/stats/FlinkStatistic.scala
index d5a5f36..da08916 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/stats/FlinkStatistic.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/stats/FlinkStatistic.scala
@@ -40,7 +40,7 @@ class FlinkStatistic(tableStats: Option[TableStats]) extends Statistic {
     *
     * @return The table statistics
     */
-  def getTableStats: TableStats = tableStats.getOrElse(null)
+  def getTableStats: TableStats = tableStats.orNull
 
   /**
     * Returns the stats of the specified the column.

http://git-wip-us.apache.org/repos/asf/flink/blob/2cb58960/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/CsvTableSource.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/CsvTableSource.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/CsvTableSource.scala
index ba076b4..e0022c5 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/CsvTableSource.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/CsvTableSource.scala
@@ -270,7 +270,7 @@ object CsvTableSource {
     /**
       * Adds a field with the field name and the type information. Required.
       * This method can be called multiple times. The call order of this method defines
-      * also the order of thee fields in a row.
+      * also the order of the fields in a row.
       *
       * @param fieldName the field name
       * @param fieldType the type information of the field

http://git-wip-us.apache.org/repos/asf/flink/blob/2cb58960/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/CsvTableSourceConverter.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/CsvTableSourceConverter.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/CsvTableSourceConverter.scala
index 68d2f55..ab984a6 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/CsvTableSourceConverter.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/CsvTableSourceConverter.scala
@@ -29,7 +29,11 @@ import com.google.common.collect.ImmutableSet
 /**
   * The class defines a converter used to convert [[CsvTableSource]] to
   * or from [[ExternalCatalogTable]].
+  *
+  * @deprecated Use the more generic [[org.apache.flink.table.sources.TableSourceFactory]] instead.
   */
+@Deprecated
+@deprecated("Use the more generic table source factories instead.")
 @TableType(value = "csv")
 class CsvTableSourceConverter extends TableSourceConverter[CsvTableSource] {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/2cb58960/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/CsvTableSourceFactory.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/CsvTableSourceFactory.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/CsvTableSourceFactory.scala
new file mode 100644
index 0000000..bec4565
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/CsvTableSourceFactory.scala
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sources
+
+import java.util
+
+import org.apache.flink.table.api.TableException
+import org.apache.flink.table.descriptors.ConnectorDescriptorValidator.{CONNECTOR_TYPE, CONNECTOR_VERSION}
+import org.apache.flink.table.descriptors.CsvValidator._
+import org.apache.flink.table.descriptors.FileSystemValidator.{CONNECTOR_PATH, CONNECTOR_TYPE_VALUE}
+import org.apache.flink.table.descriptors.FormatDescriptorValidator.{FORMAT_TYPE, FORMAT_VERSION}
+import org.apache.flink.table.descriptors.SchemaValidator.{SCHEMA, SCHEMA_VERSION}
+import org.apache.flink.table.descriptors._
+import org.apache.flink.types.Row
+
+/**
+  * Factory for creating configured instances of [[CsvTableSource]].
+  */
+class CsvTableSourceFactory extends TableSourceFactory[Row] {
+
+  override def requiredContext(): util.Map[String, String] = {
+    val context = new util.HashMap[String, String]()
+    context.put(CONNECTOR_TYPE, CONNECTOR_TYPE_VALUE)
+    context.put(FORMAT_TYPE, FORMAT_TYPE_VALUE)
+    context.put(CONNECTOR_VERSION, "1")
+    context.put(FORMAT_VERSION, "1")
+    context.put(SCHEMA_VERSION, "1")
+    context
+  }
+
+  override def supportedProperties(): util.List[String] = {
+    val properties = new util.ArrayList[String]()
+    // connector
+    properties.add(CONNECTOR_PATH)
+    // format
+    properties.add(s"$FORMAT_FIELDS.#.${DescriptorProperties.TYPE}")
+    properties.add(s"$FORMAT_FIELDS.#.${DescriptorProperties.NAME}")
+    properties.add(FORMAT_FIELD_DELIMITER)
+    properties.add(FORMAT_LINE_DELIMITER)
+    properties.add(FORMAT_QUOTE_CHARACTER)
+    properties.add(FORMAT_COMMENT_PREFIX)
+    properties.add(FORMAT_IGNORE_FIRST_LINE)
+    properties.add(FORMAT_IGNORE_PARSE_ERRORS)
+    properties.add(CONNECTOR_PATH)
+    // schema
+    properties.add(s"$SCHEMA.#.${DescriptorProperties.TYPE}")
+    properties.add(s"$SCHEMA.#.${DescriptorProperties.NAME}")
+    properties
+  }
+
+  override def create(properties: util.Map[String, String]): TableSource[Row] = {
+    val params = new DescriptorProperties()
+    params.putProperties(properties)
+
+    // validate
+    new FileSystemValidator().validate(params)
+    new CsvValidator().validate(params)
+    new SchemaValidator().validate(params)
+
+    // build
+    val csvTableSourceBuilder = new CsvTableSource.Builder
+
+    val tableSchema = params.getTableSchema(SCHEMA).get
+    val encodingSchema = params.getTableSchema(FORMAT_FIELDS)
+
+    // the CsvTableSource needs some rework first
+    // for now the schema must be equal to the encoding
+    if (!encodingSchema.contains(tableSchema)) {
+      throw new TableException(
+        "Encodings that differ from the schema are not supported yet for CsvTableSources.")
+    }
+
+    params.getString(CONNECTOR_PATH).foreach(csvTableSourceBuilder.path)
+    params.getString(FORMAT_FIELD_DELIMITER).foreach(csvTableSourceBuilder.fieldDelimiter)
+    params.getString(FORMAT_LINE_DELIMITER).foreach(csvTableSourceBuilder.lineDelimiter)
+
+    encodingSchema.foreach { schema =>
+      schema.getColumnNames.zip(schema.getTypes).foreach { case (name, tpe) =>
+        csvTableSourceBuilder.field(name, tpe)
+      }
+    }
+    params.getCharacter(FORMAT_QUOTE_CHARACTER).foreach(csvTableSourceBuilder.quoteCharacter)
+    params.getString(FORMAT_COMMENT_PREFIX).foreach(csvTableSourceBuilder.commentPrefix)
+    params.getBoolean(FORMAT_IGNORE_FIRST_LINE).foreach { flag =>
+      if (flag) {
+        csvTableSourceBuilder.ignoreFirstLine()
+      }
+    }
+    params.getBoolean(FORMAT_IGNORE_PARSE_ERRORS).foreach { flag =>
+      if (flag) {
+        csvTableSourceBuilder.ignoreParseErrors()
+      }
+    }
+
+    csvTableSourceBuilder.build()
+  }
+}
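
The factory above is not instantiated directly. A descriptor's normalized properties are routed to it through the factory service shown further below; a minimal sketch, assuming a fully specified table source descriptor named descriptor:

    val source: TableSource[_] = TableSourceFactoryService.findTableSourceFactory(descriptor)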

http://git-wip-us.apache.org/repos/asf/flink/blob/2cb58960/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/TableSourceFactory.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/TableSourceFactory.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/TableSourceFactory.scala
new file mode 100644
index 0000000..f42d765
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/TableSourceFactory.scala
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sources
+
+import java.util
+
+/**
+  * A factory to create a [[TableSource]]. This factory is used with Java's Service Provider
+  * Interface (SPI) for discovery. A factory is called with a set of normalized properties that
+  * describe the desired table source. The factory allows for matching to the given set of
+  * properties and creating a configured [[TableSource]] accordingly.
+  *
+  * Classes that implement this interface need to be added to the
+  * "META-INF/services/org.apache.flink.table.sources.TableSourceFactory" file of a JAR file in
+  * the current classpath to be found.
+  */
+trait TableSourceFactory[T] {
+
+  /**
+    * Specifies the context that this factory has been implemented for. The framework guarantees
+    * to only call the [[create()]] method of the factory if the specified set of properties and
+    * values are met.
+    *
+    * Typical properties might be:
+    *   - connector.type
+    *   - format.type
+    *
+    * Specified versions allow the framework to provide backwards compatible properties in case of
+    * string format changes:
+    *   - connector.version
+    *   - format.version
+    *
+    * An empty context means that the factory matches for all requests.
+    */
+  def requiredContext(): util.Map[String, String]
+
+  /**
+    * List of property keys that this factory can handle. This method will be used for validation.
+    * If a property is passed that this factory cannot handle, an exception will be thrown. The
+    * list must not contain the keys that are specified by the context.
+    *
+    * Example properties might be:
+    *   - format.line-delimiter
+    *   - format.ignore-parse-errors
+    *   - format.fields.#.type
+    *   - format.fields.#.name
+    *
+    * Note: Use "#" to denote an array of values where "#" represents one or more digits.
+    */
+  def supportedProperties(): util.List[String]
+
+  /**
+    * Creates and configures a [[TableSource]] using the given properties.
+    *
+    * @param properties normalized properties describing a table source
+    * @return the configured table source
+    */
+  def create(properties: util.Map[String, String]): TableSource[T]
+
+}
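
A skeletal custom factory following the contract above; it assumes the same imports as CsvTableSourceFactory (java.util, DescriptorProperties, Row), the connector type value and property keys are hypothetical, and the create() body is left open:

    class MySystemTableSourceFactory extends TableSourceFactory[Row] {

      override def requiredContext(): util.Map[String, String] = {
        val context = new util.HashMap[String, String]()
        context.put("connector.type", "my-system")    // hypothetical connector type
        context.put("connector.version", "1")
        context
      }

      override def supportedProperties(): util.List[String] = {
        val properties = new util.ArrayList[String]()
        properties.add("connector.hostname")          // hypothetical property keys
        properties.add("connector.port")
        properties
      }

      override def create(properties: util.Map[String, String]): TableSource[Row] = {
        val params = new DescriptorProperties()
        params.putProperties(properties)
        // validate the properties and build the concrete TableSource here
        ???
      }
    }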

http://git-wip-us.apache.org/repos/asf/flink/blob/2cb58960/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/TableSourceFactoryService.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/TableSourceFactoryService.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/TableSourceFactoryService.scala
new file mode 100644
index 0000000..cb737a9
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/TableSourceFactoryService.scala
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sources
+
+import java.util.{ServiceConfigurationError, ServiceLoader}
+
+import org.apache.flink.table.api.{AmbiguousTableSourceException, NoMatchingTableSourceException, TableException, ValidationException}
+import org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_VERSION
+import org.apache.flink.table.descriptors.FormatDescriptorValidator.FORMAT_VERSION
+import org.apache.flink.table.descriptors.MetadataValidator.METADATA_VERSION
+import org.apache.flink.table.descriptors.RowtimeValidator.ROWTIME_VERSION
+import org.apache.flink.table.descriptors.SchemaValidator.SCHEMA_VERSION
+import org.apache.flink.table.descriptors.StatisticsValidator.STATISTICS_VERSION
+import org.apache.flink.table.descriptors._
+import org.apache.flink.table.util.Logging
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+/**
+  * Service for finding a suitable [[TableSourceFactory]] for the given properties via the
+  * Java service provider interface (SPI).
+  */
+object TableSourceFactoryService extends Logging {
+
+  private lazy val loader = ServiceLoader.load(classOf[TableSourceFactory[_]])
+
+  def findTableSourceFactory(descriptor: TableSourceDescriptor): TableSource[_] = {
+    val properties = new DescriptorProperties()
+    descriptor.addProperties(properties)
+    findTableSourceFactory(properties.asMap)
+  }
+
+  def findTableSourceFactory(properties: Map[String, String]): TableSource[_] = {
+    var matchingFactory: Option[(TableSourceFactory[_], Seq[String])] = None
+    try {
+      val iter = loader.iterator()
+      while (iter.hasNext) {
+        val factory = iter.next()
+
+        val requiredContextJava = try {
+          factory.requiredContext()
+        } catch {
+          case t: Throwable =>
+            throw new TableException(
+              s"Table source factory '${factory.getClass.getCanonicalName}' caused an exception.",
+              t)
+        }
+
+        val requiredContext = if (requiredContextJava != null) {
+          // normalize properties
+          requiredContextJava.asScala.map(e => (e._1.toLowerCase, e._2))
+        } else {
+          Map[String, String]()
+        }
+
+        val plainContext = mutable.Map[String, String]()
+        plainContext ++= requiredContext
+        // we remove the versions for now, until we have the first backwards compatibility case;
+        // with the versions, we can provide mappings in case the string format changes
+        plainContext.remove(CONNECTOR_VERSION)
+        plainContext.remove(FORMAT_VERSION)
+        plainContext.remove(SCHEMA_VERSION)
+        plainContext.remove(ROWTIME_VERSION)
+        plainContext.remove(METADATA_VERSION)
+        plainContext.remove(STATISTICS_VERSION)
+
+        // check if required context is met
+        if (plainContext.forall(e => properties.contains(e._1) && properties(e._1) == e._2)) {
+          matchingFactory match {
+            case Some(_) => throw new AmbiguousTableSourceException(properties)
+            case None => matchingFactory = Some((factory, requiredContext.keys.toSeq))
+          }
+        }
+      }
+    } catch {
+      case e: ServiceConfigurationError =>
+        LOG.error("Could not load service provider for table source factories.", e)
+        throw new TableException("Could not load service provider for table source factories.", e)
+    }
+
+    val (factory, context) = matchingFactory
+      .getOrElse(throw new NoMatchingTableSourceException(properties))
+
+    val plainProperties = mutable.ArrayBuffer[String]()
+    properties.keys.foreach { k =>
+      // replace arrays with wildcard
+      val key = k.replaceAll("\\.\\d+", ".#")
+      // ignore context properties and duplicates
+      if (!context.contains(key) && !plainProperties.contains(key)) {
+        plainProperties += key
+      }
+    }
+
+    val supportedPropertiesJava = try {
+      factory.supportedProperties()
+    } catch {
+      case t: Throwable =>
+        throw new TableException(
+          s"Table source factory '${factory.getClass.getCanonicalName}' caused an exception.",
+          t)
+    }
+
+    val supportedProperties = if (supportedPropertiesJava != null) {
+      supportedPropertiesJava.asScala.map(_.toLowerCase)
+    } else {
+      Seq[String]()
+    }
+
+    // check for supported properties
+    plainProperties.foreach { k =>
+      if (!supportedProperties.contains(k)) {
+        throw new ValidationException(
+          s"Table factory '${factory.getClass.getCanonicalName}' does not support the " +
+          s"property '$k'. Supported properties are: \n${supportedProperties.mkString("\n")}")
+      }
+    }
+
+    // create the table source
+    try {
+      factory.create(properties.asJava)
+    } catch {
+      case t: Throwable =>
+        throw new TableException(
+          s"Table source factory '${factory.getClass.getCanonicalName}' caused an exception.",
+          t)
+    }
+  }
+}
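
Assuming a factory matching the given keys is present on the classpath, a caller might use the
service as in the following sketch. The concrete keys and values are illustrative, not
normative.

    import org.apache.flink.table.sources.TableSourceFactoryService

    // illustrative property map; keys and values must match a factory's required context
    // and supported properties
    val properties = Map(
      "connector.type" -> "filesystem",        // assumed context values
      "format.type" -> "csv",
      "connector.path" -> "/path/to/data.csv",
      "format.fields.0.name" -> "f0",
      "format.fields.0.type" -> "VARCHAR")

    // resolves exactly one matching factory via the ServiceLoader, validates the remaining
    // keys against supportedProperties(), and lets the factory create a configured source;
    // NoMatchingTableSourceException or AmbiguousTableSourceException is thrown otherwise
    val source = TableSourceFactoryService.findTableSourceFactory(properties)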

http://git-wip-us.apache.org/repos/asf/flink/blob/2cb58960/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/tsextractors/TimestampExtractor.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/tsextractors/TimestampExtractor.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/tsextractors/TimestampExtractor.scala
index 34c6ba5..ce57b92 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/tsextractors/TimestampExtractor.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/tsextractors/TimestampExtractor.scala
@@ -25,7 +25,7 @@ import org.apache.flink.table.sources.FieldComputer
 /**
   * Provides the an expression to extract the timestamp for a rowtime attribute.
   */
-abstract class TimestampExtractor extends FieldComputer[Long] {
+abstract class TimestampExtractor extends FieldComputer[Long] with Serializable {
 
   /** Timestamp extractors compute the timestamp as Long. */
   override def getReturnType: TypeInformation[Long] = Types.LONG.asInstanceOf[TypeInformation[Long]]

http://git-wip-us.apache.org/repos/asf/flink/blob/2cb58960/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/wmstrategies/AscendingTimestamps.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/wmstrategies/AscendingTimestamps.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/wmstrategies/AscendingTimestamps.scala
index 3a947ac..9fc7c88 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/wmstrategies/AscendingTimestamps.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/wmstrategies/AscendingTimestamps.scala
@@ -21,12 +21,12 @@ package org.apache.flink.table.sources.wmstrategies
 import org.apache.flink.streaming.api.watermark.Watermark
 
 /**
-  * A watermark assigner for ascending rowtime attributes.
+  * A watermark strategy for ascending rowtime attributes.
   *
   * Emits a watermark of the maximum observed timestamp so far minus 1.
   * Rows that have a timestamp equal to the max timestamp are not late.
   */
-class AscendingTimestamps extends PeriodicWatermarkAssigner {
+final class AscendingTimestamps extends PeriodicWatermarkAssigner {
 
   var maxTimestamp: Long = Long.MinValue + 1
 

http://git-wip-us.apache.org/repos/asf/flink/blob/2cb58960/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/wmstrategies/BoundedOutOfOrderTimestamps.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/wmstrategies/BoundedOutOfOrderTimestamps.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/wmstrategies/BoundedOutOfOrderTimestamps.scala
index 957daca..8f7c235 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/wmstrategies/BoundedOutOfOrderTimestamps.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/wmstrategies/BoundedOutOfOrderTimestamps.scala
@@ -21,13 +21,13 @@ package org.apache.flink.table.sources.wmstrategies
 import org.apache.flink.streaming.api.watermark.Watermark
 
 /**
-  * A watermark assigner for rowtime attributes which are out-of-order by a bounded time interval.
+  * A watermark strategy for rowtime attributes which are out-of-order by a bounded time interval.
   *
   * Emits watermarks which are the maximum observed timestamp minus the specified delay.
   *
   * @param delay The delay by which watermarks are behind the maximum observed timestamp.
   */
-class BoundedOutOfOrderTimestamps(val delay: Long) extends PeriodicWatermarkAssigner {
+final class BoundedOutOfOrderTimestamps(val delay: Long) extends PeriodicWatermarkAssigner {
 
   var maxTimestamp: Long = Long.MinValue + delay
 

http://git-wip-us.apache.org/repos/asf/flink/blob/2cb58960/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/wmstrategies/watermarkStrategies.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/wmstrategies/watermarkStrategies.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/wmstrategies/watermarkStrategies.scala
index 4c7f4e4..dd71bd3 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/wmstrategies/watermarkStrategies.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/wmstrategies/watermarkStrategies.scala
@@ -62,7 +62,7 @@ abstract class PunctuatedWatermarkAssigner extends WatermarkStrategy {
 }
 
 /** A strategy which indicates the watermarks should be preserved from the underlying datastream.*/
-class PreserveWatermarks extends WatermarkStrategy
+final class PreserveWatermarks extends WatermarkStrategy
 object PreserveWatermarks {
   val INSTANCE: PreserveWatermarks = new PreserveWatermarks
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/2cb58960/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/typeutils/TypeStringUtils.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/typeutils/TypeStringUtils.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/typeutils/TypeStringUtils.scala
new file mode 100644
index 0000000..253b491
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/typeutils/TypeStringUtils.scala
@@ -0,0 +1,252 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.typeutils
+
+import java.io.Serializable
+
+import org.apache.commons.codec.binary.Base64
+import org.apache.commons.lang3.StringEscapeUtils
+import org.apache.flink.api.common.functions.InvalidTypesException
+import org.apache.flink.api.common.typeinfo.{BasicArrayTypeInfo, PrimitiveArrayTypeInfo, TypeInformation}
+import org.apache.flink.api.common.typeutils.CompositeType
+import org.apache.flink.api.java.typeutils._
+import org.apache.flink.table.api.{TableException, Types, ValidationException}
+import org.apache.flink.table.descriptors.DescriptorProperties.normalizeTypeInfo
+import org.apache.flink.util.InstantiationUtil
+
+import _root_.scala.language.implicitConversions
+import _root_.scala.util.parsing.combinator.{JavaTokenParsers, PackratParsers}
+
+/**
+  * Utilities to convert [[org.apache.flink.api.common.typeinfo.TypeInformation]] into a
+  * string representation and back.
+  */
+object TypeStringUtils extends JavaTokenParsers with PackratParsers {
+  case class Keyword(key: String)
+
+  // converts the keyword into a case-insensitive parser
+  implicit def keyword2Parser(kw: Keyword): Parser[String] = {
+    ("""(?i)\Q""" + kw.key + """\E""").r
+  }
+
+  lazy val VARCHAR: Keyword = Keyword("VARCHAR")
+  lazy val STRING: Keyword = Keyword("STRING")
+  lazy val BOOLEAN: Keyword = Keyword("BOOLEAN")
+  lazy val BYTE: Keyword = Keyword("BYTE")
+  lazy val TINYINT: Keyword = Keyword("TINYINT")
+  lazy val SHORT: Keyword = Keyword("SHORT")
+  lazy val SMALLINT: Keyword = Keyword("SMALLINT")
+  lazy val INT: Keyword = Keyword("INT")
+  lazy val LONG: Keyword = Keyword("LONG")
+  lazy val BIGINT: Keyword = Keyword("BIGINT")
+  lazy val FLOAT: Keyword = Keyword("FLOAT")
+  lazy val DOUBLE: Keyword = Keyword("DOUBLE")
+  lazy val DECIMAL: Keyword = Keyword("DECIMAL")
+  lazy val SQL_DATE: Keyword = Keyword("SQL_DATE")
+  lazy val DATE: Keyword = Keyword("DATE")
+  lazy val SQL_TIME: Keyword = Keyword("SQL_TIME")
+  lazy val TIME: Keyword = Keyword("TIME")
+  lazy val SQL_TIMESTAMP: Keyword = Keyword("SQL_TIMESTAMP")
+  lazy val TIMESTAMP: Keyword = Keyword("TIMESTAMP")
+  lazy val ROW: Keyword = Keyword("ROW")
+  lazy val ANY: Keyword = Keyword("ANY")
+  lazy val POJO: Keyword = Keyword("POJO")
+
+  lazy val qualifiedName: Parser[String] =
+    """\p{javaJavaIdentifierStart}[\p{javaJavaIdentifierPart}.]*""".r
+
+  lazy val base64Url: Parser[String] =
+    """[A-Za-z0-9_-]*""".r
+
+  lazy val atomic: PackratParser[TypeInformation[_]] =
+    (VARCHAR | STRING) ^^ { e => Types.STRING } |
+    BOOLEAN ^^ { e => Types.BOOLEAN } |
+    (TINYINT | BYTE) ^^ { e => Types.BYTE } |
+    (SMALLINT | SHORT) ^^ { e => Types.SHORT } |
+    INT ^^ { e => Types.INT } |
+    (BIGINT | LONG) ^^ { e => Types.LONG } |
+    FLOAT ^^ { e => Types.FLOAT } |
+    DOUBLE ^^ { e => Types.DOUBLE } |
+    DECIMAL ^^ { e => Types.DECIMAL } |
+    (DATE | SQL_DATE) ^^ { e => Types.SQL_DATE.asInstanceOf[TypeInformation[_]] } |
+    (TIMESTAMP | SQL_TIMESTAMP) ^^ { e => Types.SQL_TIMESTAMP } |
+    (TIME | SQL_TIME) ^^ { e => Types.SQL_TIME }
+
+  lazy val escapedFieldName: PackratParser[String] = "\"" ~> stringLiteral <~ "\"" ^^ { s =>
+    StringEscapeUtils.unescapeJava(s)
+  }
+
+  lazy val fieldName: PackratParser[String] = escapedFieldName | stringLiteral | ident
+
+  lazy val field: PackratParser[(String, TypeInformation[_])] =
+    fieldName ~ typeInfo ^^ {
+      case name ~ info => (name, info)
+    }
+
+  lazy val namedRow: PackratParser[TypeInformation[_]] =
+    ROW ~ "(" ~> rep1sep(field, ",") <~ ")" ^^ {
+      fields => Types.ROW(fields.map(_._1).toArray, fields.map(_._2).toArray)
+    } | failure("Named row type expected.")
+
+  lazy val unnamedRow: PackratParser[TypeInformation[_]] =
+    ROW ~ "(" ~> rep1sep(typeInfo, ",") <~ ")" ^^ {
+      types => Types.ROW(types: _*)
+    } | failure("Unnamed row type expected.")
+
+  lazy val generic: PackratParser[TypeInformation[_]] =
+    ANY ~ "(" ~> qualifiedName <~ ")" ^^ {
+      typeClass =>
+        val clazz = loadClass(typeClass)
+        new GenericTypeInfo[AnyRef](clazz.asInstanceOf[Class[AnyRef]])
+    }
+
+  lazy val pojo: PackratParser[TypeInformation[_]] = POJO ~ "(" ~> qualifiedName <~ ")" ^^ {
+    typeClass =>
+      val clazz = loadClass(typeClass)
+      val info = TypeExtractor.createTypeInfo(clazz)
+      if (!info.isInstanceOf[PojoTypeInfo[_]]) {
+        throw new ValidationException(s"Class '$typeClass'is not a POJO type.")
+      }
+      info
+  }
+
+  lazy val any: PackratParser[TypeInformation[_]] =
+    ANY ~ "(" ~ qualifiedName ~ "," ~ base64Url ~ ")" ^^ {
+      case _ ~ _ ~ typeClass ~ _ ~ serialized ~ _ =>
+        val clazz = loadClass(typeClass)
+        val typeInfo = deserialize(serialized)
+
+        if (clazz != typeInfo.getTypeClass) {
+          throw new ValidationException(
+            s"Class '$typeClass' does no correspond to serialized data.")
+        }
+        typeInfo
+    }
+
+  lazy val typeInfo: PackratParser[TypeInformation[_]] =
+    namedRow | unnamedRow | any | generic | pojo | atomic | failure("Invalid type.")
+
+  def readTypeInfo(typeString: String): TypeInformation[_] = {
+    parseAll(typeInfo, typeString) match {
+      case Success(lst, _) => lst
+
+      case NoSuccess(msg, next) =>
+        throwError(msg, next)
+    }
+  }
+
+  // ----------------------------------------------------------------------------------------------
+
+  def writeTypeInfo(typeInfo: TypeInformation[_]): String = typeInfo match {
+
+    case Types.STRING => VARCHAR.key
+    case Types.BOOLEAN => BOOLEAN.key
+    case Types.BYTE => TINYINT.key
+    case Types.SHORT => SMALLINT.key
+    case Types.INT => INT.key
+    case Types.LONG => BIGINT.key
+    case Types.FLOAT => FLOAT.key
+    case Types.DOUBLE => DOUBLE.key
+    case Types.DECIMAL => DECIMAL.key
+    case Types.SQL_DATE => DATE.key
+    case Types.SQL_TIME => TIME.key
+    case Types.SQL_TIMESTAMP => TIMESTAMP.key
+
+    case rt: RowTypeInfo =>
+      val fields = rt.getFieldNames.zip(rt.getFieldTypes)
+      val normalizedFields = fields.map { f =>
+
+        // escape field name if it contains spaces
+        val name = if (!f._1.matches("\\S+")) {
+          "\"" + StringEscapeUtils.escapeJava(f._1) + "\""
+        } else {
+          f._1
+        }
+
+        s"$name ${normalizeTypeInfo(f._2)}"
+      }
+      s"${ROW.key}(${normalizedFields.mkString(", ")})"
+
+    case generic: GenericTypeInfo[_] =>
+      s"${ANY.key}(${generic.getTypeClass.getName})"
+
+    case pojo: PojoTypeInfo[_] =>
+      // we only support very simple POJOs that only contain extracted fields
+      // (not manually specified)
+      val extractedInfo = try {
+        Some(TypeExtractor.createTypeInfo(pojo.getTypeClass))
+      } catch {
+        case _: InvalidTypesException => None
+      }
+      extractedInfo match {
+        case Some(ei) if ei == pojo => s"${POJO.key}(${pojo.getTypeClass.getName})"
+        case _ =>
+          throw new TableException(
+            "A string representation for custom POJO types is not supported yet.")
+      }
+
+    case _: CompositeType[_] =>
+      throw new TableException("A string representation for composite types is not supported yet.")
+
+    case _: BasicArrayTypeInfo[_, _] | _: ObjectArrayTypeInfo[_, _] |
+         _: PrimitiveArrayTypeInfo[_] =>
+      throw new TableException("A string representation for array types is not supported yet.")
+
+    case _: MapTypeInfo[_, _] | _: MultisetTypeInfo[_] =>
+      throw new TableException("A string representation for map types is not supported yet.")
+
+    case any: TypeInformation[_] =>
+      s"${ANY.key}(${any.getTypeClass.getName}, ${serialize(any)})"
+  }
+
+  // ----------------------------------------------------------------------------------------------
+
+  private def serialize(obj: Serializable): String = {
+    try {
+      val byteArray = InstantiationUtil.serializeObject(obj)
+      Base64.encodeBase64URLSafeString(byteArray)
+    } catch {
+      case e: Exception =>
+        throw new ValidationException(s"Unable to serialize type information '$obj' with " +
+          s"class '${obj.getClass.getName}'.", e)
+    }
+  }
+
+  private def deserialize(data: String): TypeInformation[_] = {
+    val byteData = Base64.decodeBase64(data)
+    InstantiationUtil
+      .deserializeObject[TypeInformation[_]](byteData, Thread.currentThread.getContextClassLoader)
+  }
+
+  private def throwError(msg: String, next: Input): Nothing = {
+    val improvedMsg = msg.replace("string matching regex `\\z'", "End of type information")
+
+    throw new ValidationException(
+      s"""Could not parse type information at column ${next.pos.column}: $improvedMsg
+        |${next.pos.longString}""".stripMargin)
+  }
+
+  private def loadClass(qualifiedName: String): Class[_] = try {
+    Class.forName(qualifiedName, true, Thread.currentThread.getContextClassLoader)
+  } catch {
+    case e: ClassNotFoundException =>
+      throw new ValidationException("Class '" + qualifiedName + "' could not be found. " +
+        "Please note that inner classes must be globally accessible and declared static.", e)
+  }
+}
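
A short usage sketch of the string representation; the exact rendered string is only
illustrative:

    import org.apache.flink.api.common.typeinfo.TypeInformation
    import org.apache.flink.table.api.Types
    import org.apache.flink.table.typeutils.TypeStringUtils

    // round-trip a row type through its string representation
    val rowType: TypeInformation[_] = Types.ROW(
      Array("name", "age"),
      Array[TypeInformation[_]](Types.STRING, Types.INT))

    val asString = TypeStringUtils.writeTypeInfo(rowType) // e.g. "ROW(name VARCHAR, age INT)"
    val parsed = TypeStringUtils.readTypeInfo(asString)

    // RowTypeInfo equality covers field names and types, so the round trip is lossless
    assert(parsed == rowType)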

http://git-wip-us.apache.org/repos/asf/flink/blob/2cb58960/flink-libraries/flink-table/src/test/resources/META-INF/services/org.apache.flink.table.sources.TableSourceFactory
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/resources/META-INF/services/org.apache.flink.table.sources.TableSourceFactory b/flink-libraries/flink-table/src/test/resources/META-INF/services/org.apache.flink.table.sources.TableSourceFactory
new file mode 100644
index 0000000..1b2506e
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/resources/META-INF/services/org.apache.flink.table.sources.TableSourceFactory
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.flink.table.sources.TestTableSourceFactory


[3/3] flink git commit: [FLINK-8240] [table] Create unified interfaces to configure and instantiate TableSources

Posted by tw...@apache.org.
[FLINK-8240] [table] Create unified interfaces to configure and instantiate TableSources

This closes #5240.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2cb58960
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2cb58960
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2cb58960

Branch: refs/heads/master
Commit: 2cb58960e78bee83d29fa66c2c29647353194f75
Parents: a5476cd
Author: twalthr <tw...@apache.org>
Authored: Fri Dec 15 10:18:20 2017 +0100
Committer: twalthr <tw...@apache.org>
Committed: Wed Jan 31 12:05:43 2018 +0100

----------------------------------------------------------------------
 .../connectors/kafka/KafkaJsonTableSource.java  |   3 +-
 .../flink/api/java/typeutils/PojoTypeInfo.java  |   1 -
 .../flink/table/annotation/TableType.java       |   6 +-
 ...pache.flink.table.sources.TableSourceFactory |  16 +
 .../flink/table/api/BatchTableEnvironment.scala |  24 +-
 .../table/api/StreamTableEnvironment.scala      |  24 +-
 .../flink/table/api/TableEnvironment.scala      |  30 +-
 .../org/apache/flink/table/api/exceptions.scala |  49 ++
 .../table/api/java/BatchTableEnvironment.scala  |   2 +-
 .../table/catalog/ExternalCatalogSchema.scala   |  12 +-
 .../table/catalog/ExternalCatalogTable.scala    | 290 ++++++++++-
 .../table/catalog/ExternalTableSourceUtil.scala | 112 +++--
 .../table/catalog/TableSourceConverter.scala    |   4 +
 .../BatchTableSourceDescriptor.scala            |  72 +++
 .../table/descriptors/ConnectorDescriptor.scala |  54 ++
 .../ConnectorDescriptorValidator.scala          |  39 ++
 .../apache/flink/table/descriptors/Csv.scala    | 166 +++++++
 .../flink/table/descriptors/CsvValidator.scala  |  53 ++
 .../flink/table/descriptors/Descriptor.scala    |  32 ++
 .../descriptors/DescriptorProperties.scala      | 489 +++++++++++++++++++
 .../table/descriptors/DescriptorValidator.scala |  32 ++
 .../flink/table/descriptors/FileSystem.scala    |  60 +++
 .../table/descriptors/FileSystemValidator.scala |  41 ++
 .../table/descriptors/FormatDescriptor.scala    |  49 ++
 .../descriptors/FormatDescriptorValidator.scala |  39 ++
 .../apache/flink/table/descriptors/Json.scala   |  78 +++
 .../flink/table/descriptors/JsonValidator.scala |  41 ++
 .../flink/table/descriptors/Metadata.scala      |  81 +++
 .../table/descriptors/MetadataValidator.scala   |  43 ++
 .../flink/table/descriptors/Rowtime.scala       | 133 +++++
 .../table/descriptors/RowtimeValidator.scala    | 134 +++++
 .../apache/flink/table/descriptors/Schema.scala | 164 +++++++
 .../table/descriptors/SchemaValidator.scala     |  80 +++
 .../flink/table/descriptors/Statistics.scala    | 157 ++++++
 .../table/descriptors/StatisticsValidator.scala | 119 +++++
 .../StreamTableSourceDescriptor.scala           |  75 +++
 .../descriptors/TableSourceDescriptor.scala     |  75 +++
 .../flink/table/plan/stats/FlinkStatistic.scala |   2 +-
 .../flink/table/sources/CsvTableSource.scala    |   2 +-
 .../table/sources/CsvTableSourceConverter.scala |   4 +
 .../table/sources/CsvTableSourceFactory.scala   | 113 +++++
 .../table/sources/TableSourceFactory.scala      |  76 +++
 .../sources/TableSourceFactoryService.scala     | 144 ++++++
 .../tsextractors/TimestampExtractor.scala       |   2 +-
 .../wmstrategies/AscendingTimestamps.scala      |   4 +-
 .../BoundedOutOfOrderTimestamps.scala           |   4 +-
 .../wmstrategies/watermarkStrategies.scala      |   2 +-
 .../flink/table/typeutils/TypeStringUtils.scala | 252 ++++++++++
 ...pache.flink.table.sources.TableSourceFactory |  16 +
 .../flink/table/api/TableSourceTest.scala       |  38 ++
 .../catalog/ExternalCatalogSchemaTest.scala     |   4 +-
 .../catalog/ExternalTableSourceUtilTest.scala   |   6 +-
 .../flink/table/descriptors/CsvTest.scala       | 102 ++++
 .../table/descriptors/DescriptorTestBase.scala  |  54 ++
 .../table/descriptors/FileSystemTest.scala      |  53 ++
 .../flink/table/descriptors/JsonTest.scala      |  77 +++
 .../flink/table/descriptors/MetadataTest.scala  |  55 +++
 .../flink/table/descriptors/RowtimeTest.scala   |  72 +++
 .../flink/table/descriptors/SchemaTest.scala    |  76 +++
 .../table/descriptors/StatisticsTest.scala      |  91 ++++
 .../StreamTableSourceDescriptorTest.scala       |  79 +++
 .../sources/TableSourceFactoryServiceTest.scala |  82 ++++
 .../table/sources/TestTableSourceFactory.scala  |  60 +++
 .../table/typeutils/TypeStringUtilsTest.scala   | 104 ++++
 .../table/utils/MockTableEnvironment.scala      |   7 +-
 65 files changed, 4383 insertions(+), 77 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/2cb58960/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSource.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSource.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSource.java
index 7de7f34..b3b545e 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSource.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSource.java
@@ -161,7 +161,8 @@ public abstract class KafkaJsonTableSource extends KafkaTableSource implements D
 		/**
 		 * Sets flag whether to fail if a field is missing or not.
 		 *
-		 * @param failOnMissingField If set to true, the TableSource fails if a missing fields.
+		 * @param failOnMissingField If set to true, the TableSource fails if there is a missing
+		 *                           field.
 		 *                           If set to false, a missing field is set to null.
 		 * @return The builder.
 		 */

http://git-wip-us.apache.org/repos/asf/flink/blob/2cb58960/flink-core/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java
index 211b7ef..7f41f5d 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java
@@ -268,7 +268,6 @@ public class PojoTypeInfo<T> extends CompositeType<T> {
 		return new PojoTypeComparatorBuilder();
 	}
 
-	// used for testing. Maybe use mockito here
 	@PublicEvolving
 	public PojoField getPojoFieldAt(int pos) {
 		if (pos < 0 || pos >= this.fields.length) {

http://git-wip-us.apache.org/repos/asf/flink/blob/2cb58960/flink-libraries/flink-table/src/main/java/org/apache/flink/table/annotation/TableType.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/java/org/apache/flink/table/annotation/TableType.java b/flink-libraries/flink-table/src/main/java/org/apache/flink/table/annotation/TableType.java
index 2d2a7af..c564528 100644
--- a/flink-libraries/flink-table/src/main/java/org/apache/flink/table/annotation/TableType.java
+++ b/flink-libraries/flink-table/src/main/java/org/apache/flink/table/annotation/TableType.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.table.annotation;
 
-import org.apache.flink.annotation.Public;
 import org.apache.flink.table.catalog.TableSourceConverter;
 
 import java.lang.annotation.Documented;
@@ -29,11 +28,14 @@ import java.lang.annotation.Target;
 
 /**
  * Annotates a table type of a {@link TableSourceConverter}.
+ *
+ * @deprecated Use the more generic {@link org.apache.flink.table.sources.TableSourceFactory}
+ * interface with Java service loaders instead.
  */
 @Documented
 @Target(ElementType.TYPE)
 @Retention(RetentionPolicy.RUNTIME)
-@Public
+@Deprecated
 public @interface TableType {
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/2cb58960/flink-libraries/flink-table/src/main/resources/META-INF/services/org.apache.flink.table.sources.TableSourceFactory
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/resources/META-INF/services/org.apache.flink.table.sources.TableSourceFactory b/flink-libraries/flink-table/src/main/resources/META-INF/services/org.apache.flink.table.sources.TableSourceFactory
new file mode 100644
index 0000000..ff43eed
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/resources/META-INF/services/org.apache.flink.table.sources.TableSourceFactory
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.flink.table.sources.CsvTableSourceFactory

http://git-wip-us.apache.org/repos/asf/flink/blob/2cb58960/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
index c920d23..05255fd 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
@@ -86,17 +86,20 @@ abstract class BatchTableEnvironment(
   }
 
   /** Returns a unique table name according to the internal naming pattern. */
-  protected def createUniqueTableName(): String = "_DataSetTable_" + nameCntr.getAndIncrement()
+  override protected def createUniqueTableName(): String =
+    "_DataSetTable_" + nameCntr.getAndIncrement()
 
   /**
-    * Registers an external [[BatchTableSource]] in this [[TableEnvironment]]'s catalog.
-    * Registered tables can be referenced in SQL queries.
+    * Registers an internal [[BatchTableSource]] in this [[TableEnvironment]]'s catalog without
+    * name checking. Registered tables can be referenced in SQL queries.
     *
     * @param name        The name under which the [[TableSource]] is registered.
     * @param tableSource The [[TableSource]] to register.
     */
-  override def registerTableSource(name: String, tableSource: TableSource[_]): Unit = {
-    checkValidTableName(name)
+  override protected def registerTableSourceInternal(
+      name: String,
+      tableSource: TableSource[_])
+    : Unit = {
 
     tableSource match {
       case batchTableSource: BatchTableSource[_] =>
@@ -107,6 +110,17 @@ abstract class BatchTableEnvironment(
     }
   }
 
+// TODO expose this once we have enough table source factories that can deal with it
+//  /**
+//    * Creates a table from a descriptor that describes the source connector, source encoding,
+//    * the resulting table schema, and other properties.
+//    *
+//    * @param connectorDescriptor connector descriptor describing the source of the table
+//    */
+//  def from(connectorDescriptor: ConnectorDescriptor): BatchTableSourceDescriptor = {
+//    new BatchTableSourceDescriptor(this, connectorDescriptor)
+//  }
+
   /**
     * Registers an external [[TableSink]] with given field names and types in this
     * [[TableEnvironment]]'s catalog.

http://git-wip-us.apache.org/repos/asf/flink/blob/2cb58960/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
index 9d94f54..84d7240 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
@@ -97,17 +97,20 @@ abstract class StreamTableEnvironment(
   }
 
   /** Returns a unique table name according to the internal naming pattern. */
-  protected def createUniqueTableName(): String = "_DataStreamTable_" + nameCntr.getAndIncrement()
+  override protected def createUniqueTableName(): String =
+    "_DataStreamTable_" + nameCntr.getAndIncrement()
 
   /**
-    * Registers an external [[StreamTableSource]] in this [[TableEnvironment]]'s catalog.
-    * Registered tables can be referenced in SQL queries.
+    * Registers an internal [[StreamTableSource]] in this [[TableEnvironment]]'s catalog without
+    * name checking. Registered tables can be referenced in SQL queries.
     *
     * @param name        The name under which the [[TableSource]] is registered.
     * @param tableSource The [[TableSource]] to register.
     */
-  override def registerTableSource(name: String, tableSource: TableSource[_]): Unit = {
-    checkValidTableName(name)
+  override protected def registerTableSourceInternal(
+      name: String,
+      tableSource: TableSource[_])
+    : Unit = {
 
     tableSource match {
       case streamTableSource: StreamTableSource[_] =>
@@ -125,6 +128,17 @@ abstract class StreamTableEnvironment(
     }
   }
 
+// TODO expose this once we have enough table source factories that can deal with it
+//  /**
+//    * Creates a table from a descriptor that describes the source connector, source encoding,
+//    * the resulting table schema, and other properties.
+//    *
+//    * @param connectorDescriptor connector descriptor describing the source of the table
+//    */
+//  def from(connectorDescriptor: ConnectorDescriptor): StreamTableSourceDescriptor = {
+//    new StreamTableSourceDescriptor(this, connectorDescriptor)
+//  }
+
   /**
     * Registers an external [[TableSink]] with given field names and types in this
     * [[TableEnvironment]]'s catalog.

http://git-wip-us.apache.org/repos/asf/flink/blob/2cb58960/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
index 2569cd4..3b314e9 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
@@ -290,6 +290,17 @@ abstract class TableEnvironment(val config: TableConfig) {
   }
 
   /**
+    * Creates a table from a table source.
+    *
+    * @param source table source used as table
+    */
+  def fromTableSource(source: TableSource[_]): Table = {
+    val name = createUniqueTableName()
+    registerTableSourceInternal(name, source)
+    scan(name)
+  }
+
+  /**
     * Registers an [[ExternalCatalog]] under a unique name in the TableEnvironment's schema.
     * All tables registered in the [[ExternalCatalog]] can be accessed.
     *
@@ -302,7 +313,7 @@ abstract class TableEnvironment(val config: TableConfig) {
     }
     this.externalCatalogs.put(name, externalCatalog)
     // create an external catalog Calcite schema, register it on the root schema
-    ExternalCatalogSchema.registerCatalog(rootSchema, name, externalCatalog)
+    ExternalCatalogSchema.registerCatalog(this, rootSchema, name, externalCatalog)
   }
 
   /**
@@ -422,7 +433,19 @@ abstract class TableEnvironment(val config: TableConfig) {
     * @param name        The name under which the [[TableSource]] is registered.
     * @param tableSource The [[TableSource]] to register.
     */
-  def registerTableSource(name: String, tableSource: TableSource[_]): Unit
+  def registerTableSource(name: String, tableSource: TableSource[_]): Unit = {
+    checkValidTableName(name)
+    registerTableSourceInternal(name, tableSource)
+  }
+
+  /**
+    * Registers an internal [[TableSource]] in this [[TableEnvironment]]'s catalog without
+    * name checking. Registered tables can be referenced in SQL queries.
+    *
+    * @param name        The name under which the [[TableSource]] is registered.
+    * @param tableSource The [[TableSource]] to register.
+    */
+  protected def registerTableSourceInternal(name: String, tableSource: TableSource[_]): Unit
 
   /**
     * Registers an external [[TableSink]] with given field names and types in this
@@ -714,6 +737,9 @@ abstract class TableEnvironment(val config: TableConfig) {
     }
   }
 
+  /** Returns a unique table name according to the internal naming pattern. */
+  protected def createUniqueTableName(): String
+
   /**
     * Checks if the chosen table name is valid.
     *

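The new fromTableSource(...) method registers a table source under a generated unique name and
immediately returns it as a Table. A brief usage sketch on a streaming environment (path and
field names are made up):

    import org.apache.flink.api.common.typeinfo.TypeInformation
    import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
    import org.apache.flink.table.api.{Table, TableEnvironment, Types}
    import org.apache.flink.table.sources.CsvTableSource

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tEnv = TableEnvironment.getTableEnvironment(env)

    // hypothetical CSV source
    val source = new CsvTableSource(
      "/path/to/data.csv",
      Array("name", "age"),
      Array[TypeInformation[_]](Types.STRING, Types.INT))

    // registered internally under a generated name and returned as a Table
    val table: Table = tEnv.fromTableSource(source)
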
http://git-wip-us.apache.org/repos/asf/flink/blob/2cb58960/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/exceptions.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/exceptions.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/exceptions.scala
index 7ea17fa..f5a713a 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/exceptions.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/exceptions.scala
@@ -39,6 +39,9 @@ case class SqlParserException(
 
 /**
   * General Exception for all errors during table handling.
+  *
+  * This exception indicates that an internal error occurred or that a feature is not supported
+  * yet. Usually, this exception does not indicate a fault of the user.
   */
 case class TableException(
     msg: String,
@@ -55,6 +58,8 @@ object TableException {
 
 /**
   * Exception for all errors occurring during validation phase.
+  *
+  * This exception indicates that the user did something wrong.
   */
 case class ValidationException(
     msg: String,
@@ -137,11 +142,51 @@ case class CatalogAlreadyExistException(
 }
 
 /**
+  * Exception for not finding a [[org.apache.flink.table.sources.TableSourceFactory]] for the
+  * given properties.
+  *
+  * @param properties properties that describe the table source
+  * @param cause the cause
+  */
+case class NoMatchingTableSourceException(
+    properties: Map[String, String],
+    cause: Throwable)
+    extends RuntimeException(
+      s"Could not find a table source factory in the classpath satisfying the " +
+        s"following properties: \n${properties.map(e => e._1 + "=" +  e._2 ).mkString("\n")}",
+      cause) {
+
+  def this(properties: Map[String, String]) = this(properties, null)
+}
+
+/**
+  * Exception for finding more than one [[org.apache.flink.table.sources.TableSourceFactory]] for
+  * the given properties.
+  *
+  * @param properties properties that describe the table source
+  * @param cause the cause
+  */
+case class AmbiguousTableSourceException(
+    properties: Map[String, String],
+    cause: Throwable)
+    extends RuntimeException(
+      s"More than one table source factory in the classpath satisfying the " +
+        s"following properties: \n${properties.map(e => e._1 + "=" +  e._2 ).mkString("\n")}",
+      cause) {
+
+  def this(properties: Map[String, String]) = this(properties, null)
+}
+
+/**
   * Exception for not finding a [[TableSourceConverter]] for a given table type.
   *
   * @param tableType table type
   * @param cause the cause
+  * @deprecated Use table source factories instead
+  *            (see [[org.apache.flink.table.sources.TableSourceFactory]]).
   */
+@Deprecated
+@deprecated("Use table factories (see TableSourceFactory) instead.")
 case class NoMatchedTableSourceConverterException(
     tableType: String,
     cause: Throwable)
@@ -156,7 +201,11 @@ case class NoMatchedTableSourceConverterException(
   *
   * @param tableType table type
   * @param cause the cause
+  * @deprecated Use table source factories instead
+  *            (see [[org.apache.flink.table.sources.TableSourceFactory]]).
   */
+@Deprecated
+@deprecated("Use table factories (see TableSourceFactory) instead.")
 case class AmbiguousTableSourceConverterException(
     tableType: String,
     cause: Throwable)

http://git-wip-us.apache.org/repos/asf/flink/blob/2cb58960/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/java/BatchTableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/java/BatchTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/java/BatchTableEnvironment.scala
index b79d161..f8f35eb 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/java/BatchTableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/java/BatchTableEnvironment.scala
@@ -20,8 +20,8 @@ package org.apache.flink.table.api.java
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.typeutils.TypeExtractor
 import org.apache.flink.api.java.{DataSet, ExecutionEnvironment}
-import org.apache.flink.table.expressions.ExpressionParser
 import org.apache.flink.table.api._
+import org.apache.flink.table.expressions.ExpressionParser
 import org.apache.flink.table.functions.{AggregateFunction, TableFunction}
 
 /**

http://git-wip-us.apache.org/repos/asf/flink/blob/2cb58960/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogSchema.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogSchema.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogSchema.scala
index d87d665..776ddee 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogSchema.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogSchema.scala
@@ -22,7 +22,7 @@ import java.util.{Collection => JCollection, Collections => JCollections, Linked
 
 import org.apache.calcite.linq4j.tree.Expression
 import org.apache.calcite.schema._
-import org.apache.flink.table.api.{CatalogNotExistException, TableNotExistException}
+import org.apache.flink.table.api.{CatalogNotExistException, TableEnvironment, TableNotExistException}
 import org.apache.flink.table.util.Logging
 
 import scala.collection.JavaConverters._
@@ -33,10 +33,12 @@ import scala.collection.JavaConverters._
   * The external catalog and all included sub-catalogs and tables is registered as
   * sub-schemas and tables in Calcite.
   *
+  * @param tableEnv the environment for this schema
   * @param catalogIdentifier external catalog name
   * @param catalog           external catalog
   */
 class ExternalCatalogSchema(
+    tableEnv: TableEnvironment,
     catalogIdentifier: String,
     catalog: ExternalCatalog) extends Schema with Logging {
 
@@ -50,7 +52,7 @@ class ExternalCatalogSchema(
   override def getSubSchema(name: String): Schema = {
     try {
       val db = catalog.getSubCatalog(name)
-      new ExternalCatalogSchema(name, db)
+      new ExternalCatalogSchema(tableEnv, name, db)
     } catch {
       case _: CatalogNotExistException =>
         LOG.warn(s"Sub-catalog $name does not exist in externalCatalog $catalogIdentifier")
@@ -75,7 +77,7 @@ class ExternalCatalogSchema(
     */
   override def getTable(name: String): Table = try {
     val externalCatalogTable = catalog.getTable(name)
-    ExternalTableSourceUtil.fromExternalCatalogTable(externalCatalogTable)
+    ExternalTableSourceUtil.fromExternalCatalogTable(tableEnv, externalCatalogTable)
   } catch {
     case TableNotExistException(table, _, _) => {
       LOG.warn(s"Table $table does not exist in externalCatalog $catalogIdentifier")
@@ -111,15 +113,17 @@ object ExternalCatalogSchema {
   /**
     * Registers an external catalog in a Calcite schema.
     *
+    * @param tableEnv                  The environment the catalog will be part of.
     * @param parentSchema              Parent schema into which the catalog is registered
     * @param externalCatalogIdentifier Identifier of the external catalog
     * @param externalCatalog           The external catalog to register
     */
   def registerCatalog(
+      tableEnv: TableEnvironment,
       parentSchema: SchemaPlus,
       externalCatalogIdentifier: String,
       externalCatalog: ExternalCatalog): Unit = {
-    val newSchema = new ExternalCatalogSchema(externalCatalogIdentifier, externalCatalog)
+    val newSchema = new ExternalCatalogSchema(tableEnv, externalCatalogIdentifier, externalCatalog)
     val schemaPlusOfNewSchema = parentSchema.add(externalCatalogIdentifier, newSchema)
     newSchema.registerSubSchemas(schemaPlusOfNewSchema)
   }

http://git-wip-us.apache.org/repos/asf/flink/blob/2cb58960/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogTable.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogTable.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogTable.scala
index ae20718..a7c20ac 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogTable.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogTable.scala
@@ -18,28 +18,296 @@
 
 package org.apache.flink.table.catalog
 
-import java.util.{HashMap => JHashMap, Map => JMap}
 import java.lang.{Long => JLong}
+import java.util.{HashMap => JHashMap, Map => JMap}
 
-import org.apache.flink.table.api.TableSchema
+import org.apache.flink.table.api.{TableException, TableSchema}
+import org.apache.flink.table.catalog.ExternalCatalogTable._
+import org.apache.flink.table.descriptors.MetadataValidator.{METADATA_COMMENT, METADATA_CREATION_TIME, METADATA_LAST_ACCESS_TIME}
+import org.apache.flink.table.descriptors._
 import org.apache.flink.table.plan.stats.TableStats
 
+import scala.collection.JavaConverters._
+
 /**
   * Defines a table in an [[ExternalCatalog]].
   *
-  * @param tableType            Table type, e.g csv, hbase, kafka
-  * @param schema               Schema of the table (column names and types)
-  * @param properties           Properties of the table
-  * @param stats                Statistics of the table
-  * @param comment              Comment of the table
-  * @param createTime           Create timestamp of the table
-  * @param lastAccessTime       Timestamp of last access of the table
+  * For backwards compatibility, this class supports both the legacy table type approach (see
+  * [[org.apache.flink.table.annotation.TableType]]) and the factory-based approach (see
+  * [[org.apache.flink.table.sources.TableSourceFactory]]).
+  *
+  * @param connectorDesc describes the system to connect to
+  * @param formatDesc describes the data format of a connector
+  * @param schemaDesc describes the schema of the result table
+  * @param statisticsDesc describes the estimated statistics of the result table
+  * @param metadataDesc describes additional metadata of a table
   */
-case class ExternalCatalogTable(
+class ExternalCatalogTable(
+    connectorDesc: ConnectorDescriptor,
+    formatDesc: Option[FormatDescriptor],
+    schemaDesc: Option[Schema],
+    statisticsDesc: Option[Statistics],
+    metadataDesc: Option[Metadata])
+  extends TableSourceDescriptor(connectorDesc) {
+
+  this.formatDescriptor = formatDesc
+  this.schemaDescriptor = schemaDesc
+  this.statisticsDescriptor = statisticsDesc
+  this.metaDescriptor = metadataDesc
+
+  // expose statistics for external table source util
+  override def getTableStats: Option[TableStats] = super.getTableStats
+
+  // ----------------------------------------------------------------------------------------------
+  // NOTE: the following code is used for backwards compatibility to the TableType approach
+  // ----------------------------------------------------------------------------------------------
+
+  /**
+    * Returns the legacy table type of an external catalog table.
+    *
+    * @deprecated for backwards compatibility.
+    */
+  @Deprecated
+  @deprecated("For backwards compatibility.")
+  lazy val tableType: String = {
+    val props = new DescriptorProperties()
+    connectorDesc.addProperties(props)
+    props
+      .getString(CONNECTOR_LEGACY_TYPE)
+      .getOrElse(throw new TableException("Could not find a legacy table type to return."))
+  }
+
+  /**
+    * Returns the legacy schema of an external catalog table.
+    *
+    * @deprecated for backwards compatibility.
+    */
+  @Deprecated
+  @deprecated("For backwards compatibility.")
+  lazy val schema: TableSchema = {
+    val props = new DescriptorProperties()
+    connectorDesc.addProperties(props)
+    props
+      .getTableSchema(CONNECTOR_LEGACY_SCHEMA)
+      .getOrElse(throw new TableException("Could not find a legacy schema to return."))
+  }
+
+  /**
+    * Returns the legacy properties of an external catalog table.
+    *
+    * @deprecated for backwards compatibility.
+    */
+  @Deprecated
+  @deprecated("For backwards compatibility.")
+  lazy val properties: JMap[String, String] = {
+    // skip normalization
+    val props = new DescriptorProperties(normalizeKeys = false)
+    val legacyProps = new JHashMap[String, String]()
+    connectorDesc.addProperties(props)
+    props.asMap.flatMap { case (k, v) =>
+      if (k.startsWith(CONNECTOR_LEGACY_PROPERTY)) {
+        // remove "connector.legacy-property-"
+        Some(legacyProps.put(k.substring(CONNECTOR_LEGACY_PROPERTY.length + 1), v))
+      } else {
+        None
+      }
+    }
+    legacyProps
+  }
+
+  /**
+    * Returns the legacy statistics of an external catalog table.
+    *
+    * @deprecated for backwards compatibility.
+    */
+  @Deprecated
+  @deprecated("For backwards compatibility.")
+  lazy val stats: TableStats = getTableStats.orNull
+
+  /**
+    * Returns the legacy comment of an external catalog table.
+    *
+    * @deprecated for backwards compatibility.
+    */
+  @Deprecated
+  @deprecated("For backwards compatibility.")
+  lazy val comment: String = {
+    val normalizedProps = new DescriptorProperties()
+
+    metadataDesc match {
+      case Some(meta) =>
+        meta.addProperties(normalizedProps)
+        normalizedProps.getString(METADATA_COMMENT).orNull
+      case None =>
+        null
+    }
+  }
+
+  /**
+    * Returns the legacy creation time of an external catalog table.
+    *
+    * @deprecated for backwards compatibility.
+    */
+  @Deprecated
+  @deprecated("For backwards compatibility.")
+  lazy val createTime: JLong = {
+    val normalizedProps = new DescriptorProperties()
+
+    metadataDesc match {
+      case Some(meta) =>
+        meta.addProperties(normalizedProps)
+        normalizedProps.getLong(METADATA_CREATION_TIME).map(v => Long.box(v)).orNull
+      case None =>
+        null
+    }
+  }
+
+  /**
+    * Returns the legacy last access time of an external catalog table.
+    *
+    * @deprecated for backwards compatibility.
+    */
+  @Deprecated
+  @deprecated("For backwards compatibility.")
+  lazy val lastAccessTime: JLong = {
+    val normalizedProps = new DescriptorProperties()
+
+    metadataDesc match {
+      case Some(meta) =>
+        meta.addProperties(normalizedProps)
+        normalizedProps.getLong(METADATA_LAST_ACCESS_TIME).map(v => Long.box(v)).orNull
+      case None =>
+        null
+    }
+  }
+
+  /**
+    * Defines a table in an [[ExternalCatalog]].
+    *
+    * @param tableType            Table type, e.g. csv, hbase, kafka
+    * @param schema               Schema of the table (column names and types)
+    * @param properties           Properties of the table
+    * @param stats                Statistics of the table
+    * @param comment              Comment of the table
+    * @param createTime           Create timestamp of the table
+    * @param lastAccessTime       Timestamp of last access of the table
+    * @deprecated Use a descriptor-based constructor instead.
+    */
+  @Deprecated
+  @deprecated("Use a descriptor-based constructor instead.")
+  def this(
+    tableType: String,
+    schema: TableSchema,
+    properties: JMap[String, String] = new JHashMap(),
+    stats: TableStats = null,
+    comment: String = null,
+    createTime: JLong = System.currentTimeMillis,
+    lastAccessTime: JLong = -1L) = {
+
+    this(
+      toConnectorDescriptor(tableType, schema, properties),
+      None,
+      None,
+      Some(toStatisticsDescriptor(stats)),
+      Some(toMetadataDescriptor(comment, createTime, lastAccessTime)))
+  }
+
+  /**
+    * Returns whether this external catalog table uses the legacy table type.
+    *
+    * @deprecated for backwards compatibility.
+    */
+  @Deprecated
+  @deprecated("For backwards compatibility.")
+  def isLegacyTableType: Boolean = connectorDesc.isInstanceOf[TableTypeConnector]
+}
+
+object ExternalCatalogTable {
+
+  val CONNECTOR_TYPE_VALUE = "legacy-table-type"
+  val CONNECTOR_LEGACY_TYPE = "connector.legacy-type"
+  val CONNECTOR_LEGACY_SCHEMA = "connector.legacy-schema"
+  val CONNECTOR_LEGACY_PROPERTY = "connector.legacy-property"
+
+  /**
+    * Defines a table in an [[ExternalCatalog]].
+    *
+    * @param tableType            Table type, e.g. csv, hbase, kafka
+    * @param schema               Schema of the table (column names and types)
+    * @param properties           Properties of the table
+    * @param stats                Statistics of the table
+    * @param comment              Comment of the table
+    * @param createTime           Create timestamp of the table
+    * @param lastAccessTime       Timestamp of last access of the table
+    * @deprecated Use a descriptor-based constructor instead.
+    */
+  @Deprecated
+  @deprecated("Use a descriptor-based constructor instead.")
+  def apply(
     tableType: String,
     schema: TableSchema,
     properties: JMap[String, String] = new JHashMap(),
     stats: TableStats = null,
     comment: String = null,
     createTime: JLong = System.currentTimeMillis,
-    lastAccessTime: JLong = -1L)
+    lastAccessTime: JLong = -1L): ExternalCatalogTable = {
+
+    new ExternalCatalogTable(
+      tableType,
+      schema,
+      properties,
+      stats,
+      comment,
+      createTime,
+      lastAccessTime)
+  }
+
+  class TableTypeConnector(
+      tableType: String,
+      schema: TableSchema,
+      legacyProperties: JMap[String, String])
+    extends ConnectorDescriptor(CONNECTOR_TYPE_VALUE, version = 1) {
+
+    override protected def addConnectorProperties(properties: DescriptorProperties): Unit = {
+      properties.putString(CONNECTOR_LEGACY_TYPE, tableType)
+      properties.putTableSchema(CONNECTOR_LEGACY_SCHEMA, schema)
+      legacyProperties.asScala.foreach { case (k, v) =>
+          properties.putString(s"$CONNECTOR_LEGACY_PROPERTY-$k", v)
+      }
+    }
+
+    override private[flink] def needsFormat() = false
+  }
+
+  def toConnectorDescriptor(
+      tableType: String,
+      schema: TableSchema,
+      properties: JMap[String, String])
+    : ConnectorDescriptor = {
+
+    new TableTypeConnector(tableType, schema, properties)
+  }
+
+  def toStatisticsDescriptor(stats: TableStats): Statistics = {
+    val statsDesc = Statistics()
+    if (stats != null) {
+      statsDesc.tableStats(stats)
+    }
+    statsDesc
+  }
+
+  def toMetadataDescriptor(
+      comment: String,
+      createTime: JLong,
+      lastAccessTime: JLong)
+    : Metadata = {
+
+    val metadataDesc = Metadata()
+    if (comment != null) {
+      metadataDesc.comment(comment)
+    }
+    metadataDesc
+      .creationTime(createTime)
+      .lastAccessTime(lastAccessTime)
+  }
+}
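
A quick sketch of how the deprecated constructor arguments map onto the new descriptor
helpers above. The table type, schema, and property values are made up for illustration,
and the Types constants are assumed from the Table API:

    import java.util.{HashMap => JHashMap}
    import org.apache.flink.table.api.{TableSchema, Types}
    import org.apache.flink.table.catalog.ExternalCatalogTable

    // hypothetical legacy definition: a "csv" table with two columns
    val schema = TableSchema.builder()
      .field("id", Types.LONG)
      .field("name", Types.STRING)
      .build()
    val props = new JHashMap[String, String]()
    props.put("path", "/tmp/data.csv")

    // the deprecated apply() above funnels these arguments into descriptors
    val legacyTable = ExternalCatalogTable("csv", schema, props)

    // the same conversion, spelled out via the companion helpers
    val connector = ExternalCatalogTable.toConnectorDescriptor("csv", schema, props)
    val stats     = ExternalCatalogTable.toStatisticsDescriptor(null)        // empty Statistics()
    val metadata  = ExternalCatalogTable.toMetadataDescriptor(null, 0L, -1L) // no comment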

http://git-wip-us.apache.org/repos/asf/flink/blob/2cb58960/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalTableSourceUtil.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalTableSourceUtil.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalTableSourceUtil.scala
index d2e297f..3bc5dc0 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalTableSourceUtil.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalTableSourceUtil.scala
@@ -23,22 +23,74 @@ import java.net.URL
 import org.apache.commons.configuration.{ConfigurationException, ConversionException, PropertiesConfiguration}
 import org.apache.flink.annotation.VisibleForTesting
 import org.apache.flink.table.annotation.TableType
-import org.apache.flink.table.api.{AmbiguousTableSourceConverterException, NoMatchedTableSourceConverterException, TableException}
+import org.apache.flink.table.api._
 import org.apache.flink.table.plan.schema.{BatchTableSourceTable, StreamTableSourceTable, TableSourceTable}
 import org.apache.flink.table.plan.stats.FlinkStatistic
-import org.apache.flink.table.sources.{BatchTableSource, StreamTableSource, TableSource}
+import org.apache.flink.table.sources.{BatchTableSource, StreamTableSource, TableSource, TableSourceFactoryService}
 import org.apache.flink.table.util.Logging
 import org.apache.flink.util.InstantiationUtil
 import org.reflections.Reflections
 
-import scala.collection.JavaConverters._
-import scala.collection.mutable
+import _root_.scala.collection.JavaConverters._
+import _root_.scala.collection.mutable
 
 /**
   * The utility class is used to convert ExternalCatalogTable to TableSourceTable.
   */
 object ExternalTableSourceUtil extends Logging {
 
+  /**
+    * Converts an [[ExternalCatalogTable]] instance to a [[TableSourceTable]] instance.
+    *
+    * @param externalCatalogTable the [[ExternalCatalogTable]] instance to convert
+    * @return converted [[TableSourceTable]] instance from the input catalog table
+    */
+  def fromExternalCatalogTable(
+      tableEnv: TableEnvironment,
+      externalCatalogTable: ExternalCatalogTable)
+    : TableSourceTable[_] = {
+
+    // check for the legacy external catalog path
+    if (externalCatalogTable.isLegacyTableType) {
+      LOG.warn("External catalog tables based on TableType annotations are deprecated. " +
+        "Please consider updating them to TableSourceFactories.")
+      fromExternalCatalogTableType(externalCatalogTable)
+    }
+    // use the factory approach
+    else {
+      val source = TableSourceFactoryService.findTableSourceFactory(externalCatalogTable)
+      tableEnv match {
+        // check for a batch table source in this batch environment
+        case _: BatchTableEnvironment =>
+          source match {
+            case bts: BatchTableSource[_] =>
+              new BatchTableSourceTable(
+                bts,
+                new FlinkStatistic(externalCatalogTable.getTableStats))
+            case _ => throw new TableException(
+              s"Found table source '${source.getClass.getCanonicalName}' is not applicable " +
+                s"in a batch environment.")
+          }
+        // check for a stream table source in this streaming environment
+        case _: StreamTableEnvironment =>
+          source match {
+            case sts: StreamTableSource[_] =>
+              new StreamTableSourceTable(
+                sts,
+                new FlinkStatistic(externalCatalogTable.getTableStats))
+            case _ => throw new TableException(
+              s"Found table source '${source.getClass.getCanonicalName}' is not applicable " +
+                s"in a streaming environment.")
+          }
+        case _ => throw new TableException("Unsupported table environment.")
+      }
+    }
+  }
+
+  // ----------------------------------------------------------------------------------------------
+  // NOTE: the following lines can be removed once we drop support for TableType
+  // ----------------------------------------------------------------------------------------------
+
   // config file to specify scan package to search TableSourceConverter
   private val tableSourceConverterConfigFileName = "tableSourceConverter.properties"
 
@@ -48,7 +100,7 @@ object ExternalTableSourceUtil extends Logging {
     val registeredConverters =
       new mutable.HashMap[String, mutable.Set[Class[_ <: TableSourceConverter[_]]]]
           with mutable.MultiMap[String, Class[_ <: TableSourceConverter[_]]]
-    // scan all config files to find TableSourceConverters which are annotationed with TableType.
+    // scan all config files to find TableSourceConverters which are annotated with TableType.
     val resourceUrls = getClass.getClassLoader.getResources(tableSourceConverterConfigFileName)
     while (resourceUrls.hasMoreElements) {
       val url = resourceUrls.nextElement()
@@ -89,12 +141,31 @@ object ExternalTableSourceUtil extends Logging {
   }
 
   /**
-    * Converts an [[ExternalCatalogTable]] instance to a [[TableSourceTable]] instance
+    * Parses scan package set from input config file
     *
-    * @param externalCatalogTable the [[ExternalCatalogTable]] instance which to convert
-    * @return converted [[TableSourceTable]] instance from the input catalog table
+    * @param url url of config file
+    * @return scan package set
     */
-  def fromExternalCatalogTable(externalCatalogTable: ExternalCatalogTable): TableSourceTable[_] = {
+  private def parseScanPackagesFromConfigFile(url: URL): Set[String] = {
+    try {
+      val config = new PropertiesConfiguration(url)
+      config.setListDelimiter(',')
+      config.getStringArray("scan.packages").filterNot(_.isEmpty).toSet
+    } catch {
+      case e: ConfigurationException =>
+        LOG.warn(s"Error happened while loading the properties file [$url]", e)
+        Set.empty
+      case e1: ConversionException =>
+        LOG.warn(s"Error happened while parsing 'scan.packages' field of properties file [$url]. " +
+            s"The value is not a String or List of Strings.", e1)
+        Set.empty
+    }
+  }
+
+  @VisibleForTesting
+  def fromExternalCatalogTableType(externalCatalogTable: ExternalCatalogTable)
+    : TableSourceTable[_] = {
+
     val tableType = externalCatalogTable.tableType
     val propertyKeys = externalCatalogTable.properties.keySet()
     tableTypeToTableSourceConvertersClazz.get(tableType) match {
@@ -141,27 +212,4 @@ object ExternalTableSourceUtil extends Logging {
         throw new NoMatchedTableSourceConverterException(tableType)
     }
   }
-
-  /**
-    * Parses scan package set from input config file
-    *
-    * @param url url of config file
-    * @return scan package set
-    */
-  private def parseScanPackagesFromConfigFile(url: URL): Set[String] = {
-    try {
-      val config = new PropertiesConfiguration(url)
-      config.setListDelimiter(',')
-      config.getStringArray("scan.packages").filterNot(_.isEmpty).toSet
-    } catch {
-      case e: ConfigurationException =>
-        LOG.warn(s"Error happened while loading the properties file [$url]", e)
-        Set.empty
-      case e1: ConversionException =>
-        LOG.warn(s"Error happened while parsing 'scan.packages' field of properties file [$url]. " +
-            s"The value is not a String or List of Strings.", e1)
-        Set.empty
-    }
-  }
-
 }
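
For the new, descriptor-based path, the conversion is driven by the table environment that
is passed in. A rough usage sketch; the catalog table is left as a placeholder, and obtaining
the environment via TableEnvironment.getTableEnvironment is an assumption about the
surrounding API rather than part of this patch:

    import org.apache.flink.api.scala.ExecutionEnvironment
    import org.apache.flink.table.api.TableEnvironment
    import org.apache.flink.table.catalog.{ExternalCatalogTable, ExternalTableSourceUtil}

    val env = ExecutionEnvironment.getExecutionEnvironment
    val tableEnv = TableEnvironment.getTableEnvironment(env)  // a BatchTableEnvironment

    // placeholder: an ExternalCatalogTable built from connector/format/schema descriptors
    val catalogTable: ExternalCatalogTable = ???

    // dispatches on the environment type: in a batch environment the resolved
    // factory must return a BatchTableSource, otherwise a TableException is thrown
    val table = ExternalTableSourceUtil.fromExternalCatalogTable(tableEnv, catalogTable)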

http://git-wip-us.apache.org/repos/asf/flink/blob/2cb58960/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/TableSourceConverter.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/TableSourceConverter.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/TableSourceConverter.scala
index ca6df9a..e6248b0 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/TableSourceConverter.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/TableSourceConverter.scala
@@ -29,7 +29,11 @@ import org.apache.flink.table.sources.TableSource
   * table is supported.
   *
   * @tparam T The [[TableSource]] to be created by this converter.
+  *
+  * @deprecated Use the more generic [[org.apache.flink.table.sources.TableSourceFactory]] instead.
   */
+@Deprecated
+@deprecated("Use the more generic table source factories instead.")
 trait TableSourceConverter[T <: TableSource[_]] {
 
   /**

http://git-wip-us.apache.org/repos/asf/flink/blob/2cb58960/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/BatchTableSourceDescriptor.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/BatchTableSourceDescriptor.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/BatchTableSourceDescriptor.scala
new file mode 100644
index 0000000..3c8366d
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/BatchTableSourceDescriptor.scala
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+import org.apache.flink.table.api.{BatchTableEnvironment, Table, TableException}
+import org.apache.flink.table.sources.{BatchTableSource, TableSource, TableSourceFactoryService}
+
+class BatchTableSourceDescriptor(tableEnv: BatchTableEnvironment, connector: ConnectorDescriptor)
+  extends TableSourceDescriptor(connector) {
+
+  /**
+    * Searches for the specified table source, configures it accordingly, and returns it.
+    */
+  def toTableSource: TableSource[_] = {
+    val source = TableSourceFactoryService.findTableSourceFactory(this)
+    source match {
+      case _: BatchTableSource[_] => source
+      case _ => throw new TableException(
+        s"Found table source '${source.getClass.getCanonicalName}' is not applicable " +
+          s"in a batch environment.")
+    }
+  }
+
+  /**
+    * Searches for the specified table source, configures it accordingly, and returns it as a table.
+    */
+  def toTable: Table = {
+    tableEnv.fromTableSource(toTableSource)
+  }
+
+  /**
+    * Searches for the specified table source, configures it accordingly, and registers it as
+    * a table under the given name.
+    *
+    * @param name table name to be registered in the table environment
+    */
+  def register(name: String): Unit = {
+    tableEnv.registerTableSource(name, toTableSource)
+  }
+
+  /**
+    * Specifies the format that defines how to read data from a connector.
+    */
+  def withFormat(format: FormatDescriptor): BatchTableSourceDescriptor = {
+    formatDescriptor = Some(format)
+    this
+  }
+
+  /**
+    * Specifies the resulting table schema.
+    */
+  def withSchema(schema: Schema): BatchTableSourceDescriptor = {
+    schemaDescriptor = Some(schema)
+    this
+  }
+}
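
As a usage sketch (not part of the patch): the descriptor is handed a batch environment and
a connector, enriched with a format, and then resolved through TableSourceFactoryService.
This assumes that a TableSourceFactory matching the filesystem/CSV properties is on the
classpath and that the Types constants come from the Table API:

    import org.apache.flink.api.scala.ExecutionEnvironment
    import org.apache.flink.table.api.{TableEnvironment, Types}
    import org.apache.flink.table.descriptors.{BatchTableSourceDescriptor, Csv, FileSystem}

    val env = ExecutionEnvironment.getExecutionEnvironment
    val tableEnv = TableEnvironment.getTableEnvironment(env)

    val orders = new BatchTableSourceDescriptor(tableEnv, FileSystem().path("/tmp/orders.csv"))
      .withFormat(
        Csv()
          .field("order_id", Types.LONG)
          .field("product", Types.STRING))
      // a result schema could additionally be attached via withSchema(...)

    orders.register("Orders")          // looks up a factory and registers the table source
    val ordersTable = orders.toTable   // alternatively, obtain it directly as a Table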

http://git-wip-us.apache.org/repos/asf/flink/blob/2cb58960/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/ConnectorDescriptor.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/ConnectorDescriptor.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/ConnectorDescriptor.scala
new file mode 100644
index 0000000..f691b4f
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/ConnectorDescriptor.scala
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+import org.apache.flink.table.descriptors.ConnectorDescriptorValidator.{CONNECTOR_TYPE, CONNECTOR_VERSION}
+
+/**
+  * Describes a connector to another system.
+  *
+  * @param tpe string identifier for the connector
+  */
+abstract class ConnectorDescriptor(
+    private val tpe: String,
+    private val version: Int)
+  extends Descriptor {
+
+  override def toString: String = this.getClass.getSimpleName
+
+  /**
+    * Internal method for properties conversion.
+    */
+  final private[flink] def addProperties(properties: DescriptorProperties): Unit = {
+    properties.putString(CONNECTOR_TYPE, tpe)
+    properties.putLong(CONNECTOR_VERSION, version)
+    addConnectorProperties(properties)
+  }
+
+  /**
+    * Internal method for connector properties conversion.
+    */
+  protected def addConnectorProperties(properties: DescriptorProperties): Unit
+
+  /**
+    * Internal method that defines if this connector requires a format descriptor.
+    */
+  private[flink] def needsFormat(): Boolean
+
+}
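
A minimal sketch of a custom connector descriptor built on this class. The "key-value-store"
connector and its property key are invented for the example; note that, since needsFormat()
is private[flink], a concrete descriptor currently has to live in a package under
org.apache.flink (as the FileSystem descriptor below does):

    package org.apache.flink.table.descriptors

    // imaginary connector descriptor for a key-value store
    class KeyValueStore extends ConnectorDescriptor("key-value-store", version = 1) {

      private var endpoint: Option[String] = None

      def endpoint(url: String): KeyValueStore = {
        this.endpoint = Some(url)
        this
      }

      // "connector.type" and "connector.version" are written by the base class;
      // only connector-specific keys are added here
      override protected def addConnectorProperties(properties: DescriptorProperties): Unit = {
        endpoint.foreach(properties.putString("connector.endpoint", _))
      }

      // records are already structured, so no format descriptor is required
      override private[flink] def needsFormat(): Boolean = false
    }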

http://git-wip-us.apache.org/repos/asf/flink/blob/2cb58960/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/ConnectorDescriptorValidator.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/ConnectorDescriptorValidator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/ConnectorDescriptorValidator.scala
new file mode 100644
index 0000000..8ab0f45
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/ConnectorDescriptorValidator.scala
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+import org.apache.flink.table.descriptors.ConnectorDescriptorValidator.{CONNECTOR_TYPE, CONNECTOR_VERSION}
+
+/**
+  * Validator for [[ConnectorDescriptor]].
+  */
+class ConnectorDescriptorValidator extends DescriptorValidator {
+
+  override def validate(properties: DescriptorProperties): Unit = {
+    properties.validateString(CONNECTOR_TYPE, isOptional = false, minLen = 1)
+    properties.validateInt(CONNECTOR_VERSION, isOptional = true, 0, Integer.MAX_VALUE)
+  }
+}
+
+object ConnectorDescriptorValidator {
+
+  val CONNECTOR_TYPE = "connector.type"
+  val CONNECTOR_VERSION = "connector.version"
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2cb58960/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/Csv.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/Csv.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/Csv.scala
new file mode 100644
index 0000000..0493d99
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/Csv.scala
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.table.api.{TableSchema, ValidationException}
+import org.apache.flink.table.descriptors.CsvValidator._
+
+import scala.collection.mutable
+
+/**
+  * Format descriptor for comma-separated values (CSV).
+  */
+class Csv extends FormatDescriptor(FORMAT_TYPE_VALUE, version = 1) {
+
+  private var fieldDelim: Option[String] = None
+  private var lineDelim: Option[String] = None
+  private val formatSchema: mutable.LinkedHashMap[String, String] =
+      mutable.LinkedHashMap[String, String]()
+  private var quoteCharacter: Option[Character] = None
+  private var commentPrefix: Option[String] = None
+  private var isIgnoreFirstLine: Option[Boolean] = None
+  private var lenient: Option[Boolean] = None
+
+  /**
+    * Sets the field delimiter, "," by default.
+    *
+    * @param delim the field delimiter
+    */
+  def fieldDelimiter(delim: String): Csv = {
+    this.fieldDelim = Some(delim)
+    this
+  }
+
+  /**
+    * Sets the line delimiter, "\n" by default.
+    *
+    * @param delim the line delimiter
+    */
+  def lineDelimiter(delim: String): Csv = {
+    this.lineDelim = Some(delim)
+    this
+  }
+
+  /**
+    * Sets the format schema with field names and their types. Required.
+    * The table schema must not contain nested fields.
+    *
+    * This method overwrites existing fields added with [[field()]].
+    *
+    * @param schema the table schema
+    */
+  def schema(schema: TableSchema): Csv = {
+    this.formatSchema.clear()
+    DescriptorProperties.normalizeTableSchema(schema).foreach {
+      case (n, t) => field(n, t)
+    }
+    this
+  }
+
+  /**
+    * Adds a format field with the field name and the type information. Required.
+    * This method can be called multiple times. The call order of this method also defines
+    * the order of the fields in the format.
+    *
+    * @param fieldName the field name
+    * @param fieldType the type information of the field
+    */
+  def field(fieldName: String, fieldType: TypeInformation[_]): Csv = {
+    field(fieldName, DescriptorProperties.normalizeTypeInfo(fieldType))
+    this
+  }
+
+  /**
+    * Adds a format field with the field name and the type string. Required.
+    * This method can be called multiple times. The call order of this method also defines
+    * the order of the fields in the format.
+    *
+    * @param fieldName the field name
+    * @param fieldType the type string of the field
+    */
+  def field(fieldName: String, fieldType: String): Csv = {
+    if (formatSchema.contains(fieldName)) {
+      throw new ValidationException(s"Duplicate field name $fieldName.")
+    }
+    formatSchema += (fieldName -> fieldType)
+    this
+  }
+
+  /**
+    * Sets a quote character for String values, null by default.
+    *
+    * @param quote the quote character
+    */
+  def quoteCharacter(quote: Character): Csv = {
+    this.quoteCharacter = Option(quote)
+    this
+  }
+
+  /**
+    * Sets a prefix to indicate comments, null by default.
+    *
+    * @param prefix the prefix to indicate comments
+    */
+  def commentPrefix(prefix: String): Csv = {
+    this.commentPrefix = Option(prefix)
+    this
+  }
+
+  /**
+    * Ignores the first line. By default, the first line is not skipped.
+    */
+  def ignoreFirstLine(): Csv = {
+    this.isIgnoreFirstLine = Some(true)
+    this
+  }
+
+  /**
+    * Skips records with parse errors instead of failing. By default, an exception is thrown.
+    */
+  def ignoreParseErrors(): Csv = {
+    this.lenient = Some(true)
+    this
+  }
+
+  /**
+    * Internal method for format properties conversion.
+    */
+  override protected def addFormatProperties(properties: DescriptorProperties): Unit = {
+    fieldDelim.foreach(properties.putString(FORMAT_FIELD_DELIMITER, _))
+    lineDelim.foreach(properties.putString(FORMAT_LINE_DELIMITER, _))
+    properties.putTableSchema(FORMAT_FIELDS, formatSchema.toIndexedSeq)
+    quoteCharacter.foreach(properties.putCharacter(FORMAT_QUOTE_CHARACTER, _))
+    commentPrefix.foreach(properties.putString(FORMAT_COMMENT_PREFIX, _))
+    isIgnoreFirstLine.foreach(properties.putBoolean(FORMAT_IGNORE_FIRST_LINE, _))
+    lenient.foreach(properties.putBoolean(FORMAT_IGNORE_PARSE_ERRORS, _))
+  }
+}
+
+/**
+  * Format descriptor for comma-separated values (CSV).
+  */
+object Csv {
+
+  /**
+    * Format descriptor for comma-separated values (CSV).
+    */
+  def apply(): Csv = new Csv()
+
+}
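
Besides the per-field builder, the schema(TableSchema) shortcut copies an existing table
schema into the format. A small sketch; the Types constants are assumed from the Table API,
and the normalized keys listed in the comment follow the CsvValidator constants below:

    import org.apache.flink.table.api.{TableSchema, Types}
    import org.apache.flink.table.descriptors.Csv

    val schema = TableSchema.builder()
      .field("id", Types.LONG)
      .field("name", Types.STRING)
      .build()

    val csv = Csv()
      .schema(schema)        // bulk variant of field(...), overwrites previously added fields
      .fieldDelimiter("|")
      .ignoreFirstLine()

    // internally this contributes (roughly) the following normalized properties:
    //   format.type = csv, format.version = 1
    //   format.fields.0.name = id,   format.fields.0.type = ...
    //   format.fields.1.name = name, format.fields.1.type = ...
    //   format.field-delimiter = |
    //   format.ignore-first-line = true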

http://git-wip-us.apache.org/repos/asf/flink/blob/2cb58960/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/CsvValidator.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/CsvValidator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/CsvValidator.scala
new file mode 100644
index 0000000..d49314e
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/CsvValidator.scala
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+import org.apache.flink.table.descriptors.CsvValidator._
+import org.apache.flink.table.descriptors.FormatDescriptorValidator.FORMAT_TYPE
+
+/**
+  * Validator for [[Csv]].
+  */
+class CsvValidator extends FormatDescriptorValidator {
+
+  override def validate(properties: DescriptorProperties): Unit = {
+    super.validate(properties)
+    properties.validateValue(FORMAT_TYPE, FORMAT_TYPE_VALUE, isOptional = false)
+    properties.validateString(FORMAT_FIELD_DELIMITER, isOptional = true, minLen = 1)
+    properties.validateString(FORMAT_LINE_DELIMITER, isOptional = true, minLen = 1)
+    properties.validateString(FORMAT_QUOTE_CHARACTER, isOptional = true, minLen = 1, maxLen = 1)
+    properties.validateString(FORMAT_COMMENT_PREFIX, isOptional = true, minLen = 1)
+    properties.validateBoolean(FORMAT_IGNORE_FIRST_LINE, isOptional = true)
+    properties.validateBoolean(FORMAT_IGNORE_PARSE_ERRORS, isOptional = true)
+    properties.validateTableSchema(FORMAT_FIELDS, isOptional = false)
+  }
+}
+
+object CsvValidator {
+
+  val FORMAT_TYPE_VALUE = "csv"
+  val FORMAT_FIELD_DELIMITER = "format.field-delimiter"
+  val FORMAT_LINE_DELIMITER = "format.line-delimiter"
+  val FORMAT_QUOTE_CHARACTER = "format.quote-character"
+  val FORMAT_COMMENT_PREFIX = "format.comment-prefix"
+  val FORMAT_IGNORE_FIRST_LINE = "format.ignore-first-line"
+  val FORMAT_IGNORE_PARSE_ERRORS = "format.ignore-parse-errors"
+  val FORMAT_FIELDS = "format.fields"
+
+}
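
The validator can also be exercised against hand-written properties, which is how table
source factories are expected to check their input. A sketch, assuming that SQL-style type
strings such as INT and VARCHAR are accepted by TypeStringUtils and that
FormatDescriptorValidator uses the keys format.type / format.version:

    import org.apache.flink.table.descriptors.{CsvValidator, DescriptorProperties}

    val props = new DescriptorProperties()
    props.putString("format.type", "csv")
    props.putInt("format.version", 1)
    props.putString("format.field-delimiter", ";")
    props.putTableSchema("format.fields", Seq("id" -> "INT", "name" -> "VARCHAR"))

    new CsvValidator().validate(props)  // throws ValidationException on missing or malformed keys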

http://git-wip-us.apache.org/repos/asf/flink/blob/2cb58960/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/Descriptor.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/Descriptor.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/Descriptor.scala
new file mode 100644
index 0000000..ad97ded
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/Descriptor.scala
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+/**
+  * A class that adds a set of string-based, normalized properties for describing a
+  * table source or table sink.
+  */
+abstract class Descriptor {
+
+  /**
+    * Internal method for properties conversion.
+    */
+  private[flink] def addProperties(properties: DescriptorProperties): Unit
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2cb58960/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/DescriptorProperties.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/DescriptorProperties.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/DescriptorProperties.scala
new file mode 100644
index 0000000..43d63b3
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/DescriptorProperties.scala
@@ -0,0 +1,489 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+import java.io.Serializable
+import java.lang.{Boolean => JBoolean, Double => JDouble, Integer => JInt, Long => JLong}
+import java.util
+import java.util.regex.Pattern
+
+import org.apache.commons.codec.binary.Base64
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.table.api.{TableSchema, ValidationException}
+import org.apache.flink.table.descriptors.DescriptorProperties.{NAME, TYPE, normalizeTableSchema}
+import org.apache.flink.table.typeutils.TypeStringUtils
+import org.apache.flink.util.InstantiationUtil
+import org.apache.flink.util.Preconditions.checkNotNull
+
+import scala.collection.mutable
+
+import scala.collection.JavaConverters._
+
+/**
+  * Utility class for having a unified string-based representation of Table API related classes
+  * such as [[TableSchema]], [[TypeInformation]], etc.
+  *
+  * @param normalizeKeys flag that indicates if keys should be normalized (this flag is
+  *                      necessary for backwards compatibility)
+  */
+class DescriptorProperties(normalizeKeys: Boolean = true) {
+
+  private val properties: mutable.Map[String, String] = new mutable.HashMap[String, String]()
+
+  private def put(key: String, value: String): Unit = {
+    if (properties.contains(key)) {
+      throw new IllegalStateException("Property already present.")
+    }
+    if (normalizeKeys) {
+      properties.put(key.toLowerCase, value)
+    } else {
+      properties.put(key, value)
+    }
+  }
+
+  // for testing
+  private[flink] def unsafePut(key: String, value: String): Unit = {
+    properties.put(key, value)
+  }
+
+  // for testing
+  private[flink] def unsafeRemove(key: String): Unit = {
+    properties.remove(key)
+  }
+
+  def putProperties(properties: Map[String, String]): Unit = {
+    properties.foreach { case (k, v) =>
+      put(k, v)
+    }
+  }
+
+  def putProperties(properties: java.util.Map[String, String]): Unit = {
+    properties.asScala.foreach { case (k, v) =>
+      put(k, v)
+    }
+  }
+
+  def putClass(key: String, clazz: Class[_]): Unit = {
+    checkNotNull(key)
+    checkNotNull(clazz)
+    val error = InstantiationUtil.checkForInstantiationError(clazz)
+    if (error != null) {
+      throw new ValidationException(s"Class '${clazz.getName}' is not supported: $error")
+    }
+    put(key, clazz.getName)
+  }
+
+  def putString(key: String, str: String): Unit = {
+    checkNotNull(key)
+    checkNotNull(str)
+    put(key, str)
+  }
+
+  def putBoolean(key: String, b: Boolean): Unit = {
+    checkNotNull(key)
+    put(key, b.toString)
+  }
+
+  def putLong(key: String, l: Long): Unit = {
+    checkNotNull(key)
+    put(key, l.toString)
+  }
+
+  def putInt(key: String, i: Int): Unit = {
+    checkNotNull(key)
+    put(key, i.toString)
+  }
+
+  def putCharacter(key: String, c: Character): Unit = {
+    checkNotNull(key)
+    checkNotNull(c)
+    put(key, c.toString)
+  }
+
+  def putTableSchema(key: String, schema: TableSchema): Unit = {
+    putTableSchema(key, normalizeTableSchema(schema))
+  }
+
+  def putTableSchema(key: String, nameAndType: Seq[(String, String)]): Unit = {
+    putIndexedFixedProperties(
+      key,
+      Seq(NAME, TYPE),
+      nameAndType.map(t => Seq(t._1, t._2))
+    )
+  }
+
+  /**
+    * Adds an indexed sequence of properties (with sub-properties) under a common key.
+    *
+    * For example:
+    *
+    * schema.fields.0.type = INT, schema.fields.0.name = test
+    * schema.fields.1.type = LONG, schema.fields.1.name = test2
+    *
+    * The arity of each propertyValue must match the arity of propertyKeys.
+    */
+  def putIndexedFixedProperties(
+      key: String,
+      propertyKeys: Seq[String],
+      propertyValues: Seq[Seq[String]])
+    : Unit = {
+    checkNotNull(key)
+    checkNotNull(propertyValues)
+    propertyValues.zipWithIndex.foreach { case (values, idx) =>
+      if (values.lengthCompare(propertyKeys.size) != 0) {
+        throw new ValidationException("Values must have same arity as keys.")
+      }
+      values.zipWithIndex.foreach { case (value, keyIdx) =>
+          put(s"$key.$idx.${propertyKeys(keyIdx)}", value)
+      }
+    }
+  }
+
+  /**
+    * Adds an indexed mapping of properties under a common key.
+    *
+    * For example:
+    *
+    * schema.fields.0.type = INT, schema.fields.0.name = test
+    *                             schema.fields.1.name = test2
+    *
+    * The arity of the propertySets can differ.
+    */
+  def putIndexedVariableProperties(
+      key: String,
+      propertySets: Seq[Map[String, String]])
+    : Unit = {
+    checkNotNull(key)
+    checkNotNull(propertySets)
+    propertySets.zipWithIndex.foreach { case (propertySet, idx) =>
+      propertySet.foreach { case (k, v) =>
+        put(s"$key.$idx.$k", v)
+      }
+    }
+  }
+
+  // ----------------------------------------------------------------------------------------------
+
+  def getString(key: String): Option[String] = {
+    properties.get(key)
+  }
+
+  def getCharacter(key: String): Option[Character] = getString(key) match {
+    case Some(c) =>
+      if (c.length != 1) {
+        throw new ValidationException(s"The value of $key must only contain one character.")
+      }
+      Some(c.charAt(0))
+
+    case None => None
+  }
+
+  def getBoolean(key: String): Option[Boolean] = getString(key) match {
+    case Some(b) => Some(JBoolean.parseBoolean(b))
+
+    case None => None
+  }
+
+  def getInt(key: String): Option[Int] = getString(key) match {
+    case Some(l) => Some(JInt.parseInt(l))
+
+    case None => None
+  }
+
+  def getLong(key: String): Option[Long] = getString(key) match {
+    case Some(l) => Some(JLong.parseLong(l))
+
+    case None => None
+  }
+
+  def getDouble(key: String): Option[Double] = getString(key) match {
+    case Some(d) => Some(JDouble.parseDouble(d))
+
+    case None => None
+  }
+
+  def getTableSchema(key: String): Option[TableSchema] = {
+    // filter for number of columns
+    val fieldCount = properties
+      .filterKeys(k => k.startsWith(key) && k.endsWith(s".$NAME"))
+      .size
+
+    if (fieldCount == 0) {
+      return None
+    }
+
+    // validate fields and build schema
+    val schemaBuilder = TableSchema.builder()
+    for (i <- 0 until fieldCount) {
+      val name = s"$key.$i.$NAME"
+      val tpe = s"$key.$i.$TYPE"
+      schemaBuilder.field(
+        properties.getOrElse(name, throw new ValidationException(s"Invalid table schema. " +
+          s"Could not find name for field '$key.$i'.")
+        ),
+        TypeStringUtils.readTypeInfo(
+          properties.getOrElse(tpe, throw new ValidationException(s"Invalid table schema. " +
+          s"Could not find type for field '$key.$i'."))
+        )
+      )
+    }
+    Some(schemaBuilder.build())
+  }
+
+  // ----------------------------------------------------------------------------------------------
+
+  def validateString(
+      key: String,
+      isOptional: Boolean,
+      minLen: Int = 0, // inclusive
+      maxLen: Int = Integer.MAX_VALUE) // inclusive
+    : Unit = {
+
+    if (!properties.contains(key)) {
+      if (!isOptional) {
+        throw new ValidationException(s"Could not find required property '$key'.")
+      }
+    } else {
+      val len = properties(key).length
+      if (len < minLen || len > maxLen) {
+        throw new ValidationException(
+          s"Property '$key' must have a length between $minLen and $maxLen but " +
+            s"was: ${properties(key)}")
+      }
+    }
+  }
+
+  def validateInt(
+      key: String,
+      isOptional: Boolean,
+      min: Int = Int.MinValue, // inclusive
+      max: Int = Int.MaxValue) // inclusive
+    : Unit = {
+
+    if (!properties.contains(key)) {
+      if (!isOptional) {
+        throw new ValidationException(s"Could not find required property '$key'.")
+      }
+    } else {
+      try {
+        val value = Integer.parseInt(properties(key))
+        if (value < min || value > max) {
+          throw new ValidationException(s"Property '$key' must be an integer value between $min " +
+            s"and $max but was: ${properties(key)}")
+        }
+      } catch {
+        case _: NumberFormatException =>
+          throw new ValidationException(
+            s"Property '$key' must be an integer value but was: ${properties(key)}")
+      }
+    }
+  }
+
+  def validateLong(
+      key: String,
+      isOptional: Boolean,
+      min: Long = Long.MinValue, // inclusive
+      max: Long = Long.MaxValue) // inclusive
+    : Unit = {
+
+    if (!properties.contains(key)) {
+      if (!isOptional) {
+        throw new ValidationException(s"Could not find required property '$key'.")
+      }
+    } else {
+      try {
+        val value = JLong.parseLong(properties(key))
+        if (value < min || value > max) {
+          throw new ValidationException(s"Property '$key' must be a long value between $min " +
+            s"and $max but was: ${properties(key)}")
+        }
+      } catch {
+        case _: NumberFormatException =>
+          throw new ValidationException(
+            s"Property '$key' must be a long value but was: ${properties(key)}")
+      }
+    }
+  }
+
+  def validateValue(key: String, value: String, isOptional: Boolean): Unit = {
+    if (!properties.contains(key)) {
+      if (!isOptional) {
+        throw new ValidationException(s"Could not find required property '$key'.")
+      }
+    } else {
+      if (properties(key) != value) {
+        throw new ValidationException(
+          s"Could not find required value '$value' for property '$key'.")
+      }
+    }
+  }
+
+  def validateBoolean(key: String, isOptional: Boolean): Unit = {
+    if (!properties.contains(key)) {
+      if (!isOptional) {
+        throw new ValidationException(s"Could not find required property '$key'.")
+      }
+    } else {
+      val value = properties(key)
+      if (!value.equalsIgnoreCase("true") && !value.equalsIgnoreCase("false")) {
+        throw new ValidationException(
+          s"Property '$key' must be a boolean value (true/false) but was: $value")
+      }
+    }
+  }
+
+  def validateDouble(
+      key: String,
+      isOptional: Boolean,
+      min: Double = Double.MinValue, // inclusive
+      max: Double = Double.MaxValue) // inclusive
+    : Unit = {
+
+    if (!properties.contains(key)) {
+      if (!isOptional) {
+        throw new ValidationException(s"Could not find required property '$key'.")
+      }
+    } else {
+      try {
+        val value = JDouble.parseDouble(properties(key))
+        if (value < min || value > max) {
+          throw new ValidationException(s"Property '$key' must be a double value between $min " +
+            s"and $max but was: ${properties(key)}")
+        }
+      } catch {
+        case _: NumberFormatException =>
+          throw new ValidationException(
+            s"Property '$key' must be an double value but was: ${properties(key)}")
+      }
+    }
+  }
+
+  def validateTableSchema(key: String, isOptional: Boolean): Unit = {
+    // filter for name columns
+    val names = getIndexedProperty(key, NAME)
+    // filter for type columns
+    val types = getIndexedProperty(key, TYPE)
+    if (names.isEmpty && types.isEmpty && !isOptional) {
+      throw new ValidationException(
+        s"Could not find the required schema for property '$key'.")
+    }
+    for (i <- 0 until Math.max(names.size, types.size)) {
+      validateString(s"$key.$i.$NAME", isOptional = false, minLen = 1)
+      validateType(s"$key.$i.$TYPE", isOptional = false)
+    }
+  }
+
+  def validateEnum(
+      key: String,
+      isOptional: Boolean,
+      enumToValidation: Map[String, () => Unit])
+    : Unit = {
+
+    if (!properties.contains(key)) {
+      if (!isOptional) {
+        throw new ValidationException(s"Could not find required property '$key'.")
+      }
+    } else {
+      val value = properties(key)
+      if (!enumToValidation.contains(value)) {
+        throw new ValidationException(s"Unknown value for property '$key'. " +
+          s"Supported values [${enumToValidation.keys.mkString(", ")}] but was: $value")
+      } else {
+        enumToValidation(value).apply() // run validation logic
+      }
+    }
+  }
+
+  def validateType(key: String, isOptional: Boolean): Unit = {
+    if (!properties.contains(key)) {
+      if (!isOptional) {
+        throw new ValidationException(s"Could not find required property '$key'.")
+      }
+    } else {
+      TypeStringUtils.readTypeInfo(properties(key)) // throws validation exceptions
+    }
+  }
+
+  def validatePrefixExclusion(prefix: String): Unit = {
+    val invalidField = properties.find(_._1.startsWith(prefix))
+    if (invalidField.isDefined) {
+      throw new ValidationException(
+        s"Property '${invalidField.get._1}' is not allowed in this context.")
+    }
+  }
+
+  def validateExclusion(key: String): Unit = {
+    if (properties.contains(key)) {
+      throw new ValidationException(s"Property '$key' is not allowed in this context.")
+    }
+  }
+
+  // ----------------------------------------------------------------------------------------------
+
+  def getIndexedProperty(key: String, property: String): Map[String, String] = {
+    val escapedKey = Pattern.quote(key)
+    properties.filterKeys(k => k.matches(s"$escapedKey\\.\\d+\\.$property")).toMap
+  }
+
+  def contains(str: String): Boolean = {
+    properties.exists(e => e._1.contains(str))
+  }
+
+  def hasPrefix(prefix: String): Boolean = {
+    properties.exists(e => e._1.startsWith(prefix))
+  }
+
+  def asMap: Map[String, String] = {
+    properties.toMap
+  }
+}
+
+object DescriptorProperties {
+
+  val TYPE = "type"
+  val NAME = "name"
+
+  // the string representation should be equal to SqlTypeName
+  def normalizeTypeInfo(typeInfo: TypeInformation[_]): String = {
+    checkNotNull(typeInfo)
+    TypeStringUtils.writeTypeInfo(typeInfo)
+  }
+
+  def normalizeTableSchema(schema: TableSchema): Seq[(String, String)] = {
+    schema.getColumnNames.zip(schema.getTypes).map { case (n, t) =>
+      (n, normalizeTypeInfo(t))
+    }
+  }
+
+  def serialize(obj: Serializable): String = {
+    // test public accessibility
+    val error = InstantiationUtil.checkForInstantiationError(obj.getClass)
+    if (error != null) {
+      throw new ValidationException(s"Class '${obj.getClass.getName}' is not supported: $error")
+    }
+    try {
+      val byteArray = InstantiationUtil.serializeObject(obj)
+      Base64.encodeBase64URLSafeString(byteArray)
+    } catch {
+      case e: Exception =>
+        throw new ValidationException(
+          s"Unable to serialize class '${obj.getClass.getCanonicalName}'.", e)
+    }
+  }
+}
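
To make the indexed-property layout described above concrete, a short sketch of putting and
reading back a schema; the type strings are assumptions about what TypeStringUtils accepts:

    import org.apache.flink.table.descriptors.DescriptorProperties

    val props = new DescriptorProperties()

    // every value row must have the same arity as the key list
    props.putIndexedFixedProperties(
      "schema.fields",
      Seq("name", "type"),
      Seq(
        Seq("id", "INT"),
        Seq("name", "VARCHAR")))
    // produces:
    //   schema.fields.0.name = id,   schema.fields.0.type = INT
    //   schema.fields.1.name = name, schema.fields.1.type = VARCHAR

    // the same keys can be validated and read back as a TableSchema
    props.validateTableSchema("schema.fields", isOptional = false)
    val schema = props.getTableSchema("schema.fields")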

http://git-wip-us.apache.org/repos/asf/flink/blob/2cb58960/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/DescriptorValidator.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/DescriptorValidator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/DescriptorValidator.scala
new file mode 100644
index 0000000..007a406
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/DescriptorValidator.scala
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+/**
+  * Validator for a descriptor. We put the validation methods and utilities in separate classes
+  * to keep the descriptor interfaces clean.
+  */
+trait DescriptorValidator {
+
+  /**
+    * Performs basic validation such as completeness tests.
+    */
+  def validate(properties: DescriptorProperties): Unit
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2cb58960/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/FileSystem.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/FileSystem.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/FileSystem.scala
new file mode 100644
index 0000000..b1d900f
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/FileSystem.scala
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+import org.apache.flink.table.descriptors.FileSystemValidator.{CONNECTOR_PATH, CONNECTOR_TYPE_VALUE}
+
+/**
+  * Connector descriptor for a file system.
+  */
+class FileSystem extends ConnectorDescriptor(CONNECTOR_TYPE_VALUE, version = 1) {
+
+  private var path: Option[String] = None
+
+  /**
+    * Sets the path to a file or directory in a file system.
+    *
+    * @param path the path to a file or directory
+    */
+  def path(path: String): FileSystem = {
+    this.path = Some(path)
+    this
+  }
+
+  /**
+    * Internal method for properties conversion.
+    */
+  override protected def addConnectorProperties(properties: DescriptorProperties): Unit = {
+    path.foreach(properties.putString(CONNECTOR_PATH, _))
+  }
+
+  override private[flink] def needsFormat() = true
+}
+
+/**
+  * Connector descriptor for a file system.
+  */
+object FileSystem {
+
+  /**
+    * Connector descriptor for a file system.
+    */
+  def apply(): FileSystem = new FileSystem()
+  
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2cb58960/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/FileSystemValidator.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/FileSystemValidator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/FileSystemValidator.scala
new file mode 100644
index 0000000..b065b5f
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/FileSystemValidator.scala
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+import org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_TYPE
+import org.apache.flink.table.descriptors.FileSystemValidator.{CONNECTOR_PATH, CONNECTOR_TYPE_VALUE}
+
+/**
+  * Validator for [[FileSystem]].
+  */
+class FileSystemValidator extends ConnectorDescriptorValidator {
+
+  override def validate(properties: DescriptorProperties): Unit = {
+    super.validate(properties)
+    properties.validateValue(CONNECTOR_TYPE, CONNECTOR_TYPE_VALUE, isOptional = false)
+    properties.validateString(CONNECTOR_PATH, isOptional = false, minLen = 1)
+  }
+}
+
+object FileSystemValidator {
+
+  val CONNECTOR_TYPE_VALUE = "filesystem"
+  val CONNECTOR_PATH = "connector.path"
+
+}
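
Put together with the generic connector validation, the expected property set for a file
system connector looks as follows; a sketch, with the key names taken from the constants above:

    import org.apache.flink.table.descriptors.{DescriptorProperties, FileSystemValidator}

    val props = new DescriptorProperties()
    props.putString("connector.type", "filesystem")  // CONNECTOR_TYPE / CONNECTOR_TYPE_VALUE
    props.putInt("connector.version", 1)             // optional, validated if present
    props.putString("connector.path", "/tmp/in.csv") // CONNECTOR_PATH, required and non-empty

    new FileSystemValidator().validate(props)        // a missing path would fail here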

http://git-wip-us.apache.org/repos/asf/flink/blob/2cb58960/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/FormatDescriptor.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/FormatDescriptor.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/FormatDescriptor.scala
new file mode 100644
index 0000000..86f6229
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/FormatDescriptor.scala
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+import org.apache.flink.table.descriptors.FormatDescriptorValidator.{FORMAT_TYPE, FORMAT_VERSION}
+
+/**
+  * Describes the format of data.
+  *
+  * @param tpe string identifier for the format
+  */
+abstract class FormatDescriptor(
+    private val tpe: String,
+    private val version: Int)
+  extends Descriptor {
+
+  override def toString: String = this.getClass.getSimpleName
+
+  /**
+    * Internal method for properties conversion.
+    */
+  final private[flink] def addProperties(properties: DescriptorProperties): Unit = {
+    properties.putString(FORMAT_TYPE, tpe)
+    properties.putInt(FORMAT_VERSION, version)
+    addFormatProperties(properties)
+  }
+
+  /**
+    * Internal method for format properties conversion.
+    */
+  protected def addFormatProperties(properties: DescriptorProperties): Unit
+
+}
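
Analogously to the connector side, a format descriptor only needs to contribute its
format-specific keys. A minimal sketch with an invented "fixed-width" format; the type
string and property key are made up for illustration:

    import org.apache.flink.table.descriptors.{DescriptorProperties, FormatDescriptor}

    class FixedWidth extends FormatDescriptor("fixed-width", version = 1) {

      private var widths: Option[String] = None

      def fieldWidths(w: Int*): FixedWidth = {
        this.widths = Some(w.mkString(","))
        this
      }

      // "format.type" and "format.version" are written by the base class;
      // only format-specific keys are added here
      override protected def addFormatProperties(properties: DescriptorProperties): Unit = {
        widths.foreach(properties.putString("format.field-widths", _))
      }
    }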