You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2018/01/31 11:47:32 UTC
[1/3] flink git commit: [FLINK-8240] [table] Create unified
interfaces to configure and instantiate TableSources
Repository: flink
Updated Branches:
refs/heads/master a5476cdcd -> 2cb58960e
http://git-wip-us.apache.org/repos/asf/flink/blob/2cb58960/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/TableSourceTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/TableSourceTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/TableSourceTest.scala
index 42f0769..050f1a1 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/TableSourceTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/TableSourceTest.scala
@@ -380,6 +380,44 @@ class TableSourceTest extends TableTestBase {
Assert.assertEquals(source1, source2)
}
+// TODO enable this test once we expose the feature through the table environment
+// @Test
+// def testCsvTableSourceDescriptor(): Unit = {
+// val util = streamTestUtil()
+// val source1 = util.tableEnv
+// .from(
+// FileSystem()
+// .path("/path/to/csv"))
+// .withFormat(
+// Csv()
+// .field("myfield", Types.STRING)
+// .field("myfield2", Types.INT)
+// .quoteCharacter(';')
+// .fieldDelimiter("#")
+// .lineDelimiter("\r\n")
+// .commentPrefix("%%")
+// .ignoreFirstLine()
+// .ignoreParseErrors())
+// .withSchema(
+// Schema()
+// .field("myfield", Types.STRING)
+// .field("myfield2", Types.INT))
+// .toTableSource
+//
+// val source2 = new CsvTableSource(
+// "/path/to/csv",
+// Array("myfield", "myfield2"),
+// Array(Types.STRING, Types.INT),
+// "#",
+// "\r\n",
+// ';',
+// true,
+// "%%",
+// true)
+//
+// Assert.assertEquals(source1, source2)
+// }
+
@Test
def testTimeLiteralExpressionPushdown(): Unit = {
val (tableSource, tableName) = filterableTableSourceTimeTypes
http://git-wip-us.apache.org/repos/asf/flink/blob/2cb58960/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/catalog/ExternalCatalogSchemaTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/catalog/ExternalCatalogSchemaTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/catalog/ExternalCatalogSchemaTest.scala
index f9920e7..e6d2458 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/catalog/ExternalCatalogSchemaTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/catalog/ExternalCatalogSchemaTest.scala
@@ -31,6 +31,7 @@ import org.apache.flink.table.calcite.{CalciteConfig, FlinkTypeFactory, FlinkTyp
import org.apache.flink.table.plan.schema.TableSourceTable
import org.apache.flink.table.runtime.utils.CommonTestData
import org.apache.flink.table.sources.CsvTableSource
+import org.apache.flink.table.utils.MockTableEnvironment
import org.junit.Assert._
import org.junit.{Before, Test}
@@ -48,7 +49,8 @@ class ExternalCatalogSchemaTest {
def setUp(): Unit = {
val rootSchemaPlus: SchemaPlus = CalciteSchema.createRootSchema(true, false).plus()
val catalog = CommonTestData.getInMemoryTestCatalog
- ExternalCatalogSchema.registerCatalog(rootSchemaPlus, schemaName, catalog)
+ ExternalCatalogSchema.registerCatalog(
+ new MockTableEnvironment, rootSchemaPlus, schemaName, catalog)
externalCatalogSchema = rootSchemaPlus.getSubSchema("schemaName")
val typeFactory = new FlinkTypeFactory(new FlinkTypeSystem())
val prop = new Properties()
http://git-wip-us.apache.org/repos/asf/flink/blob/2cb58960/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/catalog/ExternalTableSourceUtilTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/catalog/ExternalTableSourceUtilTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/catalog/ExternalTableSourceUtilTest.scala
index 0744de9..cdacce1 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/catalog/ExternalTableSourceUtilTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/catalog/ExternalTableSourceUtilTest.scala
@@ -24,8 +24,9 @@ import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
import org.apache.flink.streaming.api.datastream.DataStream
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.table.api.{TableSchema, Types}
-import org.apache.flink.table.plan.schema.{StreamTableSourceTable}
+import org.apache.flink.table.plan.schema.StreamTableSourceTable
import org.apache.flink.table.sources.StreamTableSource
+import org.apache.flink.table.utils.MockTableEnvironment
import org.apache.flink.types.Row
import org.junit.Assert.assertTrue
import org.junit.{Before, Test}
@@ -41,7 +42,8 @@ class ExternalTableSourceUtilTest {
def testExternalStreamTable() = {
val schema = new TableSchema(Array("foo"), Array(BasicTypeInfo.INT_TYPE_INFO))
val table = ExternalCatalogTable("mock", schema)
- val tableSource = ExternalTableSourceUtil.fromExternalCatalogTable(table)
+ val tableSource = ExternalTableSourceUtil.fromExternalCatalogTable(
+ new MockTableEnvironment, table)
assertTrue(tableSource.isInstanceOf[StreamTableSourceTable[_]])
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/2cb58960/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/CsvTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/CsvTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/CsvTest.scala
new file mode 100644
index 0000000..15cf13b
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/CsvTest.scala
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.TypeExtractor
+import org.apache.flink.table.api.{TableSchema, Types, ValidationException}
+import org.junit.Test
+
+/**
+  * Tests the properties written by the [[Csv]] format descriptor and their
+  * validation by [[CsvValidator]].
+  */
+class CsvTest extends DescriptorTestBase {
+
+  /**
+    * Validator used by the invalid-property tests of this class.
+    */
+  override def validator(): DescriptorValidator = new CsvValidator()
+
+  /**
+    * Valid CSV descriptor with four fields (string-typed, timestamp, generic class
+    * and nested row) and a custom line delimiter.
+    */
+  override def descriptor(): Descriptor = {
+    val classType = TypeExtractor.createTypeInfo(classOf[Class[_]])
+    val rowType = Types.ROW(
+      Array[String]("test", "row"),
+      Array[TypeInformation[_]](Types.INT, Types.STRING))
+
+    Csv()
+      .field("field1", "STRING")
+      .field("field2", Types.SQL_TIMESTAMP)
+      .field("field3", classType)
+      .field("field4", rowType)
+      .lineDelimiter("^")
+  }
+
+  /**
+    * The valid descriptor must be translated into exactly these properties.
+    */
+  @Test
+  def testCsv(): Unit = {
+    val expectedProperties = Seq(
+      "format.type" -> "csv",
+      "format.version" -> "1",
+      "format.fields.0.name" -> "field1",
+      "format.fields.0.type" -> "STRING",
+      "format.fields.1.name" -> "field2",
+      "format.fields.1.type" -> "TIMESTAMP",
+      "format.fields.2.name" -> "field3",
+      "format.fields.2.type" -> "ANY(java.lang.Class)",
+      "format.fields.3.name" -> "field4",
+      "format.fields.3.type" -> "ROW(test INT, row VARCHAR)",
+      "format.line-delimiter" -> "^")
+
+    verifyProperties(descriptor(), expectedProperties)
+  }
+
+  /**
+    * Fields may also be declared through a complete [[TableSchema]].
+    */
+  @Test
+  def testCsvTableSchema(): Unit = {
+    val tableSchema = new TableSchema(
+      Array[String]("test", "row"),
+      Array[TypeInformation[_]](Types.INT, Types.STRING))
+    val schemaBasedCsv = Csv()
+      .schema(tableSchema)
+      .quoteCharacter('#')
+      .ignoreFirstLine()
+
+    val expectedProperties = Seq(
+      "format.type" -> "csv",
+      "format.version" -> "1",
+      "format.fields.0.name" -> "test",
+      "format.fields.0.type" -> "INT",
+      "format.fields.1.name" -> "row",
+      "format.fields.1.type" -> "VARCHAR",
+      "format.quote-character" -> "#",
+      "format.ignore-first-line" -> "true")
+
+    verifyProperties(schemaBasedCsv, expectedProperties)
+  }
+
+  /** An unknown type string for a field must be rejected. */
+  @Test(expected = classOf[ValidationException])
+  def testInvalidType(): Unit =
+    verifyInvalidProperty("format.fields.0.type", "WHATEVER")
+
+  /** A name for field index 10 on top of the valid descriptor must be rejected. */
+  @Test(expected = classOf[ValidationException])
+  def testInvalidField(): Unit =
+    verifyInvalidProperty("format.fields.10.name", "WHATEVER")
+
+  /** A quote character consisting of more than one character must be rejected. */
+  @Test(expected = classOf[ValidationException])
+  def testInvalidQuoteCharacter(): Unit =
+    verifyInvalidProperty("format.quote-character", "qq")
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/2cb58960/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/DescriptorTestBase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/DescriptorTestBase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/DescriptorTestBase.scala
new file mode 100644
index 0000000..3a59c9b
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/DescriptorTestBase.scala
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+import org.junit.Assert.assertEquals
+
+/**
+  * Base class for descriptor tests. Subclasses supply a valid descriptor and the
+  * matching validator; the helpers below compare generated properties or run the
+  * validator against a deliberately modified property set.
+  */
+abstract class DescriptorTestBase {
+
+  /**
+    * Returns a valid descriptor.
+    */
+  def descriptor(): Descriptor
+
+  /**
+    * Returns a validator that can validate this descriptor.
+    */
+  def validator(): DescriptorValidator
+
+  /**
+    * Asserts that the given descriptor writes exactly the expected key/value pairs.
+    *
+    * @param descriptor descriptor whose properties are checked
+    * @param expected   expected properties as key/value pairs
+    */
+  def verifyProperties(descriptor: Descriptor, expected: Seq[(String, String)]): Unit = {
+    val actualProperties = new DescriptorProperties
+    descriptor.addProperties(actualProperties)
+    assertEquals(expected.toMap, actualProperties.asMap)
+  }
+
+  /**
+    * Sets `property` to `invalidValue` on top of the valid descriptor's properties
+    * and runs the validator. No assertion is made here; callers declare the
+    * exception they expect.
+    *
+    * @param property     key to set
+    * @param invalidValue value stored under that key
+    */
+  def verifyInvalidProperty(property: String, invalidValue: String): Unit = {
+    val modified = propertiesOfValidDescriptor()
+    modified.unsafePut(property, invalidValue)
+    validator().validate(modified)
+  }
+
+  /**
+    * Removes `removeProperty` from the valid descriptor's properties and runs the
+    * validator. No assertion is made here; callers declare the exception they
+    * expect.
+    *
+    * @param removeProperty key to remove
+    */
+  def verifyMissingProperty(removeProperty: String): Unit = {
+    val modified = propertiesOfValidDescriptor()
+    modified.unsafeRemove(removeProperty)
+    validator().validate(modified)
+  }
+
+  /** Writes the properties of the valid descriptor into a fresh property set. */
+  private def propertiesOfValidDescriptor(): DescriptorProperties = {
+    val fresh = new DescriptorProperties
+    descriptor().addProperties(fresh)
+    fresh
+  }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/2cb58960/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/FileSystemTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/FileSystemTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/FileSystemTest.scala
new file mode 100644
index 0000000..3452e8d
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/FileSystemTest.scala
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+import org.apache.flink.table.api.ValidationException
+import org.junit.Test
+
+/**
+  * Tests the properties written by the [[FileSystem]] connector descriptor and
+  * their validation by [[FileSystemValidator]].
+  */
+class FileSystemTest extends DescriptorTestBase {
+
+  /**
+    * Validator used by the invalid/missing-property tests of this class.
+    */
+  override def validator(): DescriptorValidator = new FileSystemValidator()
+
+  /**
+    * Valid file system descriptor that points to "/myfile".
+    */
+  override def descriptor(): Descriptor = FileSystem().path("/myfile")
+
+  /**
+    * The valid descriptor must be translated into exactly these properties.
+    */
+  @Test
+  def testFileSystem(): Unit = {
+    val expectedProperties = Seq(
+      "connector.type" -> "filesystem",
+      "connector.version" -> "1",
+      "connector.path" -> "/myfile")
+
+    verifyProperties(descriptor(), expectedProperties)
+  }
+
+  /** An empty path must be rejected. */
+  @Test(expected = classOf[ValidationException])
+  def testInvalidPath(): Unit =
+    verifyInvalidProperty("connector.path", "")
+
+  /** A missing path must be rejected. */
+  @Test(expected = classOf[ValidationException])
+  def testMissingPath(): Unit =
+    verifyMissingProperty("connector.path")
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/2cb58960/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/JsonTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/JsonTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/JsonTest.scala
new file mode 100644
index 0000000..756ca23
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/JsonTest.scala
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+import org.apache.flink.table.api.ValidationException
+import org.junit.Test
+
+/**
+  * Tests the properties written by the [[Json]] format descriptor and their
+  * validation by [[JsonValidator]].
+  */
+class JsonTest extends DescriptorTestBase {
+
+  /**
+    * Validator used by the invalid/missing-property tests of this class.
+    */
+  override def validator(): DescriptorValidator = new JsonValidator()
+
+  /**
+    * Valid JSON descriptor; only the schema string is set.
+    */
+  override def descriptor(): Descriptor = Json().schema("test")
+
+  /**
+    * The schema string must be passed through unchanged, next to the
+    * fail-on-missing-field flag.
+    */
+  @Test
+  def testJson(): Unit = {
+    val personSchema =
+      """
+        |{
+        | "title": "Person",
+        | "type": "object",
+        | "properties": {
+        | "firstName": {
+        | "type": "string"
+        | },
+        | "lastName": {
+        | "type": "string"
+        | },
+        | "age": {
+        | "description": "Age in years",
+        | "type": "integer",
+        | "minimum": 0
+        | }
+        | },
+        | "required": ["firstName", "lastName"]
+        |}
+        |""".stripMargin
+
+    val jsonDescriptor = Json()
+      .schema(personSchema)
+      .failOnMissingField(true)
+
+    val expectedProperties = Seq(
+      "format.type" -> "json",
+      "format.version" -> "1",
+      "format.schema-string" -> personSchema,
+      "format.fail-on-missing-field" -> "true")
+
+    verifyProperties(jsonDescriptor, expectedProperties)
+  }
+
+  /** A non-boolean value for the fail-on-missing-field flag must be rejected. */
+  @Test(expected = classOf[ValidationException])
+  def testInvalidMissingField(): Unit =
+    verifyInvalidProperty("format.fail-on-missing-field", "DDD")
+
+  /** A missing schema string must be rejected. */
+  @Test(expected = classOf[ValidationException])
+  def testMissingSchema(): Unit =
+    verifyMissingProperty("format.schema-string")
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/2cb58960/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/MetadataTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/MetadataTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/MetadataTest.scala
new file mode 100644
index 0000000..a1854ce
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/MetadataTest.scala
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+import org.apache.flink.table.api.ValidationException
+import org.junit.Test
+
+/**
+  * Tests the properties written by the [[Metadata]] descriptor and their
+  * validation by [[MetadataValidator]].
+  */
+class MetadataTest extends DescriptorTestBase {
+
+  /**
+    * Validator used by the invalid-property tests of this class.
+    */
+  override def validator(): DescriptorValidator = new MetadataValidator()
+
+  /**
+    * Valid metadata descriptor with a comment, a creation time and a last access time.
+    */
+  override def descriptor(): Descriptor = {
+    Metadata()
+      .comment("Some additional comment")
+      .creationTime(123L)
+      .lastAccessTime(12020202L)
+  }
+
+  /**
+    * The valid descriptor must be translated into exactly these properties.
+    */
+  @Test
+  def testMetadata(): Unit = {
+    val expectedProperties = Seq(
+      "metadata.comment" -> "Some additional comment",
+      "metadata.creation-time" -> "123",
+      "metadata.last-access-time" -> "12020202")
+
+    verifyProperties(descriptor(), expectedProperties)
+  }
+
+  /** A non-numeric creation time must be rejected. */
+  @Test(expected = classOf[ValidationException])
+  def testInvalidCreationTime(): Unit =
+    verifyInvalidProperty("metadata.creation-time", "dfghj")
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/2cb58960/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/RowtimeTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/RowtimeTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/RowtimeTest.scala
new file mode 100644
index 0000000..80050fc
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/RowtimeTest.scala
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+import org.apache.flink.streaming.api.watermark.Watermark
+import org.apache.flink.table.api.ValidationException
+import org.apache.flink.table.descriptors.RowtimeTest.CustomAssigner
+import org.apache.flink.table.sources.wmstrategies.PunctuatedWatermarkAssigner
+import org.apache.flink.types.Row
+import org.junit.Test
+
+/**
+  * Tests the properties written by the [[Rowtime]] descriptor and their
+  * validation by [[RowtimeValidator]].
+  */
+class RowtimeTest extends DescriptorTestBase {
+
+  /**
+    * Validator for rowtime properties stored under the "rowtime.0." prefix.
+    */
+  override def validator(): DescriptorValidator = new RowtimeValidator("rowtime.0.")
+
+  /**
+    * Valid rowtime descriptor: timestamps from the source, watermarks from a
+    * custom strategy instance.
+    */
+  override def descriptor(): Descriptor = {
+    val customStrategy = new CustomAssigner()
+    Rowtime()
+      .timestampsFromSource()
+      .watermarksFromStrategy(customStrategy)
+  }
+
+  /**
+    * Timestamps taken from a field combined with periodic bounded watermarks.
+    */
+  @Test
+  def testRowtime(): Unit = {
+    val fieldBasedRowtime = Rowtime()
+      .timestampsFromField("otherField")
+      .watermarksPeriodicBounding(1000L)
+
+    val expectedProperties = Seq(
+      "rowtime.0.version" -> "1",
+      "rowtime.0.timestamps.type" -> "from-field",
+      "rowtime.0.timestamps.from" -> "otherField",
+      "rowtime.0.watermarks.type" -> "periodic-bounding",
+      "rowtime.0.watermarks.delay" -> "1000")
+
+    verifyProperties(fieldBasedRowtime, expectedProperties)
+  }
+
+  /** An unknown watermark type must be rejected. */
+  @Test(expected = classOf[ValidationException])
+  def testInvalidWatermarkType(): Unit =
+    verifyInvalidProperty("rowtime.0.watermarks.type", "xxx")
+
+  /** Removing the watermark class of the custom strategy must be rejected. */
+  @Test(expected = classOf[ValidationException])
+  def testMissingWatermarkClass(): Unit =
+    verifyMissingProperty("rowtime.0.watermarks.class")
+}
+
+/**
+  * Test fixtures for [[RowtimeTest]].
+  */
+object RowtimeTest {
+
+  /**
+    * Watermark strategy that only serves as a custom strategy instance for the
+    * descriptor; asking it for a watermark always fails.
+    */
+  class CustomAssigner extends PunctuatedWatermarkAssigner {
+    override def getWatermark(row: Row, timestamp: Long): Watermark = {
+      throw new UnsupportedOperationException()
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/2cb58960/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/SchemaTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/SchemaTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/SchemaTest.scala
new file mode 100644
index 0000000..f663a96
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/SchemaTest.scala
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+import org.apache.flink.table.api.{Types, ValidationException}
+import org.junit.Test
+
+/**
+  * Tests the properties written by the [[Schema]] descriptor and their
+  * validation by [[SchemaValidator]].
+  */
+class SchemaTest extends DescriptorTestBase {
+
+  /**
+    * A schema with a plain field, a renamed field, a proctime field and a rowtime
+    * field must be translated into exactly these properties.
+    */
+  @Test
+  def testSchema(): Unit = {
+    val desc = Schema()
+      .field("myField", Types.BOOLEAN)
+      .field("otherField", "VARCHAR").from("csvField")
+      .field("p", Types.SQL_TIMESTAMP).proctime()
+      .field("r", Types.SQL_TIMESTAMP).rowtime(
+        Rowtime().timestampsFromSource().watermarksFromSource())
+    val expected = Seq(
+      "schema.version" -> "1",
+      "schema.0.name" -> "myField",
+      "schema.0.type" -> "BOOLEAN",
+      "schema.1.name" -> "otherField",
+      "schema.1.type" -> "VARCHAR",
+      "schema.1.from" -> "csvField",
+      "schema.2.name" -> "p",
+      "schema.2.type" -> "TIMESTAMP",
+      "schema.2.proctime" -> "true",
+      "schema.3.name" -> "r",
+      "schema.3.type" -> "TIMESTAMP",
+      "schema.3.rowtime.0.version" -> "1",
+      "schema.3.rowtime.0.watermarks.type" -> "from-source",
+      "schema.3.rowtime.0.timestamps.type" -> "from-source"
+    )
+    verifyProperties(desc, expected)
+  }
+
+  /** An unknown type string for a field must be rejected. */
+  @Test(expected = classOf[ValidationException])
+  def testInvalidType(): Unit = {
+    verifyInvalidProperty("schema.1.type", "dfghj")
+  }
+
+  /**
+    * Field 2 of the valid descriptor is a proctime field; additionally giving it a
+    * complete rowtime definition must be rejected.
+    */
+  @Test(expected = classOf[ValidationException])
+  def testBothRowtimeAndProctime(): Unit = {
+    // All rowtime properties go into ONE property set that is validated once.
+    // Separate verifyInvalidProperty calls would each start from a fresh property
+    // set, and every call after the first throwing one would be unreachable.
+    val properties = new DescriptorProperties
+    descriptor().addProperties(properties)
+    properties.unsafePut("schema.2.rowtime.0.version", "1")
+    properties.unsafePut("schema.2.rowtime.0.watermarks.type", "from-source")
+    properties.unsafePut("schema.2.rowtime.0.timestamps.type", "from-source")
+    validator().validate(properties)
+  }
+
+  /**
+    * Valid schema descriptor: field "p" is a proctime attribute, field "r" is a
+    * plain timestamp without a time attribute.
+    */
+  override def descriptor(): Descriptor = {
+    Schema()
+      .field("myField", Types.BOOLEAN)
+      .field("otherField", "VARCHAR").from("csvField")
+      .field("p", Types.SQL_TIMESTAMP).proctime()
+      .field("r", Types.SQL_TIMESTAMP)
+  }
+
+  /**
+    * Validator configured for a stream environment.
+    */
+  override def validator(): DescriptorValidator = {
+    new SchemaValidator(isStreamEnvironment = true)
+  }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/2cb58960/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/StatisticsTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/StatisticsTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/StatisticsTest.scala
new file mode 100644
index 0000000..3b248b4
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/StatisticsTest.scala
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+import _root_.java.util
+
+import org.apache.flink.table.api.ValidationException
+import org.apache.flink.table.plan.stats.{ColumnStats, TableStats}
+import org.junit.Test
+
+/**
+  * Tests the properties written by the [[Statistics]] descriptor and their
+  * validation by [[StatisticsValidator]].
+  */
+class StatisticsTest extends DescriptorTestBase {
+
+  /**
+    * Validator used by the invalid/missing-property tests of this class.
+    */
+  override def validator(): DescriptorValidator = new StatisticsValidator()
+
+  /**
+    * Valid statistics descriptor: a row count, full column statistics for "a"
+    * (whose null count is set again afterwards) and an average length for "b".
+    */
+  override def descriptor(): Descriptor = {
+    Statistics()
+      .rowCount(1000L)
+      .columnStats("a", ColumnStats(1L, 2L, 3.0, 4, 5, 6))
+      .columnAvgLength("b", 42.0)
+      .columnNullCount("a", 300)
+  }
+
+  /**
+    * The valid descriptor must be translated into exactly these properties.
+    */
+  @Test
+  def testStatistics(): Unit = {
+    val expectedProperties = Seq(
+      "statistics.version" -> "1",
+      "statistics.row-count" -> "1000",
+      "statistics.columns.0.name" -> "a",
+      "statistics.columns.0.distinct-count" -> "1",
+      "statistics.columns.0.null-count" -> "300",
+      "statistics.columns.0.avg-length" -> "3.0",
+      "statistics.columns.0.max-length" -> "4",
+      "statistics.columns.0.max-value" -> "5",
+      "statistics.columns.0.min-value" -> "6",
+      "statistics.columns.1.name" -> "b",
+      "statistics.columns.1.avg-length" -> "42.0")
+
+    verifyProperties(descriptor(), expectedProperties)
+  }
+
+  /**
+    * Statistics taken from a [[TableStats]] instance; the expected properties
+    * contain no entries for the column statistics that are passed as null.
+    */
+  @Test
+  def testStatisticsTableStats(): Unit = {
+    val statsByColumn = new util.HashMap[String, ColumnStats]()
+    statsByColumn.put("a", ColumnStats(null, 2L, 3.0, null, 5, 6))
+    val tableStatsDescriptor = Statistics().tableStats(TableStats(32L, statsByColumn))
+
+    val expectedProperties = Seq(
+      "statistics.version" -> "1",
+      "statistics.row-count" -> "32",
+      "statistics.columns.0.name" -> "a",
+      "statistics.columns.0.null-count" -> "2",
+      "statistics.columns.0.avg-length" -> "3.0",
+      "statistics.columns.0.max-value" -> "5",
+      "statistics.columns.0.min-value" -> "6")
+
+    verifyProperties(tableStatsDescriptor, expectedProperties)
+  }
+
+  /** A non-numeric row count must be rejected. */
+  @Test(expected = classOf[ValidationException])
+  def testInvalidRowCount(): Unit =
+    verifyInvalidProperty("statistics.row-count", "abx")
+
+  /** Column statistics without a column name must be rejected. */
+  @Test(expected = classOf[ValidationException])
+  def testMissingName(): Unit =
+    verifyMissingProperty("statistics.columns.0.name")
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/2cb58960/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/StreamTableSourceDescriptorTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/StreamTableSourceDescriptorTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/StreamTableSourceDescriptorTest.scala
new file mode 100644
index 0000000..2c9a89c
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/StreamTableSourceDescriptorTest.scala
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+import org.apache.flink.table.utils.TableTestBase
+
+class StreamTableSourceDescriptorTest extends TableTestBase {
+
+// TODO enable this test once we expose the feature through the table environment
+// NOTE(review): points to confirm before enabling the test below:
+//  - `verifyProperties` is declared on DescriptorTestBase in this commit, while this
+//    class extends TableTestBase; verify that the helper is reachable from here.
+//  - `expected` lists no "*.version" entries ("connector.version", "format.version",
+//    "schema.version", "schema.3.rowtime.0.version"), although the per-descriptor
+//    tests of this commit expect them; presumably they have to be added.
+//  - this file imports only TableTestBase; imports for `Test`, `Types` and the
+//    descriptor classes used below would presumably be needed.
+// @Test
+// def testStreamTableSourceDescriptor(): Unit = {
+// val util = streamTestUtil()
+// val desc = util.tableEnv
+// .from(
+// FileSystem()
+// .path("/path/to/csv"))
+// .withFormat(
+// Csv()
+// .field("myfield", Types.STRING)
+// .field("myfield2", Types.INT)
+// .quoteCharacter(';')
+// .fieldDelimiter("#")
+// .lineDelimiter("\r\n")
+// .commentPrefix("%%")
+// .ignoreFirstLine()
+// .ignoreParseErrors())
+// .withSchema(
+// Schema()
+// .field("myfield", Types.STRING)
+// .field("myfield2", Types.INT)
+// .field("proctime", Types.SQL_TIMESTAMP).proctime()
+// .field("rowtime", Types.SQL_TIMESTAMP).rowtime(
+// Rowtime().timestampsFromSource().watermarksFromSource())
+// )
+// val expected = Seq(
+// "connector.type" -> "filesystem",
+// "connector.path" -> "/path/to/csv",
+// "format.type" -> "csv",
+// "format.fields.0.name" -> "myfield",
+// "format.fields.0.type" -> "VARCHAR",
+// "format.fields.1.name" -> "myfield2",
+// "format.fields.1.type" -> "INT",
+// "format.quote-character" -> ";",
+// "format.field-delimiter" -> "#",
+// "format.line-delimiter" -> "\r\n",
+// "format.comment-prefix" -> "%%",
+// "format.ignore-first-line" -> "true",
+// "format.ignore-parse-errors" -> "true",
+// "schema.0.name" -> "myfield",
+// "schema.0.type" -> "VARCHAR",
+// "schema.1.name" -> "myfield2",
+// "schema.1.type" -> "INT",
+// "schema.2.name" -> "proctime",
+// "schema.2.type" -> "TIMESTAMP",
+// "schema.2.proctime" -> "true",
+// "schema.3.name" -> "rowtime",
+// "schema.3.type" -> "TIMESTAMP",
+// "schema.3.rowtime.0.timestamps.type" -> "from-source",
+// "schema.3.rowtime.0.watermarks.type" -> "from-source"
+// )
+// verifyProperties(desc, expected)
+// }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/2cb58960/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/sources/TableSourceFactoryServiceTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/sources/TableSourceFactoryServiceTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/sources/TableSourceFactoryServiceTest.scala
new file mode 100644
index 0000000..5e9b5a2
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/sources/TableSourceFactoryServiceTest.scala
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sources
+
+import org.apache.flink.table.api.{NoMatchingTableSourceException, TableException, ValidationException}
+import org.apache.flink.table.descriptors.ConnectorDescriptorValidator.{CONNECTOR_TYPE, CONNECTOR_VERSION}
+import org.apache.flink.table.descriptors.FormatDescriptorValidator.{FORMAT_TYPE, FORMAT_VERSION}
+import org.junit.Assert.assertTrue
+import org.junit.Test
+
+import scala.collection.mutable
+
+class TableSourceFactoryServiceTest { // every case looks up a factory via findTableSourceFactory
+
+ @Test
+ def testValidProperties(): Unit = {
+ val props = properties()
+ assertTrue(TableSourceFactoryService.findTableSourceFactory(props.toMap) != null)
+ }
+
+ @Test(expected = classOf[NoMatchingTableSourceException])
+ def testInvalidContext(): Unit = {
+ val props = properties()
+ props.put(CONNECTOR_TYPE, "FAIL") // replaces the connector type "test" set by properties()
+ TableSourceFactoryService.findTableSourceFactory(props.toMap)
+ }
+
+ @Test
+ def testDifferentContextVersion(): Unit = {
+ val props = properties()
+ props.put(CONNECTOR_VERSION, "2")
+ // the table source factory should still be found although the connector version now differs
+ // from the "1" that properties() sets by default
+ assertTrue(TableSourceFactoryService.findTableSourceFactory(props.toMap) != null)
+ }
+
+ @Test(expected = classOf[ValidationException])
+ def testUnsupportedProperty(): Unit = {
+ val props = properties()
+ props.put("format.path_new", "/new/path") // extra key that properties() does not contain
+ TableSourceFactoryService.findTableSourceFactory(props.toMap)
+ }
+
+ @Test(expected = classOf[TableException])
+ def testFailingFactory(): Unit = {
+ val props = properties()
+ props.put("failing", "true") // overrides the "false" default from properties()
+ TableSourceFactoryService.findTableSourceFactory(props.toMap)
+ }
+
+ private def properties(): mutable.Map[String, String] = { // baseline shared by all cases
+ val properties = mutable.Map[String, String]()
+ properties.put(CONNECTOR_TYPE, "test")
+ properties.put(FORMAT_TYPE, "test")
+ properties.put(CONNECTOR_VERSION, "1")
+ properties.put(FORMAT_VERSION, "1")
+ properties.put("format.path", "/path/to/target")
+ properties.put("schema.0.name", "a")
+ properties.put("schema.1.name", "b")
+ properties.put("schema.2.name", "c")
+ properties.put("schema.0.field.0.name", "a")
+ properties.put("schema.0.field.1.name", "b")
+ properties.put("schema.0.field.2.name", "c")
+ properties.put("failing", "false")
+ properties // returned mutable so that each test can override single entries
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/2cb58960/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/sources/TestTableSourceFactory.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/sources/TestTableSourceFactory.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/sources/TestTableSourceFactory.scala
new file mode 100644
index 0000000..ae75f99
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/sources/TestTableSourceFactory.scala
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sources
+
+import java.util
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.table.api.TableSchema
+import org.apache.flink.table.descriptors.ConnectorDescriptorValidator.{CONNECTOR_TYPE, CONNECTOR_VERSION}
+import org.apache.flink.table.descriptors.FormatDescriptorValidator.{FORMAT_TYPE, FORMAT_VERSION}
+import org.apache.flink.types.Row
+
+class TestTableSourceFactory extends TableSourceFactory[Row] { // test-only factory
+
+ override def requiredContext(): util.Map[String, String] = {
+ val context = new util.HashMap[String, String]()
+ context.put(CONNECTOR_TYPE, "test")
+ context.put(FORMAT_TYPE, "test")
+ context.put(CONNECTOR_VERSION, "1")
+ context.put(FORMAT_VERSION, "1")
+ context
+ }
+
+ override def supportedProperties(): util.List[String] = {
+ val properties = new util.ArrayList[String]()
+ // property keys accepted by this factory; '#' presumably stands for an array index
+ // (e.g. "schema.0.name") -- TODO confirm against TableSourceFactoryService
+ properties.add("format.path")
+ properties.add("schema.#.name")
+ properties.add("schema.#.field.#.name")
+ properties.add("failing")
+ properties
+ }
+
+ override def create(properties: util.Map[String, String]): TableSource[Row] = {
+ if (properties.get("failing") == "true") { // lets a test trigger an error inside the factory
+ throw new IllegalArgumentException("Error in this factory.")
+ }
+ new TableSource[Row] { // stub source whose methods always throw UnsupportedOperationException
+ override def getTableSchema: TableSchema = throw new UnsupportedOperationException()
+
+ override def getReturnType: TypeInformation[Row] = throw new UnsupportedOperationException()
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/2cb58960/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/typeutils/TypeStringUtilsTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/typeutils/TypeStringUtilsTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/typeutils/TypeStringUtilsTest.scala
new file mode 100644
index 0000000..29d647c
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/typeutils/TypeStringUtilsTest.scala
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.typeutils
+
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.api.java.typeutils.TypeExtractor
+import org.apache.flink.table.api.Types
+import org.apache.flink.table.runtime.utils.CommonTestData.{NonPojo, Person}
+import org.junit.Assert.assertEquals
+import org.junit.Test
+
+/**
+ * Tests for string-based representation of [[TypeInformation]].
+ *
+ * Every case goes through `testReadAndWrite`, which checks both directions: the type
+ * information must be written to the expected string, and the expected string must be
+ * read back into an equal type information.
+ */
+class TypeStringUtilsTest {
+
+ @Test
+ def testPrimitiveTypes(): Unit = {
+ testReadAndWrite("VARCHAR", Types.STRING)
+ testReadAndWrite("BOOLEAN", Types.BOOLEAN)
+ testReadAndWrite("TINYINT", Types.BYTE)
+ testReadAndWrite("SMALLINT", Types.SHORT)
+ testReadAndWrite("INT", Types.INT)
+ testReadAndWrite("BIGINT", Types.LONG)
+ testReadAndWrite("FLOAT", Types.FLOAT)
+ testReadAndWrite("DOUBLE", Types.DOUBLE)
+ testReadAndWrite("DECIMAL", Types.DECIMAL)
+ testReadAndWrite("DATE", Types.SQL_DATE)
+ testReadAndWrite("TIME", Types.SQL_TIME)
+ testReadAndWrite("TIMESTAMP", Types.SQL_TIMESTAMP)
+
+ // unsupported type information is expected as ANY(<class>, <payload>); the payload looks like
+ // a Base64 encoding of the Java-serialized type information -- confirm in TypeStringUtils
+ testReadAndWrite(
+ "ANY(java.lang.Void, " +
+ "rO0ABXNyADJvcmcuYXBhY2hlLmZsaW5rLmFwaS5jb21tb24udHlwZWluZm8uQmFzaWNUeXBlSW5mb_oE8IKl" +
+ "ad0GAgAETAAFY2xhenp0ABFMamF2YS9sYW5nL0NsYXNzO0wAD2NvbXBhcmF0b3JDbGFzc3EAfgABWwAXcG9z" +
+ "c2libGVDYXN0VGFyZ2V0VHlwZXN0ABJbTGphdmEvbGFuZy9DbGFzcztMAApzZXJpYWxpemVydAA2TG9yZy9h" +
+ "cGFjaGUvZmxpbmsvYXBpL2NvbW1vbi90eXBldXRpbHMvVHlwZVNlcmlhbGl6ZXI7eHIANG9yZy5hcGFjaGUu" +
+ "ZmxpbmsuYXBpLmNvbW1vbi50eXBlaW5mby5UeXBlSW5mb3JtYXRpb26UjchIurN66wIAAHhwdnIADmphdmEu" +
+ "bGFuZy5Wb2lkAAAAAAAAAAAAAAB4cHB1cgASW0xqYXZhLmxhbmcuQ2xhc3M7qxbXrsvNWpkCAAB4cAAAAABz" +
+ "cgA5b3JnLmFwYWNoZS5mbGluay5hcGkuY29tbW9uLnR5cGV1dGlscy5iYXNlLlZvaWRTZXJpYWxpemVyAAAA" +
+ "AAAAAAECAAB4cgBCb3JnLmFwYWNoZS5mbGluay5hcGkuY29tbW9uLnR5cGV1dGlscy5iYXNlLlR5cGVTZXJp" +
+ "YWxpemVyU2luZ2xldG9ueamHqscud0UCAAB4cgA0b3JnLmFwYWNoZS5mbGluay5hcGkuY29tbW9uLnR5cGV1" +
+ "dGlscy5UeXBlU2VyaWFsaXplcgAAAAAAAAABAgAAeHA)",
+ BasicTypeInfo.VOID_TYPE_INFO)
+ }
+
+ @Test
+ def testWriteComplexTypes(): Unit = { // despite the name, reading is checked as well
+ testReadAndWrite(
+ "ROW(f0 DECIMAL, f1 TINYINT)",
+ Types.ROW(Types.DECIMAL, Types.BYTE))
+
+ testReadAndWrite(
+ "ROW(hello DECIMAL, world TINYINT)",
+ Types.ROW(
+ Array[String]("hello", "world"),
+ Array[TypeInformation[_]](Types.DECIMAL, Types.BYTE)))
+
+ testReadAndWrite(
+ "ROW(\"he llo\" DECIMAL, world TINYINT)", // field name containing a space is quoted
+ Types.ROW(
+ Array[String]("he llo", "world"),
+ Array[TypeInformation[_]](Types.DECIMAL, Types.BYTE)))
+
+ testReadAndWrite(
+ "ROW(\"he \\nllo\" DECIMAL, world TINYINT)", // newline in the field name appears escaped
+ Types.ROW(
+ Array[String]("he \nllo", "world"),
+ Array[TypeInformation[_]](Types.DECIMAL, Types.BYTE)))
+
+ testReadAndWrite(
+ "POJO(org.apache.flink.table.runtime.utils.CommonTestData$Person)",
+ TypeExtractor.createTypeInfo(classOf[Person]))
+
+ testReadAndWrite(
+ "ANY(org.apache.flink.table.runtime.utils.CommonTestData$NonPojo)",
+ TypeExtractor.createTypeInfo(classOf[NonPojo]))
+ }
+
+ private def testReadAndWrite(expected: String, tpe: TypeInformation[_]): Unit = {
+ // test write to string: the type information must serialize to exactly `expected`
+ assertEquals(expected, TypeStringUtils.writeTypeInfo(tpe))
+
+ // test read from string: parsing `expected` must yield a type information equal to `tpe`
+ assertEquals(tpe, TypeStringUtils.readTypeInfo(expected))
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/2cb58960/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/MockTableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/MockTableEnvironment.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/MockTableEnvironment.scala
index ff7c79d..f43bcc6 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/MockTableEnvironment.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/MockTableEnvironment.scala
@@ -35,8 +35,6 @@ class MockTableEnvironment extends TableEnvironment(new TableConfig) {
override def sql(query: String): Table = ???
- override def registerTableSource(name: String, tableSource: TableSource[_]): Unit = ???
-
override protected def getBuiltInNormRuleSet: RuleSet = ???
override protected def getBuiltInPhysicalOptRuleSet: RuleSet = ???
@@ -46,4 +44,9 @@ class MockTableEnvironment extends TableEnvironment(new TableConfig) {
fieldNames: Array[String],
fieldTypes: Array[TypeInformation[_]],
tableSink: TableSink[_]): Unit = ???
+
+ override protected def createUniqueTableName(): String = ???
+
+ override protected def registerTableSourceInternal(name: String, tableSource: TableSource[_])
+ : Unit = ???
}
[2/3] flink git commit: [FLINK-8240] [table] Create unified
interfaces to configure and instantiate TableSources
Posted by tw...@apache.org.
http://git-wip-us.apache.org/repos/asf/flink/blob/2cb58960/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/FormatDescriptorValidator.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/FormatDescriptorValidator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/FormatDescriptorValidator.scala
new file mode 100644
index 0000000..1aaa399
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/FormatDescriptorValidator.scala
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+import org.apache.flink.table.descriptors.FormatDescriptorValidator.{FORMAT_TYPE, FORMAT_VERSION}
+
+/**
+ * Validator for [[FormatDescriptor]].
+ *
+ * Checks the mandatory format type (a string, validated with `minLen = 1`) and the
+ * optional format version (an int; `0` and `Integer.MAX_VALUE` are passed as what are
+ * presumably the lower and upper bound -- confirm in [[DescriptorProperties]]).
+ */
+class FormatDescriptorValidator extends DescriptorValidator {
+
+ override def validate(properties: DescriptorProperties): Unit = {
+ properties.validateString(FORMAT_TYPE, isOptional = false, minLen = 1)
+ properties.validateInt(FORMAT_VERSION, isOptional = true, 0, Integer.MAX_VALUE)
+ }
+}
+
+object FormatDescriptorValidator {
+
+ val FORMAT_TYPE = "format.type" // property key of the mandatory format type
+ val FORMAT_VERSION = "format.version" // property key of the optional format version
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/2cb58960/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/Json.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/Json.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/Json.scala
new file mode 100644
index 0000000..cc46d9c
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/Json.scala
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+import org.apache.flink.table.descriptors.JsonValidator.{FORMAT_FAIL_ON_MISSING_FIELD, FORMAT_SCHEMA_STRING, FORMAT_TYPE_VALUE}
+
+/**
+ * Encoding descriptor for JSON.
+ *
+ * Both settings are kept as options; only the ones that have been set are written out
+ * by `addFormatProperties`.
+ */
+class Json extends FormatDescriptor(FORMAT_TYPE_VALUE, version = 1) {
+
+ private var failOnMissingField: Option[Boolean] = None
+
+ private var schema: Option[String] = None
+
+ /**
+ * Sets flag whether to fail if a field is missing or not.
+ *
+ * @param failOnMissingField If set to true, the operation fails if there is a missing field.
+ * If set to false, a missing field is set to null.
+ * @return The builder.
+ */
+ def failOnMissingField(failOnMissingField: Boolean): Json = {
+ this.failOnMissingField = Some(failOnMissingField)
+ this
+ }
+
+ /**
+ * Sets the JSON schema string with field names and the types according to the JSON schema
+ * specification [[http://json-schema.org/specification.html]]. Required.
+ *
+ * The schema might be nested.
+ *
+ * @param schema JSON schema
+ * @return The builder.
+ */
+ def schema(schema: String): Json = {
+ this.schema = Some(schema)
+ this
+ }
+
+ /**
+ * Internal method for format properties conversion.
+ *
+ * Writes the schema string and the fail-on-missing-field flag into the given properties,
+ * each only if it has been set on this descriptor.
+ */
+ override protected def addFormatProperties(properties: DescriptorProperties): Unit = {
+ // we distinguish between "schema string" and "schema" to allow parsing of a
+ // schema object in the future (such that the entire JSON schema can be defined in a YAML
+ // file instead of one large string)
+ schema.foreach(properties.putString(FORMAT_SCHEMA_STRING, _))
+ failOnMissingField.foreach(properties.putBoolean(FORMAT_FAIL_ON_MISSING_FIELD, _))
+ }
+}
+
+/**
+ * Encoding descriptor for JSON.
+ */
+object Json {
+
+ /**
+ * Creates a new encoding descriptor for JSON with no schema and no flag set.
+ */
+ def apply(): Json = new Json()
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/2cb58960/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/JsonValidator.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/JsonValidator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/JsonValidator.scala
new file mode 100644
index 0000000..9f11caf
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/JsonValidator.scala
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+import org.apache.flink.table.descriptors.JsonValidator.{FORMAT_FAIL_ON_MISSING_FIELD, FORMAT_SCHEMA_STRING}
+
+/**
+ * Validator for [[Json]].
+ *
+ * Runs the checks of [[FormatDescriptorValidator]] first, then requires the schema string
+ * (validated with `minLen = 1`) and accepts an optional fail-on-missing-field boolean.
+ */
+class JsonValidator extends FormatDescriptorValidator {
+
+ override def validate(properties: DescriptorProperties): Unit = {
+ super.validate(properties)
+ properties.validateString(FORMAT_SCHEMA_STRING, isOptional = false, minLen = 1)
+ properties.validateBoolean(FORMAT_FAIL_ON_MISSING_FIELD, isOptional = true)
+ }
+}
+
+object JsonValidator {
+
+ val FORMAT_TYPE_VALUE = "json" // value used as the format type of the Json descriptor
+ val FORMAT_SCHEMA_STRING = "format.schema-string" // key of the mandatory JSON schema string
+ val FORMAT_FAIL_ON_MISSING_FIELD = "format.fail-on-missing-field" // key of the optional flag
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/2cb58960/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/Metadata.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/Metadata.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/Metadata.scala
new file mode 100644
index 0000000..211d786
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/Metadata.scala
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+import org.apache.flink.table.descriptors.MetadataValidator.{METADATA_COMMENT, METADATA_CREATION_TIME, METADATA_LAST_ACCESS_TIME}
+
+/**
+ * Metadata descriptor for adding additional, useful information.
+ *
+ * All values are optional; every setter returns this descriptor so that calls can be chained.
+ */
+class Metadata extends Descriptor {
+
+ protected var comment: Option[String] = None
+ protected var creationTime: Option[Long] = None
+ protected var lastAccessTime: Option[Long] = None
+
+ /**
+ * Sets a comment.
+ *
+ * @param comment the description
+ * @return this descriptor
+ */
+ def comment(comment: String): Metadata = {
+ this.comment = Some(comment)
+ this
+ }
+
+ /**
+ * Sets a creation time.
+ *
+ * @param time UTC milliseconds timestamp
+ * @return this descriptor
+ */
+ def creationTime(time: Long): Metadata = {
+ this.creationTime = Some(time)
+ this
+ }
+
+ /**
+ * Sets a last access time.
+ *
+ * @param time UTC milliseconds timestamp
+ * @return this descriptor
+ */
+ def lastAccessTime(time: Long): Metadata = {
+ this.lastAccessTime = Some(time)
+ this
+ }
+
+ /**
+ * Internal method for properties conversion.
+ *
+ * Writes the comment, creation time and last access time into the given properties,
+ * each only if it has been set on this descriptor.
+ */
+ final override def addProperties(properties: DescriptorProperties): Unit = {
+ comment.foreach(c => properties.putString(METADATA_COMMENT, c))
+ creationTime.foreach(t => properties.putLong(METADATA_CREATION_TIME, t))
+ lastAccessTime.foreach(t => properties.putLong(METADATA_LAST_ACCESS_TIME, t))
+ }
+}
+
+/**
+ * Metadata descriptor for adding additional, useful information.
+ */
+object Metadata {
+
+ /**
+ * Creates a new, empty metadata descriptor.
+ */
+ def apply(): Metadata = new Metadata()
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/2cb58960/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/MetadataValidator.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/MetadataValidator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/MetadataValidator.scala
new file mode 100644
index 0000000..a8d580c
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/MetadataValidator.scala
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+import org.apache.flink.table.descriptors.MetadataValidator.{METADATA_COMMENT, METADATA_CREATION_TIME, METADATA_LAST_ACCESS_TIME, METADATA_VERSION}
+
+/**
+ * Validator for [[Metadata]].
+ *
+ * Every metadata property is validated as optional: the version as an int (with `0` and
+ * `Integer.MAX_VALUE` passed as what are presumably the bounds), the comment as a string,
+ * and the creation and last access time as longs.
+ */
+class MetadataValidator extends DescriptorValidator {
+
+ override def validate(properties: DescriptorProperties): Unit = {
+ properties.validateInt(METADATA_VERSION, isOptional = true, 0, Integer.MAX_VALUE)
+ properties.validateString(METADATA_COMMENT, isOptional = true)
+ properties.validateLong(METADATA_CREATION_TIME, isOptional = true)
+ properties.validateLong(METADATA_LAST_ACCESS_TIME, isOptional = true)
+ }
+}
+
+object MetadataValidator {
+
+ val METADATA_VERSION = "metadata.version" // key of the optional metadata version
+ val METADATA_COMMENT = "metadata.comment" // key of the optional comment
+ val METADATA_CREATION_TIME = "metadata.creation-time" // key of the optional creation time
+ val METADATA_LAST_ACCESS_TIME = "metadata.last-access-time" // key of the optional access time
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/2cb58960/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/Rowtime.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/Rowtime.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/Rowtime.scala
new file mode 100644
index 0000000..a1c80f5
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/Rowtime.scala
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+import org.apache.flink.table.api.Types
+import org.apache.flink.table.descriptors.RowtimeValidator.{ROWTIME, ROWTIME_VERSION, normalizeTimestampExtractor, normalizeWatermarkStrategy}
+import org.apache.flink.table.sources.tsextractors.{ExistingField, StreamRecordTimestamp, TimestampExtractor}
+import org.apache.flink.table.sources.wmstrategies.{AscendingTimestamps, BoundedOutOfOrderTimestamps, PreserveWatermarks, WatermarkStrategy}
+
+import scala.collection.mutable
+
+/**
+ * Rowtime descriptor for describing an event time attribute in the schema.
+ *
+ * A timestamp extractor and a watermark strategy can be configured. Each setter replaces a
+ * previously set extractor or strategy and returns this descriptor so that calls can be chained.
+ */
+class Rowtime extends Descriptor {
+
+ private var timestampExtractor: Option[TimestampExtractor] = None
+ private var watermarkStrategy: Option[WatermarkStrategy] = None
+
+ /**
+ * Sets a built-in timestamp extractor that converts an existing [[Long]] or
+ * [[Types.SQL_TIMESTAMP]] field into the rowtime attribute.
+ *
+ * @param fieldName The field to convert into a rowtime attribute.
+ */
+ def timestampsFromField(fieldName: String): Rowtime = {
+ timestampExtractor = Some(new ExistingField(fieldName))
+ this
+ }
+
+ /**
+ * Sets a built-in timestamp extractor that converts the assigned timestamps from
+ * a DataStream API record into the rowtime attribute and thus preserves the assigned
+ * timestamps from the source.
+ *
+ * Note: This extractor only works in streaming environments.
+ */
+ def timestampsFromSource(): Rowtime = {
+ timestampExtractor = Some(new StreamRecordTimestamp)
+ this
+ }
+
+ /**
+ * Sets a custom timestamp extractor to be used for the rowtime attribute.
+ *
+ * @param extractor The [[TimestampExtractor]] to extract the rowtime attribute
+ * from the physical type.
+ */
+ def timestampsFromExtractor(extractor: TimestampExtractor): Rowtime = {
+ timestampExtractor = Some(extractor)
+ this
+ }
+
+ /**
+ * Sets a built-in watermark strategy for ascending rowtime attributes.
+ *
+ * Emits a watermark of the maximum observed timestamp so far minus 1.
+ * Rows that have a timestamp equal to the max timestamp are not late.
+ */
+ def watermarksPeriodicAscending(): Rowtime = {
+ watermarkStrategy = Some(new AscendingTimestamps)
+ this
+ }
+
+ /**
+ * Sets a built-in watermark strategy for rowtime attributes which are out-of-order by a bounded
+ * time interval.
+ *
+ * Emits watermarks which are the maximum observed timestamp minus the specified delay.
+ *
+ * @param delay The delay handed to [[BoundedOutOfOrderTimestamps]]; the time unit is not
+ * visible here (presumably milliseconds -- TODO confirm).
+ */
+ def watermarksPeriodicBounding(delay: Long): Rowtime = {
+ watermarkStrategy = Some(new BoundedOutOfOrderTimestamps(delay))
+ this
+ }
+
+ /**
+ * Sets a built-in watermark strategy which indicates the watermarks should be preserved from the
+ * underlying DataStream API and thus preserves the assigned watermarks from the source.
+ */
+ def watermarksFromSource(): Rowtime = {
+ watermarkStrategy = Some(PreserveWatermarks.INSTANCE)
+ this
+ }
+
+ /**
+ * Sets a custom watermark strategy to be used for the rowtime attribute.
+ *
+ * @param strategy The [[WatermarkStrategy]] to use for the rowtime attribute.
+ */
+ def watermarksFromStrategy(strategy: WatermarkStrategy): Rowtime = {
+ watermarkStrategy = Some(strategy)
+ this
+ }
+
+ /**
+ * Internal method for properties conversion.
+ *
+ * Collects the version "1" together with the normalized timestamp extractor and watermark
+ * strategy (each only if set) and stores them as a single indexed entry under the rowtime key.
+ */
+ final override def addProperties(properties: DescriptorProperties): Unit = {
+ val props = mutable.HashMap[String, String]()
+ props.put(ROWTIME_VERSION, "1")
+ timestampExtractor.foreach(normalizeTimestampExtractor(_).foreach(e => props.put(e._1, e._2)))
+ watermarkStrategy.foreach(normalizeWatermarkStrategy(_).foreach(e => props.put(e._1, e._2)))
+
+ // use a list for the rowtime to support multiple rowtime attributes in the future
+ properties.putIndexedVariableProperties(ROWTIME, Seq(props.toMap))
+ }
+}
+
+/**
+ * Rowtime descriptor for describing an event time attribute in the schema.
+ */
+object Rowtime {
+
+ /**
+ * Creates a new rowtime descriptor without a timestamp extractor or watermark strategy.
+ */
+ def apply(): Rowtime = new Rowtime()
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/2cb58960/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/RowtimeValidator.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/RowtimeValidator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/RowtimeValidator.scala
new file mode 100644
index 0000000..74e49f1
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/RowtimeValidator.scala
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+import org.apache.flink.table.descriptors.DescriptorProperties.serialize
+import org.apache.flink.table.descriptors.RowtimeValidator._
+import org.apache.flink.table.sources.tsextractors.{ExistingField, StreamRecordTimestamp, TimestampExtractor}
+import org.apache.flink.table.sources.wmstrategies.{AscendingTimestamps, BoundedOutOfOrderTimestamps, PreserveWatermarks, WatermarkStrategy}
+
+/**
+  * Validator for [[Rowtime]].
+  *
+  * Every property key that is checked is built by prepending `prefix` to one of the
+  * per-rowtime keys defined in the companion object.
+  *
+  * @param prefix string that is prepended to each rowtime property key (empty by default)
+  */
+class RowtimeValidator(val prefix: String = "") extends DescriptorValidator {
+
+  override def validate(properties: DescriptorProperties): Unit = {
+    // resolves a per-rowtime key against the configured prefix
+    def key(suffix: String): String = prefix + suffix
+
+    // enum values that need no further properties share this no-op
+    val nothingToCheck = () => {}
+
+    // extra properties required by the individual timestamp extractor types
+    val checkFieldTimestamps = () => {
+      properties.validateString(key(TIMESTAMPS_FROM), isOptional = false, minLen = 1)
+    }
+    val checkCustomTimestamps = () => {
+      properties.validateString(key(TIMESTAMPS_CLASS), isOptional = false, minLen = 1)
+      properties.validateString(key(TIMESTAMPS_SERIALIZED), isOptional = false, minLen = 1)
+    }
+
+    // extra properties required by the individual watermark strategy types
+    val checkBoundedWatermarks = () => {
+      properties.validateLong(key(WATERMARKS_DELAY), isOptional = false, min = 0)
+    }
+    val checkCustomWatermarks = () => {
+      properties.validateString(key(WATERMARKS_CLASS), isOptional = false, minLen = 1)
+      properties.validateString(key(WATERMARKS_SERIALIZED), isOptional = false, minLen = 1)
+    }
+
+    // the checks themselves run in this order: version, timestamps, watermarks
+    properties.validateInt(key(ROWTIME_VERSION), isOptional = true, 0, Int.MaxValue)
+
+    properties.validateEnum(
+      key(TIMESTAMPS_TYPE),
+      isOptional = false,
+      Map(
+        TIMESTAMPS_TYPE_VALUE_FROM_FIELD -> checkFieldTimestamps,
+        TIMESTAMPS_TYPE_VALUE_FROM_SOURCE -> nothingToCheck,
+        TIMESTAMPS_TYPE_VALUE_CUSTOM -> checkCustomTimestamps
+      )
+    )
+
+    properties.validateEnum(
+      key(WATERMARKS_TYPE),
+      isOptional = false,
+      Map(
+        WATERMARKS_TYPE_VALUE_PERIODIC_ASCENDING -> nothingToCheck,
+        WATERMARKS_TYPE_VALUE_PERIODIC_BOUNDING -> checkBoundedWatermarks,
+        WATERMARKS_TYPE_VALUE_FROM_SOURCE -> nothingToCheck,
+        WATERMARKS_TYPE_VALUE_CUSTOM -> checkCustomWatermarks
+      )
+    )
+  }
+}
+
+/**
+  * Property keys and normalization utilities shared by [[Rowtime]] and [[RowtimeValidator]].
+  */
+object RowtimeValidator {
+
+  // key under which a rowtime definition is stored
+  val ROWTIME = "rowtime"
+
+  // per rowtime properties
+
+  val ROWTIME_VERSION = "version"
+
+  // timestamp extractor: its type, followed by the type-specific keys
+  val TIMESTAMPS_TYPE = "timestamps.type"
+  val TIMESTAMPS_TYPE_VALUE_FROM_FIELD = "from-field"
+  val TIMESTAMPS_TYPE_VALUE_FROM_SOURCE = "from-source"
+  val TIMESTAMPS_TYPE_VALUE_CUSTOM = "custom"
+  val TIMESTAMPS_FROM = "timestamps.from"
+  val TIMESTAMPS_CLASS = "timestamps.class"
+  val TIMESTAMPS_SERIALIZED = "timestamps.serialized"
+
+  // watermark strategy: its type, followed by the type-specific keys
+  val WATERMARKS_TYPE = "watermarks.type"
+  val WATERMARKS_TYPE_VALUE_PERIODIC_ASCENDING = "periodic-ascending"
+  val WATERMARKS_TYPE_VALUE_PERIODIC_BOUNDING = "periodic-bounding"
+  val WATERMARKS_TYPE_VALUE_FROM_SOURCE = "from-source"
+  val WATERMARKS_TYPE_VALUE_CUSTOM = "custom"
+  val WATERMARKS_CLASS = "watermarks.class"
+  val WATERMARKS_SERIALIZED = "watermarks.serialized"
+  val WATERMARKS_DELAY = "watermarks.delay"
+
+  // utilities
+
+  /**
+    * Converts a timestamp extractor into its string properties.
+    *
+    * The built-in extractors [[ExistingField]] and [[StreamRecordTimestamp]] get a dedicated
+    * type value; every other extractor is stored as "custom" with its class name and its
+    * serialized form.
+    *
+    * @param extractor the timestamp extractor to convert
+    * @return the properties describing the extractor (keys without any prefix)
+    */
+  def normalizeTimestampExtractor(extractor: TimestampExtractor): Map[String, String] =
+    extractor match {
+      case fromField: ExistingField =>
+        // the first argument field is stored as the origin of the timestamps
+        val argumentFields = fromField.getArgumentFields
+        Map(
+          TIMESTAMPS_TYPE -> TIMESTAMPS_TYPE_VALUE_FROM_FIELD,
+          TIMESTAMPS_FROM -> argumentFields.apply(0))
+
+      case _: StreamRecordTimestamp =>
+        Map(TIMESTAMPS_TYPE -> TIMESTAMPS_TYPE_VALUE_FROM_SOURCE)
+
+      case custom: TimestampExtractor =>
+        Map(
+          TIMESTAMPS_TYPE -> TIMESTAMPS_TYPE_VALUE_CUSTOM,
+          TIMESTAMPS_CLASS -> custom.getClass.getName,
+          TIMESTAMPS_SERIALIZED -> serialize(custom))
+    }
+
+  /**
+    * Converts a watermark strategy into its string properties.
+    *
+    * The built-in strategies [[AscendingTimestamps]], [[BoundedOutOfOrderTimestamps]] and
+    * [[PreserveWatermarks]] get a dedicated type value; every other strategy is stored as
+    * "custom" with its class name and its serialized form.
+    *
+    * @param strategy the watermark strategy to convert
+    * @return the properties describing the strategy (keys without any prefix)
+    */
+  def normalizeWatermarkStrategy(strategy: WatermarkStrategy): Map[String, String] =
+    strategy match {
+      case _: AscendingTimestamps =>
+        Map(WATERMARKS_TYPE -> WATERMARKS_TYPE_VALUE_PERIODIC_ASCENDING)
+
+      case bounded: BoundedOutOfOrderTimestamps =>
+        val delay = bounded.delay.toString
+        Map(
+          WATERMARKS_TYPE -> WATERMARKS_TYPE_VALUE_PERIODIC_BOUNDING,
+          WATERMARKS_DELAY -> delay)
+
+      case _: PreserveWatermarks =>
+        Map(WATERMARKS_TYPE -> WATERMARKS_TYPE_VALUE_FROM_SOURCE)
+
+      case custom: WatermarkStrategy =>
+        Map(
+          WATERMARKS_TYPE -> WATERMARKS_TYPE_VALUE_CUSTOM,
+          WATERMARKS_CLASS -> custom.getClass.getName,
+          WATERMARKS_SERIALIZED -> serialize(custom))
+    }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/2cb58960/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/Schema.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/Schema.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/Schema.scala
new file mode 100644
index 0000000..2f3a389
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/Schema.scala
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.table.api.{TableSchema, ValidationException}
+import org.apache.flink.table.descriptors.DescriptorProperties.{normalizeTableSchema, normalizeTypeInfo}
+import org.apache.flink.table.descriptors.SchemaValidator._
+
+import scala.collection.mutable
+
+/**
+  * Describes a schema of a table.
+  *
+  * Fields keep the order in which they are added. The refinements `from()`, `proctime()` and
+  * `rowtime()` always apply to the field that was added last; after one of them has been
+  * applied, `field()` has to be called again before the next refinement.
+  *
+  * Note: Field names are matched by the exact name by default (case sensitive).
+  */
+class Schema extends Descriptor {
+
+  // insertion-ordered: field name -> properties describing type, origin, and time attribute
+  private val tableSchema = mutable.LinkedHashMap[String, mutable.LinkedHashMap[String, String]]()
+
+  // the field that from()/proctime()/rowtime() may still refine, if any
+  private var lastField: Option[String] = None
+
+  /**
+    * Sets the schema with field names and the types. Required.
+    *
+    * All fields that were added before (e.g. with `field()`) are dropped and replaced by
+    * the fields of the given schema.
+    *
+    * @param schema the table schema
+    * @return this descriptor, so that calls can be chained
+    */
+  def schema(schema: TableSchema): Schema = {
+    tableSchema.clear()
+    lastField = None
+    for ((name, typeString) <- normalizeTableSchema(schema)) {
+      field(name, typeString)
+    }
+    this
+  }
+
+  /**
+    * Adds a field with the field name and the type information. Required.
+    * This method can be called multiple times. The call order of this method defines
+    * also the order of the fields in a row.
+    *
+    * @param fieldName the field name
+    * @param fieldType the type information of the field
+    * @return this descriptor, so that calls can be chained
+    */
+  def field(fieldName: String, fieldType: TypeInformation[_]): Schema = {
+    val typeString = normalizeTypeInfo(fieldType)
+    field(fieldName, typeString)
+  }
+
+  /**
+    * Adds a field with the field name and the type string. Required.
+    * This method can be called multiple times. The call order of this method defines
+    * also the order of the fields in a row.
+    *
+    * Throws a [[ValidationException]] if a field with the same name was already added.
+    *
+    * @param fieldName the field name
+    * @param fieldType the type string of the field
+    * @return this descriptor, so that calls can be chained
+    */
+  def field(fieldName: String, fieldType: String): Schema = {
+    if (tableSchema.contains(fieldName)) {
+      throw new ValidationException(s"Duplicate field name $fieldName.")
+    }
+
+    // a new field starts out with its type as the only property
+    tableSchema.put(fieldName, mutable.LinkedHashMap(TYPE -> fieldType))
+    lastField = Some(fieldName)
+    this
+  }
+
+  /**
+    * Specifies the origin of the previously defined field. The origin field is defined by a
+    * connector or format.
+    *
+    * E.g. field("myString", Types.STRING).from("CSV_MY_STRING")
+    *
+    * Note: Field names are matched by the exact name by default (case sensitive).
+    *
+    * Throws a [[ValidationException]] if there is no field that can be refined.
+    *
+    * @param originFieldName name of the field in the connector or format
+    * @return this descriptor, so that calls can be chained
+    */
+  def from(originFieldName: String): Schema = {
+    val target = lastField.getOrElse(
+      throw new ValidationException("No field previously defined. Use field() before."))
+    tableSchema(target).put(FROM, originFieldName)
+    lastField = None
+    this
+  }
+
+  /**
+    * Specifies the previously defined field as a processing-time attribute.
+    *
+    * E.g. field("proctime", Types.SQL_TIMESTAMP).proctime()
+    *
+    * Throws a [[ValidationException]] if there is no field that can be refined.
+    *
+    * @return this descriptor, so that calls can be chained
+    */
+  def proctime(): Schema = {
+    val target = lastField.getOrElse(
+      throw new ValidationException("No field defined previously. Use field() before."))
+    tableSchema(target).put(PROCTIME, PROCTIME_VALUE_TRUE)
+    lastField = None
+    this
+  }
+
+  /**
+    * Specifies the previously defined field as an event-time attribute.
+    *
+    * E.g. field("rowtime", Types.SQL_TIMESTAMP).rowtime(...)
+    *
+    * Throws a [[ValidationException]] if there is no field that can be refined.
+    *
+    * @param rowtime descriptor whose properties are attached to the field
+    * @return this descriptor, so that calls can be chained
+    */
+  def rowtime(rowtime: Rowtime): Schema = {
+    val target = lastField.getOrElse(
+      throw new ValidationException("No field defined previously. Use field() before."))
+
+    // let the rowtime descriptor render itself and copy the result into the field
+    val rowtimeProperties = new DescriptorProperties()
+    rowtime.addProperties(rowtimeProperties)
+    tableSchema(target) ++= rowtimeProperties.asMap
+    lastField = None
+    this
+  }
+
+  /**
+    * Internal method for properties conversion.
+    *
+    * Writes the schema version and one indexed entry per field; each entry consists of the
+    * field name plus the properties collected for that field.
+    */
+  final override private[flink] def addProperties(properties: DescriptorProperties): Unit = {
+    properties.putInt(SCHEMA_VERSION, 1)
+
+    val indexedFields = tableSchema.toSeq.map { case (fieldName, fieldProperties) =>
+      Map(NAME -> fieldName) ++ fieldProperties
+    }
+    properties.putIndexedVariableProperties(SCHEMA, indexedFields)
+  }
+}
+
+/**
+  * Companion of [[Schema]]; lets callers create the descriptor without `new`.
+  */
+object Schema {
+
+  /**
+    * Creates a new descriptor that describes a schema of a table.
+    *
+    * @return a fresh [[Schema]] instance
+    */
+  def apply(): Schema = new Schema
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/2cb58960/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/SchemaValidator.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/SchemaValidator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/SchemaValidator.scala
new file mode 100644
index 0000000..19c0e41
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/SchemaValidator.scala
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+import org.apache.flink.table.api.ValidationException
+import org.apache.flink.table.descriptors.RowtimeValidator.ROWTIME
+import org.apache.flink.table.descriptors.SchemaValidator._
+
+/**
+  * Validator for [[Schema]].
+  *
+  * @param isStreamEnvironment whether the schema is validated for a streaming environment;
+  *                            processing-time attributes are rejected otherwise
+  */
+class SchemaValidator(isStreamEnvironment: Boolean = true) extends DescriptorValidator {
+
+  /**
+    * Checks the schema version and, for every indexed field, its name, type, optional origin
+    * and optional time attribute. A field may be either a processing-time or an event-time
+    * attribute, but not both.
+    *
+    * @param properties the properties to validate
+    */
+  override def validate(properties: DescriptorProperties): Unit = {
+    properties.validateInt(SCHEMA_VERSION, isOptional = true, 0, Integer.MAX_VALUE)
+
+    val names = properties.getIndexedProperty(SCHEMA, NAME)
+    val types = properties.getIndexedProperty(SCHEMA, TYPE)
+
+    if (names.isEmpty && types.isEmpty) {
+      throw new ValidationException(s"Could not find the required schema for property '$SCHEMA'.")
+    }
+
+    // iterate up to the larger count so that a missing name or type is reported as well
+    for (i <- 0 until Math.max(names.size, types.size)) {
+      properties.validateString(s"$SCHEMA.$i.$NAME", isOptional = false, minLen = 1)
+      properties.validateType(s"$SCHEMA.$i.$TYPE", isOptional = false)
+      properties.validateString(s"$SCHEMA.$i.$FROM", isOptional = true, minLen = 1)
+      // either proctime or rowtime
+      val proctime = s"$SCHEMA.$i.$PROCTIME"
+      val rowtime = s"$SCHEMA.$i.$ROWTIME"
+      if (properties.contains(proctime)) {
+        if (!isStreamEnvironment) {
+          throw new ValidationException(
+            s"Property '$proctime' is not allowed in a batch environment.")
+        }
+        // check proctime
+        properties.validateBoolean(proctime, isOptional = false)
+        // no rowtime
+        properties.validatePrefixExclusion(rowtime)
+      } else if (properties.hasPrefix(rowtime)) {
+        // check rowtime
+        // Rowtime.addProperties() writes its keys as the single entry of an indexed list
+        // ("rowtime.0.<key>") and Schema.rowtime() copies them below the field, so the
+        // rowtime keys of field i are located at "schema.<i>.rowtime.0.<key>". Validating
+        // with the bare field prefix would look for "schema.<i>.<key>" and always fail.
+        val rowtimeValidator = new RowtimeValidator(s"$rowtime.0.")
+        rowtimeValidator.validate(properties)
+        // no proctime
+        properties.validateExclusion(proctime)
+      }
+    }
+  }
+}
+
+/**
+  * Property keys shared by [[Schema]] and [[SchemaValidator]].
+  */
+object SchemaValidator {
+
+  // key of the indexed field list and key of the schema version
+  val SCHEMA: String = "schema"
+  val SCHEMA_VERSION: String = "schema.version"
+
+  // per column properties
+
+  // mandatory for every field
+  val NAME: String = "name"
+  val TYPE: String = "type"
+
+  // optional origin of the field in the connector or format
+  val FROM: String = "from"
+
+  // processing-time marker and the value that is written for it
+  val PROCTIME: String = "proctime"
+  val PROCTIME_VALUE_TRUE: String = "true"
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/2cb58960/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/Statistics.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/Statistics.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/Statistics.scala
new file mode 100644
index 0000000..3037286
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/Statistics.scala
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+import org.apache.flink.table.descriptors.StatisticsValidator._
+import org.apache.flink.table.plan.stats.{ColumnStats, TableStats}
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+/**
+  * Statistics descriptor for describing table stats.
+  *
+  * Holds an optional overall row count and, per column, a set of stringified statistics.
+  */
+class Statistics extends Descriptor {
+
+  // overall row count, if one has been set
+  private var rowCount: Option[Long] = None
+
+  // column name -> (statistic key -> stringified value), columns in insertion order
+  private val columnStats: mutable.LinkedHashMap[String, mutable.Map[String, String]] =
+    mutable.LinkedHashMap[String, mutable.Map[String, String]]()
+
+  /**
+    * Sets the statistics from a [[TableStats]] instance.
+    *
+    * The row count is replaced and all column statistics set so far are dropped before the
+    * column statistics of the given instance are added.
+    *
+    * @param tableStats the table statistics
+    * @return this descriptor, so that calls can be chained
+    */
+  def tableStats(tableStats: TableStats): Statistics = {
+    rowCount(tableStats.rowCount)
+    columnStats.clear()
+    for ((column, stats) <- tableStats.colStats.asScala) {
+      columnStats(column, stats)
+    }
+    this
+  }
+
+  /**
+    * Sets statistics for the overall row count. Required.
+    *
+    * @param rowCount the expected number of rows
+    * @return this descriptor, so that calls can be chained
+    */
+  def rowCount(rowCount: Long): Statistics = {
+    this.rowCount = Some(rowCount)
+    this
+  }
+
+  /**
+    * Sets statistics for a column. Overwrites all existing statistics for this column.
+    *
+    * @param columnName the column name
+    * @param columnStats expected statistics for the column
+    * @return this descriptor, so that calls can be chained
+    */
+  def columnStats(columnName: String, columnStats: ColumnStats): Statistics = {
+    val normalized = normalizeColumnStats(columnStats).toSeq
+    this.columnStats.put(columnName, mutable.Map(normalized: _*))
+    this
+  }
+
+  /**
+    * Sets the number of distinct values statistic for the given column.
+    */
+  def columnDistinctCount(columnName: String, ndv: Long): Statistics =
+    setColumnStat(columnName, DISTINCT_COUNT, ndv.toString)
+
+  /**
+    * Sets the number of null values statistic for the given column.
+    */
+  def columnNullCount(columnName: String, nullCount: Long): Statistics =
+    setColumnStat(columnName, NULL_COUNT, nullCount.toString)
+
+  /**
+    * Sets the average length statistic for the given column.
+    */
+  def columnAvgLength(columnName: String, avgLen: Double): Statistics =
+    setColumnStat(columnName, AVG_LENGTH, avgLen.toString)
+
+  /**
+    * Sets the maximum length statistic for the given column.
+    */
+  def columnMaxLength(columnName: String, maxLen: Integer): Statistics =
+    setColumnStat(columnName, MAX_LENGTH, maxLen.toString)
+
+  /**
+    * Sets the maximum value statistic for the given column.
+    */
+  def columnMaxValue(columnName: String, max: Number): Statistics =
+    setColumnStat(columnName, MAX_VALUE, max.toString)
+
+  /**
+    * Sets the minimum value statistic for the given column.
+    */
+  def columnMinValue(columnName: String, min: Number): Statistics =
+    setColumnStat(columnName, MIN_VALUE, min.toString)
+
+  /**
+    * Stores one statistic of a column, creating the entry of the column on first use.
+    *
+    * The value is passed by name so that it is rendered only after the entry of the column
+    * has been looked up or created.
+    */
+  private def setColumnStat(
+      columnName: String,
+      statKey: String,
+      statValue: => String): Statistics = {
+    this.columnStats
+      .getOrElseUpdate(columnName, mutable.HashMap())
+      .put(statKey, statValue)
+    this
+  }
+
+  /**
+    * Internal method for properties conversion.
+    *
+    * Writes the statistics version, the row count (if set) and one indexed entry per
+    * column consisting of the column name and its statistics.
+    */
+  final override def addProperties(properties: DescriptorProperties): Unit = {
+    properties.putInt(STATISTICS_VERSION, 1)
+    for (count <- rowCount) {
+      properties.putLong(STATISTICS_ROW_COUNT, count)
+    }
+
+    // the column name becomes a property value of its entry, not a part of the key
+    val namedStats = columnStats.toSeq.map { case (columnName, stats) =>
+      stats.toMap + (NAME -> columnName)
+    }
+    properties.putIndexedVariableProperties(STATISTICS_COLUMNS, namedStats)
+  }
+}
+
+/**
+  * Companion of [[Statistics]]; lets callers create the descriptor without `new`.
+  */
+object Statistics {
+
+  /**
+    * Creates a new statistics descriptor for describing table stats.
+    *
+    * @return a fresh [[Statistics]] instance
+    */
+  def apply(): Statistics = new Statistics
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/2cb58960/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/StatisticsValidator.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/StatisticsValidator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/StatisticsValidator.scala
new file mode 100644
index 0000000..a78e422
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/StatisticsValidator.scala
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+import org.apache.flink.table.api.ValidationException
+import org.apache.flink.table.descriptors.StatisticsValidator.{STATISTICS_COLUMNS, STATISTICS_ROW_COUNT, STATISTICS_VERSION, validateColumnStats}
+import org.apache.flink.table.plan.stats.ColumnStats
+
+import scala.collection.mutable
+
+/**
+  * Validator for [[Statistics]].
+  */
+class StatisticsValidator extends DescriptorValidator {
+
+  /**
+    * Checks the optional statistics version, the optional row count (must not be negative)
+    * and the statistics of all columns.
+    *
+    * @param properties the properties to validate
+    */
+  override def validate(properties: DescriptorProperties): Unit = {
+    properties.validateInt(STATISTICS_VERSION, isOptional = true, 0, Int.MaxValue)
+    properties.validateLong(STATISTICS_ROW_COUNT, isOptional = true, min = 0L)
+    validateColumnStats(properties, key = STATISTICS_COLUMNS)
+  }
+}
+
+/**
+  * Property keys and utilities shared by [[Statistics]] and [[StatisticsValidator]].
+  */
+object StatisticsValidator {
+
+  val STATISTICS_VERSION = "statistics.version"
+  val STATISTICS_ROW_COUNT = "statistics.row-count"
+  val STATISTICS_COLUMNS = "statistics.columns"
+
+  // per column properties
+
+  val NAME = "name"
+  val DISTINCT_COUNT = "distinct-count"
+  val NULL_COUNT = "null-count"
+  val AVG_LENGTH = "avg-length"
+  val MAX_LENGTH = "max-length"
+  val MAX_VALUE = "max-value"
+  val MIN_VALUE = "min-value"
+
+  // utilities
+
+  /**
+    * Converts column statistics into string properties.
+    *
+    * Only statistics that are set (non-null) end up in the result.
+    *
+    * @param columnStats the column statistics to convert
+    * @return statistic key -> stringified value
+    */
+  def normalizeColumnStats(columnStats: ColumnStats): Map[String, String] = {
+    val stats = mutable.HashMap[String, String]()
+    if (columnStats.ndv != null) {
+      stats += DISTINCT_COUNT -> columnStats.ndv.toString
+    }
+    if (columnStats.nullCount != null) {
+      stats += NULL_COUNT -> columnStats.nullCount.toString
+    }
+    if (columnStats.avgLen != null) {
+      stats += AVG_LENGTH -> columnStats.avgLen.toString
+    }
+    if (columnStats.maxLen != null) {
+      stats += MAX_LENGTH -> columnStats.maxLen.toString
+    }
+    if (columnStats.max != null) {
+      stats += MAX_VALUE -> columnStats.max.toString
+    }
+    if (columnStats.min != null) {
+      stats += MIN_VALUE -> columnStats.min.toString
+    }
+    stats.toMap
+  }
+
+  /**
+    * Validates the indexed column statistics stored under the given key.
+    *
+    * Counts and lengths must not be negative. The minimum and maximum value of a column
+    * only have to be valid doubles: the values of a column may well be negative.
+    *
+    * @param properties the properties to validate
+    * @param key the key under which the indexed column statistics are stored
+    */
+  def validateColumnStats(properties: DescriptorProperties, key: String): Unit = {
+
+    // filter for number of columns
+    val columnCount = properties.getIndexedProperty(key, NAME).size
+
+    for (i <- 0 until columnCount) {
+      properties.validateString(s"$key.$i.$NAME", isOptional = false, minLen = 1)
+      properties.validateLong(s"$key.$i.$DISTINCT_COUNT", isOptional = true, min = 0L)
+      properties.validateLong(s"$key.$i.$NULL_COUNT", isOptional = true, min = 0L)
+      properties.validateDouble(s"$key.$i.$AVG_LENGTH", isOptional = true, min = 0.0)
+      properties.validateInt(s"$key.$i.$MAX_LENGTH", isOptional = true, min = 0)
+      // Double.MinValue is the most negative finite double, i.e. no lower bound in practice
+      properties.validateDouble(s"$key.$i.$MAX_VALUE", isOptional = true, min = Double.MinValue)
+      properties.validateDouble(s"$key.$i.$MIN_VALUE", isOptional = true, min = Double.MinValue)
+    }
+  }
+
+  /**
+    * Reads the indexed column statistics stored under the given key.
+    *
+    * Statistics that are not present are set to null in the resulting [[ColumnStats]].
+    * Throws a [[ValidationException]] if the name of a column cannot be read.
+    *
+    * @param properties the properties to read from
+    * @param key the key under which the indexed column statistics are stored
+    * @return column name -> statistics of the column
+    */
+  def readColumnStats(properties: DescriptorProperties, key: String): Map[String, ColumnStats] = {
+
+    // filter for number of columns
+    val columnCount = properties.getIndexedProperty(key, NAME).size
+
+    val stats = for (i <- 0 until columnCount) yield {
+      val name = properties.getString(s"$key.$i.$NAME").getOrElse(
+        throw new ValidationException(s"Could not find name of property '$key.$i.$NAME'."))
+
+      // values are boxed so that a missing statistic can be represented as null
+      val colStats = ColumnStats(
+        properties.getLong(s"$key.$i.$DISTINCT_COUNT").map(v => Long.box(v)).orNull,
+        properties.getLong(s"$key.$i.$NULL_COUNT").map(v => Long.box(v)).orNull,
+        properties.getDouble(s"$key.$i.$AVG_LENGTH").map(v => Double.box(v)).orNull,
+        properties.getInt(s"$key.$i.$MAX_LENGTH").map(v => Int.box(v)).orNull,
+        properties.getDouble(s"$key.$i.$MAX_VALUE").map(v => Double.box(v)).orNull,
+        properties.getDouble(s"$key.$i.$MIN_VALUE").map(v => Double.box(v)).orNull
+      )
+
+      name -> colStats
+    }
+
+    stats.toMap
+  }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/2cb58960/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/StreamTableSourceDescriptor.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/StreamTableSourceDescriptor.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/StreamTableSourceDescriptor.scala
new file mode 100644
index 0000000..ae88236
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/StreamTableSourceDescriptor.scala
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+import org.apache.flink.table.api.{StreamTableEnvironment, Table, TableException}
+import org.apache.flink.table.sources.{StreamTableSource, TableSource, TableSourceFactoryService}
+
+/**
+  * Descriptor for specifying a table source in a streaming environment.
+  *
+  * @param tableEnv the streaming table environment used to create or register the table
+  * @param connector the connector descriptor of the table source
+  */
+class StreamTableSourceDescriptor(tableEnv: StreamTableEnvironment, connector: ConnectorDescriptor)
+  extends TableSourceDescriptor(connector) {
+
+  /**
+    * Searches for the specified table source, configures it accordingly, and returns it.
+    *
+    * Throws a [[TableException]] if the table source that was found is not a
+    * [[StreamTableSource]].
+    */
+  def toTableSource: TableSource[_] = {
+    val candidate = TableSourceFactoryService.findTableSourceFactory(this)
+
+    // built on demand only, i.e. when the candidate cannot be used for streaming
+    def notApplicable: TableException = new TableException(
+      s"Found table source '${candidate.getClass.getCanonicalName}' is not applicable " +
+        s"in a streaming environment.")
+
+    candidate match {
+      case _: StreamTableSource[_] => candidate
+      case _ => throw notApplicable
+    }
+  }
+
+  /**
+    * Searches for the specified table source, configures it accordingly, and returns it as a table.
+    */
+  def toTable: Table = {
+    val source = toTableSource
+    tableEnv.fromTableSource(source)
+  }
+
+  /**
+    * Searches for the specified table source, configures it accordingly, and registers it as
+    * a table under the given name.
+    *
+    * @param name table name to be registered in the table environment
+    */
+  def register(name: String): Unit = {
+    val source = toTableSource
+    tableEnv.registerTableSource(name, source)
+  }
+
+  /**
+    * Specifies the format that defines how to read data from a connector.
+    *
+    * @param format the format descriptor; replaces a format that was set before
+    * @return this descriptor, so that calls can be chained
+    */
+  def withFormat(format: FormatDescriptor): StreamTableSourceDescriptor = {
+    val chosenFormat: Option[FormatDescriptor] = Some(format)
+    formatDescriptor = chosenFormat
+    this
+  }
+
+  /**
+    * Specifies the resulting table schema.
+    *
+    * @param schema the schema descriptor; replaces a schema that was set before
+    * @return this descriptor, so that calls can be chained
+    */
+  def withSchema(schema: Schema): StreamTableSourceDescriptor = {
+    val chosenSchema: Option[Schema] = Some(schema)
+    schemaDescriptor = chosenSchema
+    this
+  }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/2cb58960/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/TableSourceDescriptor.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/TableSourceDescriptor.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/TableSourceDescriptor.scala
new file mode 100644
index 0000000..918b618
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/TableSourceDescriptor.scala
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+import org.apache.flink.table.api.ValidationException
+import org.apache.flink.table.descriptors.StatisticsValidator.{STATISTICS_COLUMNS, STATISTICS_ROW_COUNT, readColumnStats}
+import org.apache.flink.table.plan.stats.TableStats
+
+import scala.collection.JavaConverters._
+
+/**
+  * Common class for all descriptors describing a table source.
+  *
+  * @param connector the connector descriptor of the table source
+  */
+abstract class TableSourceDescriptor(connector: ConnectorDescriptor) extends Descriptor {
+
+  protected val connectorDescriptor: ConnectorDescriptor = connector
+
+  // optional parts of the description; expected to be set by subclasses
+  protected var formatDescriptor: Option[FormatDescriptor] = None
+  protected var schemaDescriptor: Option[Schema] = None
+  protected var statisticsDescriptor: Option[Statistics] = None
+  protected var metaDescriptor: Option[Metadata] = None
+
+  /**
+    * Internal method for properties conversion.
+    *
+    * Adds the properties of the connector, followed by those of the format, the schema and
+    * the metadata (as far as they are set). Throws a [[ValidationException]] if the
+    * connector needs a format but none is set, or if a format is set although the connector
+    * does not need one.
+    */
+  override private[flink] def addProperties(properties: DescriptorProperties): Unit = {
+    connectorDescriptor.addProperties(properties)
+
+    // check for a format
+    if (connectorDescriptor.needsFormat() && formatDescriptor.isEmpty) {
+      throw new ValidationException(
+        s"The connector '$connectorDescriptor' requires a format description.")
+    } else if (!connectorDescriptor.needsFormat() && formatDescriptor.isDefined) {
+      throw new ValidationException(
+        s"The connector '$connectorDescriptor' does not require a format description " +
+        s"but '${formatDescriptor.get}' found.")
+    }
+
+    formatDescriptor.foreach(_.addProperties(properties))
+    schemaDescriptor.foreach(_.addProperties(properties))
+    metaDescriptor.foreach(_.addProperties(properties))
+  }
+
+  /**
+    * Reads table statistics from the descriptors properties.
+    *
+    * @return the table statistics, or None if no row count has been described
+    */
+  protected def getTableStats: Option[TableStats] = {
+    val normalizedProps = new DescriptorProperties()
+    addProperties(normalizedProps)
+
+    // addProperties() of this class does not emit the statistics descriptor, so without
+    // this step the row count could never be found and the result would always be None.
+    // The version key is used as a guard in case an overriding addProperties() has
+    // already written the statistics.
+    if (!normalizedProps.contains(StatisticsValidator.STATISTICS_VERSION)) {
+      statisticsDescriptor.foreach(_.addProperties(normalizedProps))
+    }
+
+    val rowCount = normalizedProps.getLong(STATISTICS_ROW_COUNT).map(v => Long.box(v))
+    rowCount match {
+      case Some(cnt) =>
+        val columnStats = readColumnStats(normalizedProps, STATISTICS_COLUMNS)
+        Some(TableStats(cnt, columnStats.asJava))
+      case None =>
+        None
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/2cb58960/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/stats/FlinkStatistic.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/stats/FlinkStatistic.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/stats/FlinkStatistic.scala
index d5a5f36..da08916 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/stats/FlinkStatistic.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/stats/FlinkStatistic.scala
@@ -40,7 +40,7 @@ class FlinkStatistic(tableStats: Option[TableStats]) extends Statistic {
*
* @return The table statistics
*/
- def getTableStats: TableStats = tableStats.getOrElse(null)
+ def getTableStats: TableStats = tableStats.orNull
/**
* Returns the stats of the specified the column.
http://git-wip-us.apache.org/repos/asf/flink/blob/2cb58960/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/CsvTableSource.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/CsvTableSource.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/CsvTableSource.scala
index ba076b4..e0022c5 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/CsvTableSource.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/CsvTableSource.scala
@@ -270,7 +270,7 @@ object CsvTableSource {
/**
* Adds a field with the field name and the type information. Required.
* This method can be called multiple times. The call order of this method defines
- * also the order of thee fields in a row.
+ * also the order of the fields in a row.
*
* @param fieldName the field name
* @param fieldType the type information of the field
http://git-wip-us.apache.org/repos/asf/flink/blob/2cb58960/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/CsvTableSourceConverter.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/CsvTableSourceConverter.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/CsvTableSourceConverter.scala
index 68d2f55..ab984a6 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/CsvTableSourceConverter.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/CsvTableSourceConverter.scala
@@ -29,7 +29,11 @@ import com.google.common.collect.ImmutableSet
/**
* The class defines a converter used to convert [[CsvTableSource]] to
* or from [[ExternalCatalogTable]].
+ *
+ * @deprecated Use the more generic [[org.apache.flink.table.sources.TableSourceFactory]] instead.
*/
+@Deprecated
+@deprecated("Use the more generic table source factories instead.")
@TableType(value = "csv")
class CsvTableSourceConverter extends TableSourceConverter[CsvTableSource] {
http://git-wip-us.apache.org/repos/asf/flink/blob/2cb58960/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/CsvTableSourceFactory.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/CsvTableSourceFactory.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/CsvTableSourceFactory.scala
new file mode 100644
index 0000000..bec4565
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/CsvTableSourceFactory.scala
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sources
+
+import java.util
+
+import org.apache.flink.table.api.TableException
+import org.apache.flink.table.descriptors.ConnectorDescriptorValidator.{CONNECTOR_TYPE, CONNECTOR_VERSION}
+import org.apache.flink.table.descriptors.CsvValidator._
+import org.apache.flink.table.descriptors.FileSystemValidator.{CONNECTOR_PATH, CONNECTOR_TYPE_VALUE}
+import org.apache.flink.table.descriptors.FormatDescriptorValidator.{FORMAT_TYPE, FORMAT_VERSION}
+import org.apache.flink.table.descriptors.SchemaValidator.{SCHEMA, SCHEMA_VERSION}
+import org.apache.flink.table.descriptors._
+import org.apache.flink.types.Row
+
+/**
+ * Factory for creating configured instances of [[CsvTableSource]].
+ */
+class CsvTableSourceFactory extends TableSourceFactory[Row] {
+
+ override def requiredContext(): util.Map[String, String] = {
+ val context = new util.HashMap[String, String]()
+ context.put(CONNECTOR_TYPE, CONNECTOR_TYPE_VALUE)
+ context.put(FORMAT_TYPE, FORMAT_TYPE_VALUE)
+ context.put(CONNECTOR_VERSION, "1")
+ context.put(FORMAT_VERSION, "1")
+ context.put(SCHEMA_VERSION, "1")
+ context
+ }
+
+ override def supportedProperties(): util.List[String] = {
+ val properties = new util.ArrayList[String]()
+ // connector
+ properties.add(CONNECTOR_PATH)
+ // format
+ properties.add(s"$FORMAT_FIELDS.#.${DescriptorProperties.TYPE}")
+ properties.add(s"$FORMAT_FIELDS.#.${DescriptorProperties.NAME}")
+ properties.add(FORMAT_FIELD_DELIMITER)
+ properties.add(FORMAT_LINE_DELIMITER)
+ properties.add(FORMAT_QUOTE_CHARACTER)
+ properties.add(FORMAT_COMMENT_PREFIX)
+ properties.add(FORMAT_IGNORE_FIRST_LINE)
+ properties.add(FORMAT_IGNORE_PARSE_ERRORS)
+ properties.add(CONNECTOR_PATH)
+ // schema
+ properties.add(s"$SCHEMA.#.${DescriptorProperties.TYPE}")
+ properties.add(s"$SCHEMA.#.${DescriptorProperties.NAME}")
+ properties
+ }
+
+ override def create(properties: util.Map[String, String]): TableSource[Row] = {
+ val params = new DescriptorProperties()
+ params.putProperties(properties)
+
+ // validate
+ new FileSystemValidator().validate(params)
+ new CsvValidator().validate(params)
+ new SchemaValidator().validate(params)
+
+ // build
+ val csvTableSourceBuilder = new CsvTableSource.Builder
+
+ val tableSchema = params.getTableSchema(SCHEMA).get
+ val encodingSchema = params.getTableSchema(FORMAT_FIELDS)
+
+ // the CsvTableSource needs some rework first
+ // for now the schema must be equal to the encoding
+ if (!encodingSchema.contains(tableSchema)) {
+ throw new TableException(
+ "Encodings that differ from the schema are not supported yet for CsvTableSources.")
+ }
+
+ params.getString(CONNECTOR_PATH).foreach(csvTableSourceBuilder.path)
+ params.getString(FORMAT_FIELD_DELIMITER).foreach(csvTableSourceBuilder.fieldDelimiter)
+ params.getString(FORMAT_LINE_DELIMITER).foreach(csvTableSourceBuilder.lineDelimiter)
+
+ encodingSchema.foreach { schema =>
+ schema.getColumnNames.zip(schema.getTypes).foreach { case (name, tpe) =>
+ csvTableSourceBuilder.field(name, tpe)
+ }
+ }
+ params.getCharacter(FORMAT_QUOTE_CHARACTER).foreach(csvTableSourceBuilder.quoteCharacter)
+ params.getString(FORMAT_COMMENT_PREFIX).foreach(csvTableSourceBuilder.commentPrefix)
+ params.getBoolean(FORMAT_IGNORE_FIRST_LINE).foreach { flag =>
+ if (flag) {
+ csvTableSourceBuilder.ignoreFirstLine()
+ }
+ }
+ params.getBoolean(FORMAT_IGNORE_PARSE_ERRORS).foreach { flag =>
+ if (flag) {
+ csvTableSourceBuilder.ignoreParseErrors()
+ }
+ }
+
+ csvTableSourceBuilder.build()
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/2cb58960/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/TableSourceFactory.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/TableSourceFactory.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/TableSourceFactory.scala
new file mode 100644
index 0000000..f42d765
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/TableSourceFactory.scala
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sources
+
+import java.util
+
+/**
+ * A factory to create a [[TableSource]]. This factory is used with Java's Service Provider
+ * Interfaces (SPI) for discovering. A factory is called with a set of normalized properties that
+ * describe the desired table source. The factory allows for matching to the given set of
+ * properties and creating a configured [[TableSource]] accordingly.
+ *
+ * Classes that implement this interface need to be added to the
+ * "META_INF/services/org.apache.flink.table.sources.TableSourceFactory' file of a JAR file in
+ * the current classpath to be found.
+ */
+trait TableSourceFactory[T] {
+
+ /**
+ * Specifies the context that this factory has been implemented for. The framework guarantees
+ * to only call the [[create()]] method of the factory if the specified set of properties and
+ * values are met.
+ *
+ * Typical properties might be:
+ * - connector.type
+ * - format.type
+ *
+ * Specified versions allow the framework to provide backwards compatible properties in case of
+ * string format changes:
+ * - connector.version
+ * - format.version
+ *
+ * An empty context means that the factory matches for all requests.
+ */
+ def requiredContext(): util.Map[String, String]
+
+ /**
+ * List of property keys that this factory can handle. This method will be used for validation.
+ * If a property is passed that this factory cannot handle, an exception will be thrown. The
+ * list must not contain the keys that are specified by the context.
+ *
+ * Example properties might be:
+ * - format.line-delimiter
+ * - format.ignore-parse-errors
+ * - format.fields.#.type
+ * - format.fields.#.name
+ *
+ * Note: Use "#" to denote an array of values where "#" represents one or more digits.
+ */
+ def supportedProperties(): util.List[String]
+
+ /**
+ * Creates and configures a [[TableSource]] using the given properties.
+ *
+ * @param properties normalized properties describing a table source
+ * @return the configured table source
+ */
+ def create(properties: util.Map[String, String]): TableSource[T]
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/2cb58960/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/TableSourceFactoryService.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/TableSourceFactoryService.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/TableSourceFactoryService.scala
new file mode 100644
index 0000000..cb737a9
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/TableSourceFactoryService.scala
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sources
+
+import java.util.{ServiceConfigurationError, ServiceLoader}
+
+import org.apache.flink.table.api.{AmbiguousTableSourceException, NoMatchingTableSourceException, TableException, ValidationException}
+import org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_VERSION
+import org.apache.flink.table.descriptors.FormatDescriptorValidator.FORMAT_VERSION
+import org.apache.flink.table.descriptors.MetadataValidator.METADATA_VERSION
+import org.apache.flink.table.descriptors.RowtimeValidator.ROWTIME_VERSION
+import org.apache.flink.table.descriptors.SchemaValidator.SCHEMA_VERSION
+import org.apache.flink.table.descriptors.StatisticsValidator.STATISTICS_VERSION
+import org.apache.flink.table.descriptors._
+import org.apache.flink.table.util.Logging
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+/**
+ * Service provider interface for finding suitable table source factories for the given properties.
+ */
+object TableSourceFactoryService extends Logging {
+
+ private lazy val loader = ServiceLoader.load(classOf[TableSourceFactory[_]])
+
+ def findTableSourceFactory(descriptor: TableSourceDescriptor): TableSource[_] = {
+ val properties = new DescriptorProperties()
+ descriptor.addProperties(properties)
+ findTableSourceFactory(properties.asMap)
+ }
+
+ def findTableSourceFactory(properties: Map[String, String]): TableSource[_] = {
+ var matchingFactory: Option[(TableSourceFactory[_], Seq[String])] = None
+ try {
+ val iter = loader.iterator()
+ while (iter.hasNext) {
+ val factory = iter.next()
+
+ val requiredContextJava = try {
+ factory.requiredContext()
+ } catch {
+ case t: Throwable =>
+ throw new TableException(
+ s"Table source factory '${factory.getClass.getCanonicalName}' caused an exception.",
+ t)
+ }
+
+ val requiredContext = if (requiredContextJava != null) {
+ // normalize properties
+ requiredContextJava.asScala.map(e => (e._1.toLowerCase, e._2))
+ } else {
+ Map[String, String]()
+ }
+
+ val plainContext = mutable.Map[String, String]()
+ plainContext ++= requiredContext
+ // we remove the versions for now until we have the first backwards compatibility case
+ // with the version we can provide mappings in case the format changes
+ plainContext.remove(CONNECTOR_VERSION)
+ plainContext.remove(FORMAT_VERSION)
+ plainContext.remove(SCHEMA_VERSION)
+ plainContext.remove(ROWTIME_VERSION)
+ plainContext.remove(METADATA_VERSION)
+ plainContext.remove(STATISTICS_VERSION)
+
+ // check if required context is met
+ if (plainContext.forall(e => properties.contains(e._1) && properties(e._1) == e._2)) {
+ matchingFactory match {
+ case Some(_) => throw new AmbiguousTableSourceException(properties)
+ case None => matchingFactory = Some((factory, requiredContext.keys.toSeq))
+ }
+ }
+ }
+ } catch {
+ case e: ServiceConfigurationError =>
+ LOG.error("Could not load service provider for table source factories.", e)
+ throw new TableException("Could not load service provider for table source factories.", e)
+ }
+
+ val (factory, context) = matchingFactory
+ .getOrElse(throw new NoMatchingTableSourceException(properties))
+
+ val plainProperties = mutable.ArrayBuffer[String]()
+ properties.keys.foreach { k =>
+ // replace arrays with wildcard
+ val key = k.replaceAll(".\\d+", ".#")
+ // ignore context properties and duplicates
+ if (!context.contains(key) && !plainProperties.contains(key)) {
+ plainProperties += key
+ }
+ }
+
+ val supportedPropertiesJava = try {
+ factory.supportedProperties()
+ } catch {
+ case t: Throwable =>
+ throw new TableException(
+ s"Table source factory '${factory.getClass.getCanonicalName}' caused an exception.",
+ t)
+ }
+
+ val supportedProperties = if (supportedPropertiesJava != null) {
+ supportedPropertiesJava.asScala.map(_.toLowerCase)
+ } else {
+ Seq[String]()
+ }
+
+ // check for supported properties
+ plainProperties.foreach { k =>
+ if (!supportedProperties.contains(k)) {
+ throw new ValidationException(
+ s"Table factory '${factory.getClass.getCanonicalName}' does not support the " +
+ s"property '$k'. Supported properties are: \n${supportedProperties.mkString("\n")}")
+ }
+ }
+
+ // create the table source
+ try {
+ factory.create(properties.asJava)
+ } catch {
+ case t: Throwable =>
+ throw new TableException(
+ s"Table source factory '${factory.getClass.getCanonicalName}' caused an exception.",
+ t)
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/2cb58960/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/tsextractors/TimestampExtractor.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/tsextractors/TimestampExtractor.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/tsextractors/TimestampExtractor.scala
index 34c6ba5..ce57b92 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/tsextractors/TimestampExtractor.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/tsextractors/TimestampExtractor.scala
@@ -25,7 +25,7 @@ import org.apache.flink.table.sources.FieldComputer
/**
* Provides the an expression to extract the timestamp for a rowtime attribute.
*/
-abstract class TimestampExtractor extends FieldComputer[Long] {
+abstract class TimestampExtractor extends FieldComputer[Long] with Serializable {
/** Timestamp extractors compute the timestamp as Long. */
override def getReturnType: TypeInformation[Long] = Types.LONG.asInstanceOf[TypeInformation[Long]]
http://git-wip-us.apache.org/repos/asf/flink/blob/2cb58960/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/wmstrategies/AscendingTimestamps.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/wmstrategies/AscendingTimestamps.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/wmstrategies/AscendingTimestamps.scala
index 3a947ac..9fc7c88 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/wmstrategies/AscendingTimestamps.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/wmstrategies/AscendingTimestamps.scala
@@ -21,12 +21,12 @@ package org.apache.flink.table.sources.wmstrategies
import org.apache.flink.streaming.api.watermark.Watermark
/**
- * A watermark assigner for ascending rowtime attributes.
+ * A watermark strategy for ascending rowtime attributes.
*
* Emits a watermark of the maximum observed timestamp so far minus 1.
* Rows that have a timestamp equal to the max timestamp are not late.
*/
-class AscendingTimestamps extends PeriodicWatermarkAssigner {
+final class AscendingTimestamps extends PeriodicWatermarkAssigner {
var maxTimestamp: Long = Long.MinValue + 1
http://git-wip-us.apache.org/repos/asf/flink/blob/2cb58960/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/wmstrategies/BoundedOutOfOrderTimestamps.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/wmstrategies/BoundedOutOfOrderTimestamps.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/wmstrategies/BoundedOutOfOrderTimestamps.scala
index 957daca..8f7c235 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/wmstrategies/BoundedOutOfOrderTimestamps.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/wmstrategies/BoundedOutOfOrderTimestamps.scala
@@ -21,13 +21,13 @@ package org.apache.flink.table.sources.wmstrategies
import org.apache.flink.streaming.api.watermark.Watermark
/**
- * A watermark assigner for rowtime attributes which are out-of-order by a bounded time interval.
+ * A watermark strategy for rowtime attributes which are out-of-order by a bounded time interval.
*
* Emits watermarks which are the maximum observed timestamp minus the specified delay.
*
* @param delay The delay by which watermarks are behind the maximum observed timestamp.
*/
-class BoundedOutOfOrderTimestamps(val delay: Long) extends PeriodicWatermarkAssigner {
+final class BoundedOutOfOrderTimestamps(val delay: Long) extends PeriodicWatermarkAssigner {
var maxTimestamp: Long = Long.MinValue + delay
http://git-wip-us.apache.org/repos/asf/flink/blob/2cb58960/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/wmstrategies/watermarkStrategies.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/wmstrategies/watermarkStrategies.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/wmstrategies/watermarkStrategies.scala
index 4c7f4e4..dd71bd3 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/wmstrategies/watermarkStrategies.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/wmstrategies/watermarkStrategies.scala
@@ -62,7 +62,7 @@ abstract class PunctuatedWatermarkAssigner extends WatermarkStrategy {
}
/** A strategy which indicates the watermarks should be preserved from the underlying datastream.*/
-class PreserveWatermarks extends WatermarkStrategy
+final class PreserveWatermarks extends WatermarkStrategy
object PreserveWatermarks {
val INSTANCE: PreserveWatermarks = new PreserveWatermarks
}
http://git-wip-us.apache.org/repos/asf/flink/blob/2cb58960/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/typeutils/TypeStringUtils.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/typeutils/TypeStringUtils.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/typeutils/TypeStringUtils.scala
new file mode 100644
index 0000000..253b491
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/typeutils/TypeStringUtils.scala
@@ -0,0 +1,252 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.typeutils
+
+import java.io.Serializable
+
+import org.apache.commons.codec.binary.Base64
+import org.apache.commons.lang3.StringEscapeUtils
+import org.apache.flink.api.common.functions.InvalidTypesException
+import org.apache.flink.api.common.typeinfo.{BasicArrayTypeInfo, PrimitiveArrayTypeInfo, TypeInformation}
+import org.apache.flink.api.common.typeutils.CompositeType
+import org.apache.flink.api.java.typeutils._
+import org.apache.flink.table.api.{TableException, Types, ValidationException}
+import org.apache.flink.table.descriptors.DescriptorProperties.normalizeTypeInfo
+import org.apache.flink.util.InstantiationUtil
+
+import _root_.scala.language.implicitConversions
+import _root_.scala.util.parsing.combinator.{JavaTokenParsers, PackratParsers}
+
+/**
+ * Utilities to convert [[org.apache.flink.api.common.typeinfo.TypeInformation]] into a
+ * string representation and back.
+ */
+object TypeStringUtils extends JavaTokenParsers with PackratParsers {
+ case class Keyword(key: String)
+
+ // convert the keyword into an case insensitive Parser
+ implicit def keyword2Parser(kw: Keyword): Parser[String] = {
+ ("""(?i)\Q""" + kw.key + """\E""").r
+ }
+
+ lazy val VARCHAR: Keyword = Keyword("VARCHAR")
+ lazy val STRING: Keyword = Keyword("STRING")
+ lazy val BOOLEAN: Keyword = Keyword("BOOLEAN")
+ lazy val BYTE: Keyword = Keyword("BYTE")
+ lazy val TINYINT: Keyword = Keyword("TINYINT")
+ lazy val SHORT: Keyword = Keyword("SHORT")
+ lazy val SMALLINT: Keyword = Keyword("SMALLINT")
+ lazy val INT: Keyword = Keyword("INT")
+ lazy val LONG: Keyword = Keyword("LONG")
+ lazy val BIGINT: Keyword = Keyword("BIGINT")
+ lazy val FLOAT: Keyword = Keyword("FLOAT")
+ lazy val DOUBLE: Keyword = Keyword("DOUBLE")
+ lazy val DECIMAL: Keyword = Keyword("DECIMAL")
+ lazy val SQL_DATE: Keyword = Keyword("SQL_DATE")
+ lazy val DATE: Keyword = Keyword("DATE")
+ lazy val SQL_TIME: Keyword = Keyword("SQL_TIME")
+ lazy val TIME: Keyword = Keyword("TIME")
+ lazy val SQL_TIMESTAMP: Keyword = Keyword("SQL_TIMESTAMP")
+ lazy val TIMESTAMP: Keyword = Keyword("TIMESTAMP")
+ lazy val ROW: Keyword = Keyword("ROW")
+ lazy val ANY: Keyword = Keyword("ANY")
+ lazy val POJO: Keyword = Keyword("POJO")
+
+ lazy val qualifiedName: Parser[String] =
+ """\p{javaJavaIdentifierStart}[\p{javaJavaIdentifierPart}.]*""".r
+
+ lazy val base64Url: Parser[String] =
+ """[A-Za-z0-9_-]*""".r
+
+ lazy val atomic: PackratParser[TypeInformation[_]] =
+ (VARCHAR | STRING) ^^ { e => Types.STRING } |
+ BOOLEAN ^^ { e => Types.BOOLEAN } |
+ (TINYINT | BYTE) ^^ { e => Types.BYTE } |
+ (SMALLINT | SHORT) ^^ { e => Types.SHORT } |
+ INT ^^ { e => Types.INT } |
+ (BIGINT | LONG) ^^ { e => Types.LONG } |
+ FLOAT ^^ { e => Types.FLOAT } |
+ DOUBLE ^^ { e => Types.DOUBLE } |
+ DECIMAL ^^ { e => Types.DECIMAL } |
+ (DATE | SQL_DATE) ^^ { e => Types.SQL_DATE.asInstanceOf[TypeInformation[_]] } |
+ (TIMESTAMP | SQL_TIMESTAMP) ^^ { e => Types.SQL_TIMESTAMP } |
+ (TIME | SQL_TIME) ^^ { e => Types.SQL_TIME }
+
+ lazy val escapedFieldName: PackratParser[String] = "\"" ~> stringLiteral <~ "\"" ^^ { s =>
+ StringEscapeUtils.unescapeJava(s)
+ }
+
+ lazy val fieldName: PackratParser[String] = escapedFieldName | stringLiteral | ident
+
+ lazy val field: PackratParser[(String, TypeInformation[_])] =
+ fieldName ~ typeInfo ^^ {
+ case name ~ info => (name, info)
+ }
+
+ lazy val namedRow: PackratParser[TypeInformation[_]] =
+ ROW ~ "(" ~> rep1sep(field, ",") <~ ")" ^^ {
+ fields => Types.ROW(fields.map(_._1).toArray, fields.map(_._2).toArray)
+ } | failure("Named row type expected.")
+
+ lazy val unnamedRow: PackratParser[TypeInformation[_]] =
+ ROW ~ "(" ~> rep1sep(typeInfo, ",") <~ ")" ^^ {
+ types => Types.ROW(types: _*)
+ } | failure("Unnamed row type expected.")
+
+ lazy val generic: PackratParser[TypeInformation[_]] =
+ ANY ~ "(" ~> qualifiedName <~ ")" ^^ {
+ typeClass =>
+ val clazz = loadClass(typeClass)
+ new GenericTypeInfo[AnyRef](clazz.asInstanceOf[Class[AnyRef]])
+ }
+
+ lazy val pojo: PackratParser[TypeInformation[_]] = POJO ~ "(" ~> qualifiedName <~ ")" ^^ {
+ typeClass =>
+ val clazz = loadClass(typeClass)
+ val info = TypeExtractor.createTypeInfo(clazz)
+ if (!info.isInstanceOf[PojoTypeInfo[_]]) {
+ throw new ValidationException(s"Class '$typeClass'is not a POJO type.")
+ }
+ info
+ }
+
+ lazy val any: PackratParser[TypeInformation[_]] =
+ ANY ~ "(" ~ qualifiedName ~ "," ~ base64Url ~ ")" ^^ {
+ case _ ~ _ ~ typeClass ~ _ ~ serialized ~ _=>
+ val clazz = loadClass(typeClass)
+ val typeInfo = deserialize(serialized)
+
+ if (clazz != typeInfo.getTypeClass) {
+ throw new ValidationException(
+ s"Class '$typeClass' does no correspond to serialized data.")
+ }
+ typeInfo
+ }
+
+ lazy val typeInfo: PackratParser[TypeInformation[_]] =
+ namedRow | unnamedRow | any | generic | pojo | atomic | failure("Invalid type.")
+
+ def readTypeInfo(typeString: String): TypeInformation[_] = {
+ parseAll(typeInfo, typeString) match {
+ case Success(lst, _) => lst
+
+ case NoSuccess(msg, next) =>
+ throwError(msg, next)
+ }
+ }
+
+ // ----------------------------------------------------------------------------------------------
+
+ def writeTypeInfo(typeInfo: TypeInformation[_]): String = typeInfo match {
+
+ case Types.STRING => VARCHAR.key
+ case Types.BOOLEAN => BOOLEAN.key
+ case Types.BYTE => TINYINT.key
+ case Types.SHORT => SMALLINT.key
+ case Types.INT => INT.key
+ case Types.LONG => BIGINT.key
+ case Types.FLOAT => FLOAT.key
+ case Types.DOUBLE => DOUBLE.key
+ case Types.DECIMAL => DECIMAL.key
+ case Types.SQL_DATE => DATE.key
+ case Types.SQL_TIME => TIME.key
+ case Types.SQL_TIMESTAMP => TIMESTAMP.key
+
+ case rt: RowTypeInfo =>
+ val fields = rt.getFieldNames.zip(rt.getFieldTypes)
+ val normalizedFields = fields.map { f =>
+
+ // escape field name if it contains spaces
+ val name = if (!f._1.matches("\\S+")) {
+ "\"" + StringEscapeUtils.escapeJava(f._1) + "\""
+ } else {
+ f._1
+ }
+
+ s"$name ${normalizeTypeInfo(f._2)}"
+ }
+ s"${ROW.key}(${normalizedFields.mkString(", ")})"
+
+ case generic: GenericTypeInfo[_] =>
+ s"${ANY.key}(${generic.getTypeClass.getName})"
+
+ case pojo: PojoTypeInfo[_] =>
+ // we only support very simple POJOs that only contain extracted fields
+ // (not manually specified)
+ val extractedInfo = try {
+ Some(TypeExtractor.createTypeInfo(pojo.getTypeClass))
+ } catch {
+ case _: InvalidTypesException => None
+ }
+ extractedInfo match {
+ case Some(ei) if ei == pojo => s"${POJO.key}(${pojo.getTypeClass.getName})"
+ case _ =>
+ throw new TableException(
+ "A string representation for custom POJO types is not supported yet.")
+ }
+
+ case _: CompositeType[_] =>
+ throw new TableException("A string representation for composite types is not supported yet.")
+
+ case _: BasicArrayTypeInfo[_, _] | _: ObjectArrayTypeInfo[_, _] |
+ _: PrimitiveArrayTypeInfo[_] =>
+ throw new TableException("A string representation for array types is not supported yet.")
+
+ case _: MapTypeInfo[_, _] | _: MultisetTypeInfo[_] =>
+ throw new TableException("A string representation for map types is not supported yet.")
+
+ case any: TypeInformation[_] =>
+ s"${ANY.key}(${any.getTypeClass.getName}, ${serialize(any)})"
+ }
+
+ // ----------------------------------------------------------------------------------------------
+
+ private def serialize(obj: Serializable): String = {
+ try {
+ val byteArray = InstantiationUtil.serializeObject(obj)
+ Base64.encodeBase64URLSafeString(byteArray)
+ } catch {
+ case e: Exception =>
+ throw new ValidationException(s"Unable to serialize type information '$obj' with " +
+ s"class '${obj.getClass.getName}'.", e)
+ }
+ }
+
+ private def deserialize(data: String): TypeInformation[_] = {
+ val byteData = Base64.decodeBase64(data)
+ InstantiationUtil
+ .deserializeObject[TypeInformation[_]](byteData, Thread.currentThread.getContextClassLoader)
+ }
+
+ private def throwError(msg: String, next: Input): Nothing = {
+ val improvedMsg = msg.replace("string matching regex `\\z'", "End of type information")
+
+ throw new ValidationException(
+ s"""Could not parse type information at column ${next.pos.column}: $improvedMsg
+ |${next.pos.longString}""".stripMargin)
+ }
+
+ private def loadClass(qualifiedName: String): Class[_] = try {
+ Class.forName(qualifiedName, true, Thread.currentThread.getContextClassLoader)
+ } catch {
+ case e: ClassNotFoundException =>
+ throw new ValidationException("Class '" + qualifiedName + "' could not be found. " +
+ "Please note that inner classes must be globally accessible and declared static.", e)
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/2cb58960/flink-libraries/flink-table/src/test/resources/META-INF/services/org.apache.flink.table.sources.TableSourceFactory
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/resources/META-INF/services/org.apache.flink.table.sources.TableSourceFactory b/flink-libraries/flink-table/src/test/resources/META-INF/services/org.apache.flink.table.sources.TableSourceFactory
new file mode 100644
index 0000000..1b2506e
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/resources/META-INF/services/org.apache.flink.table.sources.TableSourceFactory
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.flink.table.sources.TestTableSourceFactory
[3/3] flink git commit: [FLINK-8240] [table] Create unified
interfaces to configure and instantiate TableSources
Posted by tw...@apache.org.
[FLINK-8240] [table] Create unified interfaces to configure and instantiate TableSources
This closes #5240.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2cb58960
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2cb58960
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2cb58960
Branch: refs/heads/master
Commit: 2cb58960e78bee83d29fa66c2c29647353194f75
Parents: a5476cd
Author: twalthr <tw...@apache.org>
Authored: Fri Dec 15 10:18:20 2017 +0100
Committer: twalthr <tw...@apache.org>
Committed: Wed Jan 31 12:05:43 2018 +0100
----------------------------------------------------------------------
.../connectors/kafka/KafkaJsonTableSource.java | 3 +-
.../flink/api/java/typeutils/PojoTypeInfo.java | 1 -
.../flink/table/annotation/TableType.java | 6 +-
...pache.flink.table.sources.TableSourceFactory | 16 +
.../flink/table/api/BatchTableEnvironment.scala | 24 +-
.../table/api/StreamTableEnvironment.scala | 24 +-
.../flink/table/api/TableEnvironment.scala | 30 +-
.../org/apache/flink/table/api/exceptions.scala | 49 ++
.../table/api/java/BatchTableEnvironment.scala | 2 +-
.../table/catalog/ExternalCatalogSchema.scala | 12 +-
.../table/catalog/ExternalCatalogTable.scala | 290 ++++++++++-
.../table/catalog/ExternalTableSourceUtil.scala | 112 +++--
.../table/catalog/TableSourceConverter.scala | 4 +
.../BatchTableSourceDescriptor.scala | 72 +++
.../table/descriptors/ConnectorDescriptor.scala | 54 ++
.../ConnectorDescriptorValidator.scala | 39 ++
.../apache/flink/table/descriptors/Csv.scala | 166 +++++++
.../flink/table/descriptors/CsvValidator.scala | 53 ++
.../flink/table/descriptors/Descriptor.scala | 32 ++
.../descriptors/DescriptorProperties.scala | 489 +++++++++++++++++++
.../table/descriptors/DescriptorValidator.scala | 32 ++
.../flink/table/descriptors/FileSystem.scala | 60 +++
.../table/descriptors/FileSystemValidator.scala | 41 ++
.../table/descriptors/FormatDescriptor.scala | 49 ++
.../descriptors/FormatDescriptorValidator.scala | 39 ++
.../apache/flink/table/descriptors/Json.scala | 78 +++
.../flink/table/descriptors/JsonValidator.scala | 41 ++
.../flink/table/descriptors/Metadata.scala | 81 +++
.../table/descriptors/MetadataValidator.scala | 43 ++
.../flink/table/descriptors/Rowtime.scala | 133 +++++
.../table/descriptors/RowtimeValidator.scala | 134 +++++
.../apache/flink/table/descriptors/Schema.scala | 164 +++++++
.../table/descriptors/SchemaValidator.scala | 80 +++
.../flink/table/descriptors/Statistics.scala | 157 ++++++
.../table/descriptors/StatisticsValidator.scala | 119 +++++
.../StreamTableSourceDescriptor.scala | 75 +++
.../descriptors/TableSourceDescriptor.scala | 75 +++
.../flink/table/plan/stats/FlinkStatistic.scala | 2 +-
.../flink/table/sources/CsvTableSource.scala | 2 +-
.../table/sources/CsvTableSourceConverter.scala | 4 +
.../table/sources/CsvTableSourceFactory.scala | 113 +++++
.../table/sources/TableSourceFactory.scala | 76 +++
.../sources/TableSourceFactoryService.scala | 144 ++++++
.../tsextractors/TimestampExtractor.scala | 2 +-
.../wmstrategies/AscendingTimestamps.scala | 4 +-
.../BoundedOutOfOrderTimestamps.scala | 4 +-
.../wmstrategies/watermarkStrategies.scala | 2 +-
.../flink/table/typeutils/TypeStringUtils.scala | 252 ++++++++++
...pache.flink.table.sources.TableSourceFactory | 16 +
.../flink/table/api/TableSourceTest.scala | 38 ++
.../catalog/ExternalCatalogSchemaTest.scala | 4 +-
.../catalog/ExternalTableSourceUtilTest.scala | 6 +-
.../flink/table/descriptors/CsvTest.scala | 102 ++++
.../table/descriptors/DescriptorTestBase.scala | 54 ++
.../table/descriptors/FileSystemTest.scala | 53 ++
.../flink/table/descriptors/JsonTest.scala | 77 +++
.../flink/table/descriptors/MetadataTest.scala | 55 +++
.../flink/table/descriptors/RowtimeTest.scala | 72 +++
.../flink/table/descriptors/SchemaTest.scala | 76 +++
.../table/descriptors/StatisticsTest.scala | 91 ++++
.../StreamTableSourceDescriptorTest.scala | 79 +++
.../sources/TableSourceFactoryServiceTest.scala | 82 ++++
.../table/sources/TestTableSourceFactory.scala | 60 +++
.../table/typeutils/TypeStringUtilsTest.scala | 104 ++++
.../table/utils/MockTableEnvironment.scala | 7 +-
65 files changed, 4383 insertions(+), 77 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/2cb58960/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSource.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSource.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSource.java
index 7de7f34..b3b545e 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSource.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSource.java
@@ -161,7 +161,8 @@ public abstract class KafkaJsonTableSource extends KafkaTableSource implements D
/**
* Sets flag whether to fail if a field is missing or not.
*
- * @param failOnMissingField If set to true, the TableSource fails if a missing fields.
+ * @param failOnMissingField If set to true, the TableSource fails if there is a missing
+ * field.
* If set to false, a missing field is set to null.
* @return The builder.
*/
http://git-wip-us.apache.org/repos/asf/flink/blob/2cb58960/flink-core/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java
index 211b7ef..7f41f5d 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java
@@ -268,7 +268,6 @@ public class PojoTypeInfo<T> extends CompositeType<T> {
return new PojoTypeComparatorBuilder();
}
- // used for testing. Maybe use mockito here
@PublicEvolving
public PojoField getPojoFieldAt(int pos) {
if (pos < 0 || pos >= this.fields.length) {
http://git-wip-us.apache.org/repos/asf/flink/blob/2cb58960/flink-libraries/flink-table/src/main/java/org/apache/flink/table/annotation/TableType.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/java/org/apache/flink/table/annotation/TableType.java b/flink-libraries/flink-table/src/main/java/org/apache/flink/table/annotation/TableType.java
index 2d2a7af..c564528 100644
--- a/flink-libraries/flink-table/src/main/java/org/apache/flink/table/annotation/TableType.java
+++ b/flink-libraries/flink-table/src/main/java/org/apache/flink/table/annotation/TableType.java
@@ -18,7 +18,6 @@
package org.apache.flink.table.annotation;
-import org.apache.flink.annotation.Public;
import org.apache.flink.table.catalog.TableSourceConverter;
import java.lang.annotation.Documented;
@@ -29,11 +28,14 @@ import java.lang.annotation.Target;
/**
* Annotates a table type of a {@link TableSourceConverter}.
+ *
+ * @deprecated Use the more generic {@code org.apache.flink.table.sources.TableSourceFactory}
+ * interface with Java service loaders instead.
*/
@Documented
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
-@Public
+@Deprecated
public @interface TableType {
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/2cb58960/flink-libraries/flink-table/src/main/resources/META-INF/services/org.apache.flink.table.sources.TableSourceFactory
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/resources/META-INF/services/org.apache.flink.table.sources.TableSourceFactory b/flink-libraries/flink-table/src/main/resources/META-INF/services/org.apache.flink.table.sources.TableSourceFactory
new file mode 100644
index 0000000..ff43eed
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/resources/META-INF/services/org.apache.flink.table.sources.TableSourceFactory
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.flink.table.sources.CsvTableSourceFactory
http://git-wip-us.apache.org/repos/asf/flink/blob/2cb58960/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
index c920d23..05255fd 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
@@ -86,17 +86,20 @@ abstract class BatchTableEnvironment(
}
/** Returns a unique table name according to the internal naming pattern. */
- protected def createUniqueTableName(): String = "_DataSetTable_" + nameCntr.getAndIncrement()
+ override protected def createUniqueTableName(): String = {
+   val suffix = nameCntr.getAndIncrement()
+   s"_DataSetTable_$suffix"
+ }
/**
- * Registers an external [[BatchTableSource]] in this [[TableEnvironment]]'s catalog.
- * Registered tables can be referenced in SQL queries.
+ * Registers an internal [[BatchTableSource]] in this [[TableEnvironment]]'s catalog without
+ * name checking. Registered tables can be referenced in SQL queries.
*
* @param name The name under which the [[TableSource]] is registered.
* @param tableSource The [[TableSource]] to register.
*/
- override def registerTableSource(name: String, tableSource: TableSource[_]): Unit = {
- checkValidTableName(name)
+ override protected def registerTableSourceInternal(
+ name: String,
+ tableSource: TableSource[_])
+ : Unit = {
tableSource match {
case batchTableSource: BatchTableSource[_] =>
@@ -107,6 +110,17 @@ abstract class BatchTableEnvironment(
}
}
+// TODO expose this once we have enough table source factories that can deal with it
+// /**
+// * Creates a table from a descriptor that describes the source connector, source encoding,
+// * the resulting table schema, and other properties.
+// *
+// * @param connectorDescriptor connector descriptor describing the source of the table
+// */
+// def from(connectorDescriptor: ConnectorDescriptor): BatchTableSourceDescriptor = {
+// new BatchTableSourceDescriptor(this, connectorDescriptor)
+// }
+
/**
* Registers an external [[TableSink]] with given field names and types in this
* [[TableEnvironment]]'s catalog.
http://git-wip-us.apache.org/repos/asf/flink/blob/2cb58960/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
index 9d94f54..84d7240 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
@@ -97,17 +97,20 @@ abstract class StreamTableEnvironment(
}
/** Returns a unique table name according to the internal naming pattern. */
- protected def createUniqueTableName(): String = "_DataStreamTable_" + nameCntr.getAndIncrement()
+ override protected def createUniqueTableName(): String = {
+   val suffix = nameCntr.getAndIncrement()
+   s"_DataStreamTable_$suffix"
+ }
/**
- * Registers an external [[StreamTableSource]] in this [[TableEnvironment]]'s catalog.
- * Registered tables can be referenced in SQL queries.
+ * Registers an internal [[StreamTableSource]] in this [[TableEnvironment]]'s catalog without
+ * name checking. Registered tables can be referenced in SQL queries.
*
* @param name The name under which the [[TableSource]] is registered.
* @param tableSource The [[TableSource]] to register.
*/
- override def registerTableSource(name: String, tableSource: TableSource[_]): Unit = {
- checkValidTableName(name)
+ override protected def registerTableSourceInternal(
+ name: String,
+ tableSource: TableSource[_])
+ : Unit = {
tableSource match {
case streamTableSource: StreamTableSource[_] =>
@@ -125,6 +128,17 @@ abstract class StreamTableEnvironment(
}
}
+// TODO expose this once we have enough table source factories that can deal with it
+// /**
+// * Creates a table from a descriptor that describes the source connector, source encoding,
+// * the resulting table schema, and other properties.
+// *
+// * @param connectorDescriptor connector descriptor describing the source of the table
+// */
+// def from(connectorDescriptor: ConnectorDescriptor): StreamTableSourceDescriptor = {
+// new StreamTableSourceDescriptor(this, connectorDescriptor)
+// }
+
/**
* Registers an external [[TableSink]] with given field names and types in this
* [[TableEnvironment]]'s catalog.
http://git-wip-us.apache.org/repos/asf/flink/blob/2cb58960/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
index 2569cd4..3b314e9 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
@@ -290,6 +290,17 @@ abstract class TableEnvironment(val config: TableConfig) {
}
/**
+ * Creates a table from a table source.
+ *
+ * @param source table source used as table
+ */
+ def fromTableSource(source: TableSource[_]): Table = {
+   // register under a generated name, then scan the freshly registered table
+   val generatedName = createUniqueTableName()
+   registerTableSourceInternal(generatedName, source)
+   scan(generatedName)
+ }
+
+ /**
* Registers an [[ExternalCatalog]] under a unique name in the TableEnvironment's schema.
* All tables registered in the [[ExternalCatalog]] can be accessed.
*
@@ -302,7 +313,7 @@ abstract class TableEnvironment(val config: TableConfig) {
}
this.externalCatalogs.put(name, externalCatalog)
// create an external catalog Calcite schema, register it on the root schema
- ExternalCatalogSchema.registerCatalog(rootSchema, name, externalCatalog)
+ ExternalCatalogSchema.registerCatalog(this, rootSchema, name, externalCatalog)
}
/**
@@ -422,7 +433,19 @@ abstract class TableEnvironment(val config: TableConfig) {
* @param name The name under which the [[TableSource]] is registered.
* @param tableSource The [[TableSource]] to register.
*/
- def registerTableSource(name: String, tableSource: TableSource[_]): Unit
+ def registerTableSource(name: String, tableSource: TableSource[_]): Unit = {
+ checkValidTableName(name) // check the user-chosen name before anything is registered
+ registerTableSourceInternal(name, tableSource) // environment-specific registration
+ }
+
+ /**
+ * Registers an internal [[TableSource]] in this [[TableEnvironment]]'s catalog without
+ * name checking. Registered tables can be referenced in SQL queries.
+ *
+ * @param name The name under which the [[TableSource]] is registered.
+ * @param tableSource The [[TableSource]] to register.
+ */
+ protected def registerTableSourceInternal(name: String, tableSource: TableSource[_]): Unit
/**
* Registers an external [[TableSink]] with given field names and types in this
@@ -714,6 +737,9 @@ abstract class TableEnvironment(val config: TableConfig) {
}
}
+ /** Returns a unique table name according to the internal naming pattern. */
+ protected def createUniqueTableName(): String
+
/**
* Checks if the chosen table name is valid.
*
http://git-wip-us.apache.org/repos/asf/flink/blob/2cb58960/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/exceptions.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/exceptions.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/exceptions.scala
index 7ea17fa..f5a713a 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/exceptions.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/exceptions.scala
@@ -39,6 +39,9 @@ case class SqlParserException(
/**
* General Exception for all errors during table handling.
+ *
+ * This exception indicates that an internal error occurred or that a feature is not supported
+ * yet. Usually, this exception does not indicate a fault of the user.
*/
case class TableException(
msg: String,
@@ -55,6 +58,8 @@ object TableException {
/**
* Exception for all errors occurring during validation phase.
+ *
+ * This exception indicates that the user did something wrong.
*/
case class ValidationException(
msg: String,
@@ -137,11 +142,51 @@ case class CatalogAlreadyExistException(
}
/**
+ * Exception for not finding a [[org.apache.flink.table.sources.TableSourceFactory]] for the
+ * given properties.
+ *
+ * @param properties properties that describe the table source
+ * @param cause the cause
+ */
+case class NoMatchingTableSourceException(
+    properties: Map[String, String],
+    cause: Throwable)
+  extends RuntimeException(
+    "Could not find a table source factory in the classpath satisfying the " +
+      "following properties: \n" +
+      properties.map { case (key, value) => s"$key=$value" }.mkString("\n"),
+    cause) {
+
+  /** Creates the exception without an underlying cause. */
+  def this(properties: Map[String, String]) = this(properties, null)
+}
+
+/**
+ * Exception for finding more than one [[org.apache.flink.table.sources.TableSourceFactory]] for
+ * the given properties.
+ *
+ * @param properties properties that describe the table source
+ * @param cause the cause
+ */
+case class AmbiguousTableSourceException(
+    properties: Map[String, String],
+    cause: Throwable)
+  extends RuntimeException(
+    "More than one table source factory in the classpath satisfying the " +
+      "following properties: \n" +
+      properties.map { case (key, value) => s"$key=$value" }.mkString("\n"),
+    cause) {
+
+  /** Creates the exception without an underlying cause. */
+  def this(properties: Map[String, String]) = this(properties, null)
+}
+
+/**
* Exception for not finding a [[TableSourceConverter]] for a given table type.
*
* @param tableType table type
* @param cause the cause
+ * @deprecated Use table source factories instead
+ * (see [[org.apache.flink.table.sources.TableSourceFactory]]).
*/
+@Deprecated
+@deprecated("Use table factories (see TableSourceFactory) instead.")
case class NoMatchedTableSourceConverterException(
tableType: String,
cause: Throwable)
@@ -156,7 +201,11 @@ case class NoMatchedTableSourceConverterException(
*
* @param tableType table type
* @param cause the cause
+ * @deprecated Use table source factories instead
+ * (see [[org.apache.flink.table.sources.TableSourceFactory]]).
*/
+@Deprecated
+@deprecated("Use table factories (see TableSourceFactory) instead.")
case class AmbiguousTableSourceConverterException(
tableType: String,
cause: Throwable)
http://git-wip-us.apache.org/repos/asf/flink/blob/2cb58960/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/java/BatchTableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/java/BatchTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/java/BatchTableEnvironment.scala
index b79d161..f8f35eb 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/java/BatchTableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/java/BatchTableEnvironment.scala
@@ -20,8 +20,8 @@ package org.apache.flink.table.api.java
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.typeutils.TypeExtractor
import org.apache.flink.api.java.{DataSet, ExecutionEnvironment}
-import org.apache.flink.table.expressions.ExpressionParser
import org.apache.flink.table.api._
+import org.apache.flink.table.expressions.ExpressionParser
import org.apache.flink.table.functions.{AggregateFunction, TableFunction}
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/2cb58960/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogSchema.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogSchema.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogSchema.scala
index d87d665..776ddee 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogSchema.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogSchema.scala
@@ -22,7 +22,7 @@ import java.util.{Collection => JCollection, Collections => JCollections, Linked
import org.apache.calcite.linq4j.tree.Expression
import org.apache.calcite.schema._
-import org.apache.flink.table.api.{CatalogNotExistException, TableNotExistException}
+import org.apache.flink.table.api.{CatalogNotExistException, TableEnvironment, TableNotExistException}
import org.apache.flink.table.util.Logging
import scala.collection.JavaConverters._
@@ -33,10 +33,12 @@ import scala.collection.JavaConverters._
* The external catalog and all included sub-catalogs and tables is registered as
* sub-schemas and tables in Calcite.
*
+ * @param tableEnv the environment for this schema
* @param catalogIdentifier external catalog name
* @param catalog external catalog
*/
class ExternalCatalogSchema(
+ tableEnv: TableEnvironment,
catalogIdentifier: String,
catalog: ExternalCatalog) extends Schema with Logging {
@@ -50,7 +52,7 @@ class ExternalCatalogSchema(
override def getSubSchema(name: String): Schema = {
try {
val db = catalog.getSubCatalog(name)
- new ExternalCatalogSchema(name, db)
+ new ExternalCatalogSchema(tableEnv, name, db)
} catch {
case _: CatalogNotExistException =>
LOG.warn(s"Sub-catalog $name does not exist in externalCatalog $catalogIdentifier")
@@ -75,7 +77,7 @@ class ExternalCatalogSchema(
*/
override def getTable(name: String): Table = try {
val externalCatalogTable = catalog.getTable(name)
- ExternalTableSourceUtil.fromExternalCatalogTable(externalCatalogTable)
+ ExternalTableSourceUtil.fromExternalCatalogTable(tableEnv, externalCatalogTable)
} catch {
case TableNotExistException(table, _, _) => {
LOG.warn(s"Table $table does not exist in externalCatalog $catalogIdentifier")
@@ -111,15 +113,17 @@ object ExternalCatalogSchema {
/**
* Registers an external catalog in a Calcite schema.
*
+ * @param tableEnv The environment the catalog will be part of.
* @param parentSchema Parent schema into which the catalog is registered
* @param externalCatalogIdentifier Identifier of the external catalog
* @param externalCatalog The external catalog to register
*/
def registerCatalog(
+ tableEnv: TableEnvironment,
parentSchema: SchemaPlus,
externalCatalogIdentifier: String,
externalCatalog: ExternalCatalog): Unit = {
- val newSchema = new ExternalCatalogSchema(externalCatalogIdentifier, externalCatalog)
+ val newSchema = new ExternalCatalogSchema(tableEnv, externalCatalogIdentifier, externalCatalog)
val schemaPlusOfNewSchema = parentSchema.add(externalCatalogIdentifier, newSchema)
newSchema.registerSubSchemas(schemaPlusOfNewSchema)
}
http://git-wip-us.apache.org/repos/asf/flink/blob/2cb58960/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogTable.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogTable.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogTable.scala
index ae20718..a7c20ac 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogTable.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogTable.scala
@@ -18,28 +18,296 @@
package org.apache.flink.table.catalog
-import java.util.{HashMap => JHashMap, Map => JMap}
import java.lang.{Long => JLong}
+import java.util.{HashMap => JHashMap, Map => JMap}
-import org.apache.flink.table.api.TableSchema
+import org.apache.flink.table.api.{TableException, TableSchema}
+import org.apache.flink.table.catalog.ExternalCatalogTable._
+import org.apache.flink.table.descriptors.MetadataValidator.{METADATA_COMMENT, METADATA_CREATION_TIME, METADATA_LAST_ACCESS_TIME}
+import org.apache.flink.table.descriptors._
import org.apache.flink.table.plan.stats.TableStats
+import scala.collection.JavaConverters._
+
/**
* Defines a table in an [[ExternalCatalog]].
*
- * @param tableType Table type, e.g csv, hbase, kafka
- * @param schema Schema of the table (column names and types)
- * @param properties Properties of the table
- * @param stats Statistics of the table
- * @param comment Comment of the table
- * @param createTime Create timestamp of the table
- * @param lastAccessTime Timestamp of last access of the table
+ * For backwards compatibility this class supports both the legacy table type (see
+ * [[org.apache.flink.table.annotation.TableType]]) and the factory-based (see
+ * [[org.apache.flink.table.sources.TableSourceFactory]]) approach.
+ *
+ * @param connectorDesc describes the system to connect to
+ * @param formatDesc describes the data format of a connector
+ * @param schemaDesc describes the schema of the result table
+ * @param statisticsDesc describes the estimated statistics of the result table
+ * @param metadataDesc describes additional metadata of a table
*/
-case class ExternalCatalogTable(
+class ExternalCatalogTable(
+ connectorDesc: ConnectorDescriptor,
+ formatDesc: Option[FormatDescriptor],
+ schemaDesc: Option[Schema],
+ statisticsDesc: Option[Statistics],
+ metadataDesc: Option[Metadata])
+ extends TableSourceDescriptor(connectorDesc) {
+
+ this.formatDescriptor = formatDesc
+ this.schemaDescriptor = schemaDesc
+ this.statisticsDescriptor = statisticsDesc
+ this.metaDescriptor = metadataDesc
+
+ // expose statistics for external table source util
+ override def getTableStats: Option[TableStats] = super.getTableStats
+
+ // ----------------------------------------------------------------------------------------------
+ // NOTE: the following code is used for backwards compatibility to the TableType approach
+ // ----------------------------------------------------------------------------------------------
+
+ /**
+ * Returns the legacy table type of an external catalog table.
+ *
+ * @deprecated for backwards compatibility.
+ */
+ @Deprecated
+ @deprecated("For backwards compatibility.")
+ lazy val tableType: String = {
+ val props = new DescriptorProperties()
+ connectorDesc.addProperties(props)
+ props
+ .getString(CONNECTOR_LEGACY_TYPE)
+ .getOrElse(throw new TableException("Could not find a legacy table type to return."))
+ }
+
+ /**
+ * Returns the legacy schema of an external catalog table.
+ *
+ * @deprecated for backwards compatibility.
+ */
+ @Deprecated
+ @deprecated("For backwards compatibility.")
+ lazy val schema: TableSchema = {
+ val props = new DescriptorProperties()
+ connectorDesc.addProperties(props)
+ props
+ .getTableSchema(CONNECTOR_LEGACY_SCHEMA)
+ .getOrElse(throw new TableException("Could not find a legacy schema to return."))
+ }
+
+ /**
+ * Returns the legacy properties of an external catalog table.
+ *
+ * @deprecated for backwards compatibility.
+ */
+ @Deprecated
+ @deprecated("For backwards compatibility.")
+ lazy val properties: JMap[String, String] = {
+ // skip normalization
+ val props = new DescriptorProperties(normalizeKeys = false)
+ val legacyProps = new JHashMap[String, String]()
+ connectorDesc.addProperties(props)
+ props.asMap.flatMap { case (k, v) =>
+ if (k.startsWith(CONNECTOR_LEGACY_PROPERTY)) {
+ // remove "connector.legacy-property-"
+ Some(legacyProps.put(k.substring(CONNECTOR_LEGACY_PROPERTY.length + 1), v))
+ } else {
+ None
+ }
+ }
+ legacyProps
+ }
+
+ /**
+ * Returns the legacy statistics of an external catalog table.
+ *
+ * @deprecated for backwards compatibility.
+ */
+ @Deprecated
+ @deprecated("For backwards compatibility.")
+ lazy val stats: TableStats = getTableStats.orNull
+
+ /**
+ * Returns the legacy comment of an external catalog table.
+ *
+ * @deprecated for backwards compatibility.
+ */
+ @Deprecated
+ @deprecated("For backwards compatibility.")
+ lazy val comment: String = {
+ val normalizedProps = new DescriptorProperties()
+
+ metadataDesc match {
+ case Some(meta) =>
+ meta.addProperties(normalizedProps)
+ normalizedProps.getString(METADATA_COMMENT).orNull
+ case None =>
+ null
+ }
+ }
+
+ /**
+ * Returns the legacy creation time of an external catalog table.
+ *
+ * @deprecated for backwards compatibility.
+ */
+ @Deprecated
+ @deprecated("For backwards compatibility.")
+ lazy val createTime: JLong = {
+ val normalizedProps = new DescriptorProperties()
+
+ metadataDesc match {
+ case Some(meta) =>
+ meta.addProperties(normalizedProps)
+ normalizedProps.getLong(METADATA_CREATION_TIME).map(v => Long.box(v)).orNull
+ case None =>
+ null
+ }
+ }
+
+ /**
+ * Returns the legacy last access time of an external catalog table.
+ *
+ * @deprecated for backwards compatibility.
+ */
+ @Deprecated
+ @deprecated("For backwards compatibility.")
+ lazy val lastAccessTime: JLong = {
+ val normalizedProps = new DescriptorProperties()
+
+ metadataDesc match {
+ case Some(meta) =>
+ meta.addProperties(normalizedProps)
+ normalizedProps.getLong(METADATA_LAST_ACCESS_TIME).map(v => Long.box(v)).orNull
+ case None =>
+ null
+ }
+ }
+
+ /**
+ * Defines a table in an [[ExternalCatalog]] from the legacy, non-descriptor arguments.
+ *
+ * The arguments are converted with `toConnectorDescriptor`, `toStatisticsDescriptor` and
+ * `toMetadataDescriptor` and handed to the descriptor-based constructor; the second and
+ * third argument of that constructor are passed as `None`.
+ *
+ * @param tableType Table type, e.g. csv, hbase, kafka
+ * @param schema Schema of the table (column names and types)
+ * @param properties Properties of the table (an empty map by default)
+ * @param stats Statistics of the table (`null` by default)
+ * @param comment Comment of the table (`null` by default)
+ * @param createTime Creation timestamp of the table (the current system time in
+ * milliseconds by default)
+ * @param lastAccessTime Timestamp of the last access of the table (-1 by default)
+ * @deprecated Use a descriptor-based constructor instead.
+ */
+ @Deprecated
+ @deprecated("Use a descriptor-based constructor instead.")
+ def this(
+ tableType: String,
+ schema: TableSchema,
+ properties: JMap[String, String] = new JHashMap(),
+ stats: TableStats = null,
+ comment: String = null,
+ createTime: JLong = System.currentTimeMillis,
+ lastAccessTime: JLong = -1L) = {
+
+ this(
+ toConnectorDescriptor(tableType, schema, properties),
+ None,
+ None,
+ Some(toStatisticsDescriptor(stats)),
+ Some(toMetadataDescriptor(comment, createTime, lastAccessTime)))
+ }
+
+  /**
+   * Tells whether this external catalog table is backed by the legacy table type, i.e.
+   * whether its connector descriptor is a [[TableTypeConnector]].
+   *
+   * @deprecated for backwards compatibility.
+   */
+  @Deprecated
+  @deprecated("For backwards compatibility.")
+  def isLegacyTableType: Boolean = connectorDesc match {
+    case _: TableTypeConnector => true
+    case _ => false
+  }
+}
+
+object ExternalCatalogTable {
+
+ val CONNECTOR_TYPE_VALUE = "legacy-table-type"
+ val CONNECTOR_LEGACY_TYPE = "connector.legacy-type"
+ val CONNECTOR_LEGACY_SCHEMA = "connector.legacy-schema"
+ val CONNECTOR_LEGACY_PROPERTY = "connector.legacy-property"
+
+ /**
+ * Defines a table in an [[ExternalCatalog]] from the legacy, non-descriptor arguments.
+ *
+ * This factory method forwards all arguments unchanged to the deprecated constructor of
+ * [[ExternalCatalogTable]] that takes the same parameter list.
+ *
+ * @param tableType Table type, e.g. csv, hbase, kafka
+ * @param schema Schema of the table (column names and types)
+ * @param properties Properties of the table (an empty map by default)
+ * @param stats Statistics of the table (`null` by default)
+ * @param comment Comment of the table (`null` by default)
+ * @param createTime Creation timestamp of the table (the current system time in
+ * milliseconds by default)
+ * @param lastAccessTime Timestamp of the last access of the table (-1 by default)
+ * @return the newly created external catalog table
+ * @deprecated Use a descriptor-based constructor instead.
+ */
+ @Deprecated
+ @deprecated("Use a descriptor-based constructor instead.")
+ def apply(
tableType: String,
schema: TableSchema,
properties: JMap[String, String] = new JHashMap(),
stats: TableStats = null,
comment: String = null,
createTime: JLong = System.currentTimeMillis,
- lastAccessTime: JLong = -1L)
+ lastAccessTime: JLong = -1L): ExternalCatalogTable = {
+
+ new ExternalCatalogTable(
+ tableType,
+ schema,
+ properties,
+ stats,
+ comment,
+ createTime,
+ lastAccessTime)
+ }
+
+  /**
+   * Connector descriptor that carries a legacy table type together with its schema and
+   * its string properties. It never requires a format descriptor.
+   *
+   * @param tableType legacy table type, stored under `CONNECTOR_LEGACY_TYPE`
+   * @param schema legacy table schema, stored under `CONNECTOR_LEGACY_SCHEMA`
+   * @param legacyProperties legacy properties; each key is stored with the prefix
+   *                         `CONNECTOR_LEGACY_PROPERTY` followed by a dash
+   */
+  class TableTypeConnector(
+      tableType: String,
+      schema: TableSchema,
+      legacyProperties: JMap[String, String])
+    extends ConnectorDescriptor(CONNECTOR_TYPE_VALUE, version = 1) {
+
+    override protected def addConnectorProperties(properties: DescriptorProperties): Unit = {
+      properties.putString(CONNECTOR_LEGACY_TYPE, tableType)
+      properties.putTableSchema(CONNECTOR_LEGACY_SCHEMA, schema)
+      // every legacy entry is re-keyed as "<CONNECTOR_LEGACY_PROPERTY>-<original key>"
+      legacyProperties.asScala.foreach { entry =>
+        properties.putString(s"$CONNECTOR_LEGACY_PROPERTY-${entry._1}", entry._2)
+      }
+    }
+
+    override private[flink] def needsFormat() = false
+  }
+
+  /**
+   * Wraps the legacy table type, schema and properties into a [[TableTypeConnector]].
+   */
+  def toConnectorDescriptor(
+      tableType: String,
+      schema: TableSchema,
+      properties: JMap[String, String])
+    : ConnectorDescriptor =
+    new TableTypeConnector(tableType, schema, properties)
+
+  /**
+   * Creates a statistics descriptor and, unless `stats` is `null`, applies the given
+   * table statistics to it.
+   */
+  def toStatisticsDescriptor(stats: TableStats): Statistics = {
+    val descriptor = Statistics()
+    Option(stats).foreach(tableStats => descriptor.tableStats(tableStats))
+    descriptor
+  }
+
+  /**
+   * Creates a metadata descriptor from the legacy arguments. The comment is only set when
+   * it is not `null`; creation time and last access time are always set.
+   */
+  def toMetadataDescriptor(
+      comment: String,
+      createTime: JLong,
+      lastAccessTime: JLong)
+    : Metadata = {
+
+    val descriptor = Metadata()
+    Option(comment).foreach(text => descriptor.comment(text))
+    val withCreationTime = descriptor.creationTime(createTime)
+    withCreationTime.lastAccessTime(lastAccessTime)
+  }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/2cb58960/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalTableSourceUtil.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalTableSourceUtil.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalTableSourceUtil.scala
index d2e297f..3bc5dc0 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalTableSourceUtil.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalTableSourceUtil.scala
@@ -23,22 +23,74 @@ import java.net.URL
import org.apache.commons.configuration.{ConfigurationException, ConversionException, PropertiesConfiguration}
import org.apache.flink.annotation.VisibleForTesting
import org.apache.flink.table.annotation.TableType
-import org.apache.flink.table.api.{AmbiguousTableSourceConverterException, NoMatchedTableSourceConverterException, TableException}
+import org.apache.flink.table.api._
import org.apache.flink.table.plan.schema.{BatchTableSourceTable, StreamTableSourceTable, TableSourceTable}
import org.apache.flink.table.plan.stats.FlinkStatistic
-import org.apache.flink.table.sources.{BatchTableSource, StreamTableSource, TableSource}
+import org.apache.flink.table.sources.{BatchTableSource, StreamTableSource, TableSource, TableSourceFactoryService}
import org.apache.flink.table.util.Logging
import org.apache.flink.util.InstantiationUtil
import org.reflections.Reflections
-import scala.collection.JavaConverters._
-import scala.collection.mutable
+import _root_.scala.collection.JavaConverters._
+import _root_.scala.collection.mutable
/**
* The utility class is used to convert ExternalCatalogTable to TableSourceTable.
*/
object ExternalTableSourceUtil extends Logging {
+  /**
+   * Converts an [[ExternalCatalogTable]] instance to a [[TableSourceTable]] instance.
+   *
+   * Tables that use the legacy table type are converted through the deprecated
+   * TableType-based path (a warning is logged). All other tables are resolved through
+   * [[TableSourceFactoryService]]; the resolved source must be a batch source in a batch
+   * environment and a stream source in a streaming environment, otherwise a
+   * [[TableException]] is thrown.
+   *
+   * @param tableEnv the table environment that determines whether a batch or a streaming
+   *                 table source is expected
+   * @param externalCatalogTable the [[ExternalCatalogTable]] instance which to convert
+   * @return converted [[TableSourceTable]] instance from the input catalog table
+   */
+  def fromExternalCatalogTable(
+      tableEnv: TableEnvironment,
+      externalCatalogTable: ExternalCatalogTable)
+    : TableSourceTable[_] = {
+
+    if (externalCatalogTable.isLegacyTableType) {
+      // legacy external catalog path
+      LOG.warn("External catalog tables based on TableType annotations are deprecated. " +
+        "Please consider updating them to TableSourceFactories.")
+      fromExternalCatalogTableType(externalCatalogTable)
+    } else {
+      // factory approach
+      val source = TableSourceFactoryService.findTableSourceFactory(externalCatalogTable)
+
+      // evaluated on demand, i.e. only once an applicable source has been matched
+      def statistic = new FlinkStatistic(externalCatalogTable.getTableStats)
+
+      (tableEnv, source) match {
+        // batch table source in a batch environment
+        case (_: BatchTableEnvironment, batchSource: BatchTableSource[_]) =>
+          new BatchTableSourceTable(batchSource, statistic)
+        case (_: BatchTableEnvironment, _) =>
+          throw new TableException(
+            s"Found table source '${source.getClass.getCanonicalName}' is not applicable " +
+              s"in a batch environment.")
+        // stream table source in a streaming environment
+        case (_: StreamTableEnvironment, streamSource: StreamTableSource[_]) =>
+          new StreamTableSourceTable(streamSource, statistic)
+        case (_: StreamTableEnvironment, _) =>
+          throw new TableException(
+            s"Found table source '${source.getClass.getCanonicalName}' is not applicable " +
+              s"in a streaming environment.")
+        case _ =>
+          throw new TableException("Unsupported table environment.")
+      }
+    }
+  }
+
+ // ----------------------------------------------------------------------------------------------
+ // NOTE: the following lines can be removed once we drop support for TableType
+ // ----------------------------------------------------------------------------------------------
+
// config file to specify scan package to search TableSourceConverter
private val tableSourceConverterConfigFileName = "tableSourceConverter.properties"
@@ -48,7 +100,7 @@ object ExternalTableSourceUtil extends Logging {
val registeredConverters =
new mutable.HashMap[String, mutable.Set[Class[_ <: TableSourceConverter[_]]]]
with mutable.MultiMap[String, Class[_ <: TableSourceConverter[_]]]
- // scan all config files to find TableSourceConverters which are annotationed with TableType.
+ // scan all config files to find TableSourceConverters which are annotated with TableType.
val resourceUrls = getClass.getClassLoader.getResources(tableSourceConverterConfigFileName)
while (resourceUrls.hasMoreElements) {
val url = resourceUrls.nextElement()
@@ -89,12 +141,31 @@ object ExternalTableSourceUtil extends Logging {
}
/**
- * Converts an [[ExternalCatalogTable]] instance to a [[TableSourceTable]] instance
+ * Parses scan package set from input config file
*
- * @param externalCatalogTable the [[ExternalCatalogTable]] instance which to convert
- * @return converted [[TableSourceTable]] instance from the input catalog table
+ * @param url url of config file
+ * @return scan package set
*/
- def fromExternalCatalogTable(externalCatalogTable: ExternalCatalogTable): TableSourceTable[_] = {
+  private def parseScanPackagesFromConfigFile(url: URL): Set[String] = {
+    try {
+      val propertiesFile = new PropertiesConfiguration(url)
+      propertiesFile.setListDelimiter(',')
+      // empty entries are dropped before the packages are collected into a set
+      val packages = propertiesFile.getStringArray("scan.packages")
+      packages.filter(_.nonEmpty).toSet
+    } catch {
+      // both failures are only logged; the caller then sees an empty package set
+      case loadError: ConfigurationException =>
+        LOG.warn(s"Error happened while loading the properties file [$url]", loadError)
+        Set.empty
+      case conversionError: ConversionException =>
+        LOG.warn(s"Error happened while parsing 'scan.packages' field of properties file [$url]. " +
+          s"The value is not a String or List of Strings.", conversionError)
+        Set.empty
+    }
+  }
+
+ @VisibleForTesting
+ def fromExternalCatalogTableType(externalCatalogTable: ExternalCatalogTable)
+ : TableSourceTable[_] = {
+
val tableType = externalCatalogTable.tableType
val propertyKeys = externalCatalogTable.properties.keySet()
tableTypeToTableSourceConvertersClazz.get(tableType) match {
@@ -141,27 +212,4 @@ object ExternalTableSourceUtil extends Logging {
throw new NoMatchedTableSourceConverterException(tableType)
}
}
-
- /**
- * Parses scan package set from input config file
- *
- * @param url url of config file
- * @return scan package set
- */
- private def parseScanPackagesFromConfigFile(url: URL): Set[String] = {
- try {
- val config = new PropertiesConfiguration(url)
- config.setListDelimiter(',')
- config.getStringArray("scan.packages").filterNot(_.isEmpty).toSet
- } catch {
- case e: ConfigurationException =>
- LOG.warn(s"Error happened while loading the properties file [$url]", e)
- Set.empty
- case e1: ConversionException =>
- LOG.warn(s"Error happened while parsing 'scan.packages' field of properties file [$url]. " +
- s"The value is not a String or List of Strings.", e1)
- Set.empty
- }
- }
-
}
http://git-wip-us.apache.org/repos/asf/flink/blob/2cb58960/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/TableSourceConverter.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/TableSourceConverter.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/TableSourceConverter.scala
index ca6df9a..e6248b0 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/TableSourceConverter.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/TableSourceConverter.scala
@@ -29,7 +29,11 @@ import org.apache.flink.table.sources.TableSource
* table is supported.
*
* @tparam T The [[TableSource]] to be created by this converter.
+ *
+ * @deprecated Use the more generic [[org.apache.flink.table.sources.TableSourceFactory]] instead.
*/
+@Deprecated
+@deprecated("Use the more generic table source factories instead.")
trait TableSourceConverter[T <: TableSource[_]] {
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/2cb58960/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/BatchTableSourceDescriptor.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/BatchTableSourceDescriptor.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/BatchTableSourceDescriptor.scala
new file mode 100644
index 0000000..3c8366d
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/BatchTableSourceDescriptor.scala
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+import org.apache.flink.table.api.{BatchTableEnvironment, Table, TableException}
+import org.apache.flink.table.sources.{BatchTableSource, TableSource, TableSourceFactoryService}
+
+/**
+ * Descriptor for specifying a table source in a batch environment. It combines a connector
+ * descriptor with an optional format and an optional schema descriptor.
+ *
+ * @param tableEnv the batch table environment used to create or register the table
+ * @param connector the connector descriptor of the table source
+ */
+class BatchTableSourceDescriptor(tableEnv: BatchTableEnvironment, connector: ConnectorDescriptor)
+  extends TableSourceDescriptor(connector) {
+
+  /**
+   * Looks up the table source described by this descriptor, configures it accordingly, and
+   * returns it. A [[TableException]] is thrown if the found source is not a
+   * [[BatchTableSource]].
+   */
+  def toTableSource: TableSource[_] = {
+    val found = TableSourceFactoryService.findTableSourceFactory(this)
+    if (!found.isInstanceOf[BatchTableSource[_]]) {
+      throw new TableException(
+        s"Found table source '${found.getClass.getCanonicalName}' is not applicable " +
+          s"in a batch environment.")
+    }
+    found
+  }
+
+  /**
+   * Looks up the described table source, configures it accordingly, and returns it as a table.
+   */
+  def toTable: Table = tableEnv.fromTableSource(toTableSource)
+
+  /**
+   * Looks up the described table source, configures it accordingly, and registers it as
+   * a table under the given name.
+   *
+   * @param name table name to be registered in the table environment
+   */
+  def register(name: String): Unit = tableEnv.registerTableSource(name, toTableSource)
+
+  /**
+   * Specifies the format that defines how to read data from a connector.
+   *
+   * @param format the format descriptor to use
+   * @return this descriptor, to allow chaining
+   */
+  def withFormat(format: FormatDescriptor): BatchTableSourceDescriptor = {
+    formatDescriptor = Some(format)
+    this
+  }
+
+  /**
+   * Specifies the resulting table schema.
+   *
+   * @param schema the schema descriptor to use
+   * @return this descriptor, to allow chaining
+   */
+  def withSchema(schema: Schema): BatchTableSourceDescriptor = {
+    schemaDescriptor = Some(schema)
+    this
+  }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/2cb58960/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/ConnectorDescriptor.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/ConnectorDescriptor.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/ConnectorDescriptor.scala
new file mode 100644
index 0000000..f691b4f
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/ConnectorDescriptor.scala
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+import org.apache.flink.table.descriptors.ConnectorDescriptorValidator.{CONNECTOR_TYPE, CONNECTOR_VERSION}
+
+/**
+ * Describes a connector to another system.
+ *
+ * @param tpe string identifier for the connector
+ * @param version version number of the connector, written as the connector version property
+ */
+abstract class ConnectorDescriptor(
+ private val tpe: String,
+ private val version: Int)
+ extends Descriptor {
+
+ override def toString: String = this.getClass.getSimpleName
+
+ /**
+ * Internal method for properties conversion. Writes the connector type and the connector
+ * version and then delegates to `addConnectorProperties` for the connector-specific
+ * properties.
+ */
+ final private[flink] def addProperties(properties: DescriptorProperties): Unit = {
+ properties.putString(CONNECTOR_TYPE, tpe)
+ properties.putLong(CONNECTOR_VERSION, version)
+ addConnectorProperties(properties)
+ }
+
+ /**
+ * Internal method for connector properties conversion. Called by `addProperties` after
+ * the connector type and version have been written.
+ */
+ protected def addConnectorProperties(properties: DescriptorProperties): Unit
+
+ /**
+ * Internal method that defines if this connector requires a format descriptor.
+ */
+ private[flink] def needsFormat(): Boolean
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/2cb58960/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/ConnectorDescriptorValidator.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/ConnectorDescriptorValidator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/ConnectorDescriptorValidator.scala
new file mode 100644
index 0000000..8ab0f45
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/ConnectorDescriptorValidator.scala
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+import org.apache.flink.table.descriptors.ConnectorDescriptorValidator.{CONNECTOR_TYPE, CONNECTOR_VERSION}
+
+/**
+ * Validator for [[ConnectorDescriptor]].
+ *
+ * The connector type is a required string of at least one character. The connector
+ * version is optional and, if present, validated as an integer between 0 and
+ * `Integer.MAX_VALUE`.
+ */
+class ConnectorDescriptorValidator extends DescriptorValidator {
+
+ override def validate(properties: DescriptorProperties): Unit = {
+ properties.validateString(CONNECTOR_TYPE, isOptional = false, minLen = 1)
+ properties.validateInt(CONNECTOR_VERSION, isOptional = true, 0, Integer.MAX_VALUE)
+ }
+}
+
+object ConnectorDescriptorValidator {
+
+ val CONNECTOR_TYPE = "connector.type"
+ val CONNECTOR_VERSION = "connector.version"
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/2cb58960/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/Csv.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/Csv.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/Csv.scala
new file mode 100644
index 0000000..0493d99
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/Csv.scala
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.table.api.{TableSchema, ValidationException}
+import org.apache.flink.table.descriptors.CsvValidator._
+
+import scala.collection.mutable
+
+/**
+ * Format descriptor for comma-separated values (CSV).
+ *
+ * Optional settings are only written as properties if they have been set explicitly; the
+ * format fields are always written.
+ */
+class Csv extends FormatDescriptor(FORMAT_TYPE_VALUE, version = 1) {
+
+  // format fields (field name -> type string) in the order in which they were added
+  private val formatSchema: mutable.LinkedHashMap[String, String] =
+    mutable.LinkedHashMap[String, String]()
+
+  // optional settings; None means "not configured"
+  private var fieldDelim: Option[String] = None
+  private var lineDelim: Option[String] = None
+  private var quoteChar: Option[Character] = None
+  private var commentMarker: Option[String] = None
+  private var skipFirstLine: Option[Boolean] = None
+  private var lenient: Option[Boolean] = None
+
+  /**
+   * Sets the field delimiter, "," by default.
+   *
+   * @param delim the field delimiter
+   */
+  def fieldDelimiter(delim: String): Csv = {
+    fieldDelim = Some(delim)
+    this
+  }
+
+  /**
+   * Sets the line delimiter, "\n" by default.
+   *
+   * @param delim the line delimiter
+   */
+  def lineDelimiter(delim: String): Csv = {
+    lineDelim = Some(delim)
+    this
+  }
+
+  /**
+   * Sets the format schema with field names and the types. Required.
+   * The table schema must not contain nested fields.
+   *
+   * This method overwrites existing fields added with [[field()]].
+   *
+   * @param schema the table schema
+   */
+  def schema(schema: TableSchema): Csv = {
+    formatSchema.clear()
+    DescriptorProperties.normalizeTableSchema(schema).foreach { entry =>
+      field(entry._1, entry._2)
+    }
+    this
+  }
+
+  /**
+   * Adds a format field with the field name and the type information. Required.
+   * This method can be called multiple times. The call order of this method also defines
+   * the order of the fields in the format.
+   *
+   * @param fieldName the field name
+   * @param fieldType the type information of the field
+   */
+  def field(fieldName: String, fieldType: TypeInformation[_]): Csv =
+    field(fieldName, DescriptorProperties.normalizeTypeInfo(fieldType))
+
+  /**
+   * Adds a format field with the field name and the type string. Required.
+   * This method can be called multiple times. The call order of this method also defines
+   * the order of the fields in the format.
+   *
+   * A [[ValidationException]] is thrown if a field with the same name was already added.
+   *
+   * @param fieldName the field name
+   * @param fieldType the type string of the field
+   */
+  def field(fieldName: String, fieldType: String): Csv = {
+    if (formatSchema.contains(fieldName)) {
+      throw new ValidationException(s"Duplicate field name $fieldName.")
+    }
+    formatSchema.put(fieldName, fieldType)
+    this
+  }
+
+  /**
+   * Sets a quote character for String values, null by default.
+   *
+   * @param quote the quote character
+   */
+  def quoteCharacter(quote: Character): Csv = {
+    quoteChar = Option(quote)
+    this
+  }
+
+  /**
+   * Sets a prefix to indicate comments, null by default.
+   *
+   * @param prefix the prefix to indicate comments
+   */
+  def commentPrefix(prefix: String): Csv = {
+    commentMarker = Option(prefix)
+    this
+  }
+
+  /**
+   * Ignores the first line. By default, the first line is not skipped.
+   */
+  def ignoreFirstLine(): Csv = {
+    skipFirstLine = Some(true)
+    this
+  }
+
+  /**
+   * Skips records with parse errors instead of failing. By default, an exception is thrown.
+   */
+  def ignoreParseErrors(): Csv = {
+    lenient = Some(true)
+    this
+  }
+
+  /**
+   * Internal method for format properties conversion.
+   */
+  override protected def addFormatProperties(properties: DescriptorProperties): Unit = {
+    fieldDelim.foreach(delim => properties.putString(FORMAT_FIELD_DELIMITER, delim))
+    lineDelim.foreach(delim => properties.putString(FORMAT_LINE_DELIMITER, delim))
+    properties.putTableSchema(FORMAT_FIELDS, formatSchema.toIndexedSeq)
+    quoteChar.foreach(quote => properties.putCharacter(FORMAT_QUOTE_CHARACTER, quote))
+    commentMarker.foreach(prefix => properties.putString(FORMAT_COMMENT_PREFIX, prefix))
+    skipFirstLine.foreach(flag => properties.putBoolean(FORMAT_IGNORE_FIRST_LINE, flag))
+    lenient.foreach(flag => properties.putBoolean(FORMAT_IGNORE_PARSE_ERRORS, flag))
+  }
+}
+
+/**
+ * Format descriptor for comma-separated values (CSV).
+ */
+object Csv {
+
+  /**
+   * Creates a new, unconfigured format descriptor for comma-separated values (CSV).
+   */
+  def apply(): Csv = new Csv()
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/2cb58960/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/CsvValidator.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/CsvValidator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/CsvValidator.scala
new file mode 100644
index 0000000..d49314e
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/CsvValidator.scala
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+import org.apache.flink.table.descriptors.CsvValidator._
+import org.apache.flink.table.descriptors.FormatDescriptorValidator.FORMAT_TYPE
+
+/**
+ * Validator for [[Csv]].
+ *
+ * After the validation of the parent class, the format type is validated against
+ * `FORMAT_TYPE_VALUE` and the format fields are validated as a required table schema.
+ * Field delimiter, line delimiter and comment prefix are optional strings of at least one
+ * character, the quote character is an optional string of exactly one character, and the
+ * two ignore flags are optional booleans.
+ */
+class CsvValidator extends FormatDescriptorValidator {
+
+ override def validate(properties: DescriptorProperties): Unit = {
+ super.validate(properties)
+ properties.validateValue(FORMAT_TYPE, FORMAT_TYPE_VALUE, isOptional = false)
+ properties.validateString(FORMAT_FIELD_DELIMITER, isOptional = true, minLen = 1)
+ properties.validateString(FORMAT_LINE_DELIMITER, isOptional = true, minLen = 1)
+ properties.validateString(FORMAT_QUOTE_CHARACTER, isOptional = true, minLen = 1, maxLen = 1)
+ properties.validateString(FORMAT_COMMENT_PREFIX, isOptional = true, minLen = 1)
+ properties.validateBoolean(FORMAT_IGNORE_FIRST_LINE, isOptional = true)
+ properties.validateBoolean(FORMAT_IGNORE_PARSE_ERRORS, isOptional = true)
+ properties.validateTableSchema(FORMAT_FIELDS, isOptional = false)
+ }
+}
+
+object CsvValidator {
+
+ val FORMAT_TYPE_VALUE = "csv"
+ val FORMAT_FIELD_DELIMITER = "format.field-delimiter"
+ val FORMAT_LINE_DELIMITER = "format.line-delimiter"
+ val FORMAT_QUOTE_CHARACTER = "format.quote-character"
+ val FORMAT_COMMENT_PREFIX = "format.comment-prefix"
+ val FORMAT_IGNORE_FIRST_LINE = "format.ignore-first-line"
+ val FORMAT_IGNORE_PARSE_ERRORS = "format.ignore-parse-errors"
+ val FORMAT_FIELDS = "format.fields"
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/2cb58960/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/Descriptor.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/Descriptor.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/Descriptor.scala
new file mode 100644
index 0000000..ad97ded
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/Descriptor.scala
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+/**
+ * Base class for descriptors that add a set of string-based, normalized properties for
+ * describing a table source or table sink.
+ */
+abstract class Descriptor {
+
+ /**
+ * Internal method for properties conversion.
+ *
+ * @param properties the properties container that receives the entries of this descriptor
+ */
+ private[flink] def addProperties(properties: DescriptorProperties): Unit
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/2cb58960/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/DescriptorProperties.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/DescriptorProperties.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/DescriptorProperties.scala
new file mode 100644
index 0000000..43d63b3
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/DescriptorProperties.scala
@@ -0,0 +1,489 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+import java.io.Serializable
+import java.lang.{Boolean => JBoolean, Double => JDouble, Integer => JInt, Long => JLong}
+import java.util
+import java.util.regex.Pattern
+
+import org.apache.commons.codec.binary.Base64
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.table.api.{TableSchema, ValidationException}
+import org.apache.flink.table.descriptors.DescriptorProperties.{NAME, TYPE, normalizeTableSchema}
+import org.apache.flink.table.typeutils.TypeStringUtils
+import org.apache.flink.util.InstantiationUtil
+import org.apache.flink.util.Preconditions.checkNotNull
+
+import scala.collection.mutable
+
+import scala.collection.JavaConverters._
+
+/**
+ * Utility class for having a unified string-based representation of Table API related classes
+ * such as [[TableSchema]], [[TypeInformation]], etc.
+ *
+ * Properties are kept in a flat map from string keys to string values. Keys are only
+ * normalized (lower-cased) when they are written through one of the `put*` methods; all
+ * getters and validators look up the given key as is.
+ *
+ * @param normalizeKeys flag that indicates if keys should be normalized (this flag is
+ *                      necessary for backwards compatibility)
+ */
+class DescriptorProperties(normalizeKeys: Boolean = true) {
+
+  private val properties: mutable.Map[String, String] = new mutable.HashMap[String, String]()
+
+  /**
+   * Stores a property under its (optionally normalized) key.
+   *
+   * The duplicate check is performed on the key that is actually stored. Otherwise two keys
+   * that only differ in their case would silently overwrite each other when normalization
+   * is enabled.
+   *
+   * @throws IllegalStateException if the (normalized) key is already present
+   */
+  private def put(key: String, value: String): Unit = {
+    val storedKey = if (normalizeKeys) key.toLowerCase else key
+    if (properties.contains(storedKey)) {
+      throw new IllegalStateException("Property already present.")
+    }
+    properties.put(storedKey, value)
+  }
+
+  /**
+   * Runs `validation` on the value of `key` if the property exists.
+   *
+   * @throws ValidationException if the property is missing and not optional
+   */
+  private def validateIfPresent(
+      key: String,
+      isOptional: Boolean)(
+      validation: String => Unit)
+    : Unit = {
+    properties.get(key) match {
+      case Some(value) =>
+        validation(value)
+      case None if !isOptional =>
+        throw new ValidationException(s"Could not find required property '$key'.")
+      case None =>
+        // optional property that is not set: nothing to validate
+    }
+  }
+
+  // for testing: writes the entry without key normalization or duplicate check
+  private[flink] def unsafePut(key: String, value: String): Unit = {
+    properties.put(key, value)
+  }
+
+  // for testing
+  private[flink] def unsafeRemove(key: String): Unit = {
+    properties.remove(key)
+  }
+
+  /**
+   * Adds all entries of the given Scala map.
+   */
+  def putProperties(properties: Map[String, String]): Unit = {
+    properties.foreach { case (k, v) => put(k, v) }
+  }
+
+  /**
+   * Adds all entries of the given Java map.
+   */
+  def putProperties(properties: java.util.Map[String, String]): Unit = {
+    properties.asScala.foreach { case (k, v) => put(k, v) }
+  }
+
+  /**
+   * Adds the fully qualified name of a class under the given key.
+   *
+   * @throws ValidationException if [[InstantiationUtil.checkForInstantiationError]] reports
+   *                             an error for the class
+   */
+  def putClass(key: String, clazz: Class[_]): Unit = {
+    checkNotNull(key)
+    checkNotNull(clazz)
+    val error = InstantiationUtil.checkForInstantiationError(clazz)
+    if (error != null) {
+      throw new ValidationException(s"Class '${clazz.getName}' is not supported: $error")
+    }
+    put(key, clazz.getName)
+  }
+
+  /**
+   * Adds a string under the given key.
+   */
+  def putString(key: String, str: String): Unit = {
+    checkNotNull(key)
+    checkNotNull(str)
+    put(key, str)
+  }
+
+  /**
+   * Adds the string representation of a boolean under the given key.
+   */
+  def putBoolean(key: String, b: Boolean): Unit = {
+    checkNotNull(key)
+    put(key, b.toString)
+  }
+
+  /**
+   * Adds the string representation of a long under the given key.
+   */
+  def putLong(key: String, l: Long): Unit = {
+    checkNotNull(key)
+    put(key, l.toString)
+  }
+
+  /**
+   * Adds the string representation of an integer under the given key.
+   */
+  def putInt(key: String, i: Int): Unit = {
+    checkNotNull(key)
+    put(key, i.toString)
+  }
+
+  /**
+   * Adds a single character as a string under the given key.
+   */
+  def putCharacter(key: String, c: Character): Unit = {
+    checkNotNull(key)
+    checkNotNull(c)
+    put(key, c.toString)
+  }
+
+  /**
+   * Adds a table schema as indexed name/type properties under the given key.
+   */
+  def putTableSchema(key: String, schema: TableSchema): Unit = {
+    putTableSchema(key, normalizeTableSchema(schema))
+  }
+
+  /**
+   * Adds a table schema, given as (name, type string) pairs, as indexed properties under
+   * the given key.
+   */
+  def putTableSchema(key: String, nameAndType: Seq[(String, String)]): Unit = {
+    val rows = nameAndType.map { case (name, tpe) => Seq(name, tpe) }
+    putIndexedFixedProperties(key, Seq(NAME, TYPE), rows)
+  }
+
+  /**
+   * Adds an indexed sequence of properties (with sub-properties) under a common key.
+   *
+   * For example:
+   *
+   * schema.fields.0.type = INT, schema.fields.0.name = test
+   * schema.fields.1.type = LONG, schema.fields.1.name = test2
+   *
+   * The arity of each propertyValue must match the arity of propertyKeys.
+   *
+   * @throws ValidationException if a row of values has a different arity than propertyKeys
+   */
+  def putIndexedFixedProperties(
+      key: String,
+      propertyKeys: Seq[String],
+      propertyValues: Seq[Seq[String]])
+    : Unit = {
+    checkNotNull(key)
+    checkNotNull(propertyValues)
+    propertyValues.zipWithIndex.foreach { case (values, idx) =>
+      if (values.lengthCompare(propertyKeys.size) != 0) {
+        throw new ValidationException("Values must have same arity as keys.")
+      }
+      values.zipWithIndex.foreach { case (value, keyIdx) =>
+        put(s"$key.$idx.${propertyKeys(keyIdx)}", value)
+      }
+    }
+  }
+
+  /**
+   * Adds an indexed mapping of properties under a common key.
+   *
+   * For example:
+   *
+   * schema.fields.0.type = INT, schema.fields.0.name = test
+   * schema.fields.1.name = test2
+   *
+   * The arity of the propertySets can differ.
+   */
+  def putIndexedVariableProperties(
+      key: String,
+      propertySets: Seq[Map[String, String]])
+    : Unit = {
+    checkNotNull(key)
+    checkNotNull(propertySets)
+    for {
+      (propertySet, idx) <- propertySets.zipWithIndex
+      (k, v) <- propertySet
+    } {
+      put(s"$key.$idx.$k", v)
+    }
+  }
+
+  // ----------------------------------------------------------------------------------------------
+
+  /**
+   * Returns the string stored under the given key, if any.
+   */
+  def getString(key: String): Option[String] = properties.get(key)
+
+  /**
+   * Returns the character stored under the given key, if any.
+   *
+   * @throws ValidationException if the stored value is not exactly one character long
+   */
+  def getCharacter(key: String): Option[Character] = getString(key).map { str =>
+    if (str.length != 1) {
+      throw new ValidationException(s"The value of $key must only contain one character.")
+    }
+    Character.valueOf(str.charAt(0))
+  }
+
+  /**
+   * Returns the boolean stored under the given key, if any. The value is parsed with
+   * `java.lang.Boolean.parseBoolean`, i.e. every value other than "true" (ignoring case)
+   * yields `false`.
+   */
+  def getBoolean(key: String): Option[Boolean] =
+    getString(key).map(str => JBoolean.parseBoolean(str))
+
+  /**
+   * Returns the integer stored under the given key, if any.
+   *
+   * @throws NumberFormatException if the stored value is not a parsable integer
+   */
+  def getInt(key: String): Option[Int] =
+    getString(key).map(str => JInt.parseInt(str))
+
+  /**
+   * Returns the long stored under the given key, if any.
+   *
+   * @throws NumberFormatException if the stored value is not a parsable long
+   */
+  def getLong(key: String): Option[Long] =
+    getString(key).map(str => JLong.parseLong(str))
+
+  /**
+   * Returns the double stored under the given key, if any.
+   *
+   * @throws NumberFormatException if the stored value is not a parsable double
+   */
+  def getDouble(key: String): Option[Double] =
+    getString(key).map(str => JDouble.parseDouble(str))
+
+  /**
+   * Returns the table schema stored as indexed name/type properties under the given key,
+   * or `None` if no field name is stored for the key.
+   *
+   * @throws ValidationException if the name or type of one of the fields is missing
+   */
+  def getTableSchema(key: String): Option[TableSchema] = {
+    // Count only keys of the exact form '<key>.<index>.name'. A plain prefix/suffix match
+    // would also count unrelated keys such as '<key>X.0.name' or '<key>.0.other.name'.
+    val fieldCount = getIndexedProperty(key, NAME).size
+
+    if (fieldCount == 0) {
+      None
+    } else {
+      // validate fields and build schema
+      val schemaBuilder = TableSchema.builder()
+      for (i <- 0 until fieldCount) {
+        val name = s"$key.$i.$NAME"
+        val tpe = s"$key.$i.$TYPE"
+        schemaBuilder.field(
+          properties.getOrElse(name, throw new ValidationException(s"Invalid table schema. " +
+            s"Could not find name for field '$key.$i'.")
+          ),
+          TypeStringUtils.readTypeInfo(
+            properties.getOrElse(tpe, throw new ValidationException(s"Invalid table schema. " +
+              s"Could not find type for field '$key.$i'."))
+          )
+        )
+      }
+      Some(schemaBuilder.build())
+    }
+  }
+
+  // ----------------------------------------------------------------------------------------------
+
+  /**
+   * Validates that the length of a string property lies within the given bounds.
+   *
+   * @throws ValidationException if a required property is missing or the length is out of
+   *                             bounds
+   */
+  def validateString(
+      key: String,
+      isOptional: Boolean,
+      minLen: Int = 0, // inclusive
+      maxLen: Int = Integer.MAX_VALUE) // inclusive
+    : Unit = {
+    validateIfPresent(key, isOptional) { value =>
+      val len = value.length
+      if (len < minLen || len > maxLen) {
+        throw new ValidationException(
+          s"Property '$key' must have a length between $minLen and $maxLen but " +
+            s"was: $value")
+      }
+    }
+  }
+
+  /**
+   * Validates that a property is an integer within the given bounds.
+   *
+   * @throws ValidationException if a required property is missing, the value is not an
+   *                             integer, or it is out of bounds
+   */
+  def validateInt(
+      key: String,
+      isOptional: Boolean,
+      min: Int = Int.MinValue, // inclusive
+      max: Int = Int.MaxValue) // inclusive
+    : Unit = {
+    validateIfPresent(key, isOptional) { raw =>
+      val value =
+        try {
+          Integer.parseInt(raw)
+        } catch {
+          case _: NumberFormatException =>
+            throw new ValidationException(
+              s"Property '$key' must be an integer value but was: $raw")
+        }
+      if (value < min || value > max) {
+        throw new ValidationException(s"Property '$key' must be an integer value between $min " +
+          s"and $max but was: $raw")
+      }
+    }
+  }
+
+  /**
+   * Validates that a property is a long within the given bounds.
+   *
+   * @throws ValidationException if a required property is missing, the value is not a long,
+   *                             or it is out of bounds
+   */
+  def validateLong(
+      key: String,
+      isOptional: Boolean,
+      min: Long = Long.MinValue, // inclusive
+      max: Long = Long.MaxValue) // inclusive
+    : Unit = {
+    validateIfPresent(key, isOptional) { raw =>
+      val value =
+        try {
+          JLong.parseLong(raw)
+        } catch {
+          case _: NumberFormatException =>
+            throw new ValidationException(
+              s"Property '$key' must be a long value but was: $raw")
+        }
+      if (value < min || value > max) {
+        throw new ValidationException(s"Property '$key' must be a long value between $min " +
+          s"and $max but was: $raw")
+      }
+    }
+  }
+
+  /**
+   * Validates that a property has exactly the given value.
+   *
+   * @throws ValidationException if a required property is missing or has a different value
+   */
+  def validateValue(key: String, value: String, isOptional: Boolean): Unit = {
+    validateIfPresent(key, isOptional) { actual =>
+      if (actual != value) {
+        throw new ValidationException(
+          s"Could not find required value '$value' for property '$key'.")
+      }
+    }
+  }
+
+  /**
+   * Validates that a property is either "true" or "false" (ignoring case).
+   *
+   * @throws ValidationException if a required property is missing or is not a boolean
+   */
+  def validateBoolean(key: String, isOptional: Boolean): Unit = {
+    validateIfPresent(key, isOptional) { value =>
+      val isBoolean = value.equalsIgnoreCase("true") || value.equalsIgnoreCase("false")
+      if (!isBoolean) {
+        throw new ValidationException(
+          s"Property '$key' must be a boolean value (true/false) but was: $value")
+      }
+    }
+  }
+
+  /**
+   * Validates that a property is a double within the given bounds.
+   *
+   * @throws ValidationException if a required property is missing, the value is not a
+   *                             double, or it is out of bounds
+   */
+  def validateDouble(
+      key: String,
+      isOptional: Boolean,
+      min: Double = Double.MinValue, // inclusive
+      max: Double = Double.MaxValue) // inclusive
+    : Unit = {
+    validateIfPresent(key, isOptional) { raw =>
+      val value =
+        try {
+          JDouble.parseDouble(raw)
+        } catch {
+          case _: NumberFormatException =>
+            throw new ValidationException(
+              s"Property '$key' must be a double value but was: $raw")
+        }
+      if (value < min || value > max) {
+        throw new ValidationException(s"Property '$key' must be a double value between $min " +
+          s"and $max but was: $raw")
+      }
+    }
+  }
+
+  /**
+   * Validates the indexed name/type properties of a table schema stored under the given key.
+   *
+   * @throws ValidationException if a required schema is missing or if the name or type of
+   *                             one of the fields is missing or invalid
+   */
+  def validateTableSchema(key: String, isOptional: Boolean): Unit = {
+    // filter for name columns
+    val names = getIndexedProperty(key, NAME)
+    // filter for type columns
+    val types = getIndexedProperty(key, TYPE)
+    if (names.isEmpty && types.isEmpty && !isOptional) {
+      throw new ValidationException(
+        s"Could not find the required schema for property '$key'.")
+    }
+    // every index must provide both a non-empty name and a readable type
+    for (i <- 0 until Math.max(names.size, types.size)) {
+      validateString(s"$key.$i.$NAME", isOptional = false, minLen = 1)
+      validateType(s"$key.$i.$TYPE", isOptional = false)
+    }
+  }
+
+  /**
+   * Validates that a property has one of the given values and runs the validation logic
+   * that is registered for that value.
+   *
+   * @throws ValidationException if a required property is missing or its value is not a
+   *                             key of enumToValidation
+   */
+  def validateEnum(
+      key: String,
+      isOptional: Boolean,
+      enumToValidation: Map[String, () => Unit])
+    : Unit = {
+    validateIfPresent(key, isOptional) { value =>
+      enumToValidation.get(value) match {
+        case Some(validation) =>
+          validation() // run validation logic
+        case None =>
+          throw new ValidationException(s"Unknown value for property '$key'. " +
+            s"Supported values [${enumToValidation.keys.mkString(", ")}] but was: $value")
+      }
+    }
+  }
+
+  /**
+   * Validates that a property can be read by [[TypeStringUtils.readTypeInfo]].
+   */
+  def validateType(key: String, isOptional: Boolean): Unit = {
+    validateIfPresent(key, isOptional) { value =>
+      TypeStringUtils.readTypeInfo(value) // throws validation exceptions
+    }
+  }
+
+  /**
+   * Validates that no property key starts with the given prefix.
+   *
+   * @throws ValidationException if a key with the prefix exists
+   */
+  def validatePrefixExclusion(prefix: String): Unit = {
+    properties.keys.find(_.startsWith(prefix)).foreach { invalidKey =>
+      throw new ValidationException(
+        s"Property '$invalidKey' is not allowed in this context.")
+    }
+  }
+
+  /**
+   * Validates that the given key is not present.
+   *
+   * @throws ValidationException if the key exists
+   */
+  def validateExclusion(key: String): Unit = {
+    if (properties.contains(key)) {
+      throw new ValidationException(s"Property '$key' is not allowed in this context.")
+    }
+  }
+
+  // ----------------------------------------------------------------------------------------------
+
+  /**
+   * Returns all properties whose key has the exact form `key.<index>.property`.
+   *
+   * Both `key` and `property` are matched literally.
+   */
+  def getIndexedProperty(key: String, property: String): Map[String, String] = {
+    // compile once instead of once per key; quote both parts so that regex meta characters
+    // in them cannot change the meaning of the pattern
+    val indexedKey = Pattern.compile(
+      Pattern.quote(key) + "\\.\\d+\\." + Pattern.quote(property))
+    properties.filterKeys(k => indexedKey.matcher(k).matches()).toMap
+  }
+
+  /**
+   * Returns true if at least one property key contains the given string.
+   */
+  def contains(str: String): Boolean = {
+    properties.keys.exists(_.contains(str))
+  }
+
+  /**
+   * Returns true if at least one property key starts with the given prefix.
+   */
+  def hasPrefix(prefix: String): Boolean = {
+    properties.keys.exists(_.startsWith(prefix))
+  }
+
+  /**
+   * Returns an immutable copy of all properties.
+   */
+  def asMap: Map[String, String] = {
+    properties.toMap
+  }
+}
+
+/**
+ * Constants and helpers for converting Table API classes into their string representation.
+ */
+object DescriptorProperties {
+
+  // sub-keys used for the indexed fields of a table schema
+  val TYPE = "type"
+  val NAME = "name"
+
+  /**
+   * Converts type information into its string representation.
+   */
+  // the string representation should be equal to SqlTypeName
+  def normalizeTypeInfo(typeInfo: TypeInformation[_]): String = {
+    checkNotNull(typeInfo)
+    TypeStringUtils.writeTypeInfo(typeInfo)
+  }
+
+  /**
+   * Converts a table schema into a sequence of (column name, type string) pairs.
+   */
+  def normalizeTableSchema(schema: TableSchema): Seq[(String, String)] = {
+    schema.getColumnNames.zip(schema.getTypes).map { case (n, t) =>
+      (n, normalizeTypeInfo(t))
+    }
+  }
+
+  /**
+   * Serializes an object and encodes the bytes as a URL-safe Base64 string.
+   *
+   * @throws ValidationException if the class of the object is reported as not instantiable
+   *                             or if the serialization fails
+   */
+  def serialize(obj: Serializable): String = {
+    val clazz = obj.getClass
+    // test public accessibility
+    val error = InstantiationUtil.checkForInstantiationError(clazz)
+    if (error != null) {
+      throw new ValidationException(s"Class '${clazz.getName}' is not supported: $error")
+    }
+    try {
+      val byteArray = InstantiationUtil.serializeObject(obj)
+      Base64.encodeBase64URLSafeString(byteArray)
+    } catch {
+      case e: Exception =>
+        // use getName: getCanonicalName is null for anonymous and local classes, which
+        // would render the class as 'null' in the message
+        throw new ValidationException(
+          s"Unable to serialize class '${clazz.getName}'.", e)
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/2cb58960/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/DescriptorValidator.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/DescriptorValidator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/DescriptorValidator.scala
new file mode 100644
index 0000000..007a406
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/DescriptorValidator.scala
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+/**
+ * Validator for a descriptor. We put the validation methods and utilities in separate classes
+ * to keep the descriptor interfaces clean.
+ */
+trait DescriptorValidator {
+
+ /**
+ * Performs basic validation such as completeness tests.
+ *
+ * NOTE(review): the validation helpers of DescriptorProperties report problems by throwing
+ * a ValidationException; presumably implementations are expected to do the same - confirm.
+ *
+ * @param properties the properties to validate
+ */
+ def validate(properties: DescriptorProperties): Unit
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/2cb58960/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/FileSystem.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/FileSystem.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/FileSystem.scala
new file mode 100644
index 0000000..b1d900f
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/FileSystem.scala
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+import org.apache.flink.table.descriptors.FileSystemValidator.{CONNECTOR_PATH, CONNECTOR_TYPE_VALUE}
+
+/**
+ * Connector descriptor for a file system.
+ *
+ * The descriptor registers itself with the file system connector type and version 1. The
+ * path is optional at this level; it is only written to the properties once it has been set.
+ */
+class FileSystem extends ConnectorDescriptor(CONNECTOR_TYPE_VALUE, version = 1) {
+
+  // location configured through path(String); None until it has been set
+  private var path: Option[String] = None
+
+  /**
+   * Sets the path to a file or directory in a file system.
+   *
+   * @param path the path to a file or directory
+   * @return this descriptor, to allow chaining of calls
+   */
+  def path(path: String): FileSystem = {
+    this.path = Some(path)
+    this
+  }
+
+  /**
+   * Internal method for properties conversion. Adds the path under the connector path key
+   * if a path has been set.
+   */
+  override protected def addConnectorProperties(properties: DescriptorProperties): Unit = {
+    path.foreach { location =>
+      properties.putString(CONNECTOR_PATH, location)
+    }
+  }
+
+  /**
+   * This connector reports that it needs a format.
+   */
+  override private[flink] def needsFormat(): Boolean = true
+}
+
+/**
+ * Connector descriptor for a file system.
+ */
+object FileSystem {
+
+  /**
+   * Connector descriptor for a file system.
+   *
+   * @return a new descriptor without a path
+   */
+  def apply(): FileSystem = {
+    new FileSystem
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/2cb58960/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/FileSystemValidator.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/FileSystemValidator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/FileSystemValidator.scala
new file mode 100644
index 0000000..b065b5f
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/FileSystemValidator.scala
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+import org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_TYPE
+import org.apache.flink.table.descriptors.FileSystemValidator.{CONNECTOR_PATH, CONNECTOR_TYPE_VALUE}
+
+/**
+ * Validator for [[FileSystem]].
+ */
+class FileSystemValidator extends ConnectorDescriptorValidator {
+
+  /**
+   * Runs the validation of the parent class and then checks that the connector type has the
+   * file system value and that a non-empty path is set.
+   *
+   * @param properties the properties to validate
+   */
+  override def validate(properties: DescriptorProperties): Unit = {
+    super.validate(properties)
+    // the connector type must be present and equal to the file system identifier
+    properties.validateValue(
+      key = CONNECTOR_TYPE,
+      value = CONNECTOR_TYPE_VALUE,
+      isOptional = false)
+    // the path is required and must not be empty
+    properties.validateString(
+      key = CONNECTOR_PATH,
+      isOptional = false,
+      minLen = 1)
+  }
+}
+
+/**
+ * Property keys and values used by the file system connector.
+ */
+object FileSystemValidator {
+
+  /** Value of the connector type property that identifies the file system connector. */
+  val CONNECTOR_TYPE_VALUE: String = "filesystem"
+
+  /** Key of the property that holds the path. */
+  val CONNECTOR_PATH: String = "connector.path"
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/2cb58960/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/FormatDescriptor.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/FormatDescriptor.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/FormatDescriptor.scala
new file mode 100644
index 0000000..86f6229
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/FormatDescriptor.scala
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+import org.apache.flink.table.descriptors.FormatDescriptorValidator.{FORMAT_TYPE, FORMAT_VERSION}
+
+/**
+ * Describes the format of data.
+ *
+ * @param tpe string identifier for the format
+ * @param version version of the format, written under the format version key
+ */
+abstract class FormatDescriptor(
+    private val tpe: String,
+    private val version: Int)
+  extends Descriptor {
+
+  /**
+   * Internal method for format properties conversion.
+   *
+   * @param properties the property container that implementations add their entries to
+   */
+  protected def addFormatProperties(properties: DescriptorProperties): Unit
+
+  /**
+   * Internal method for properties conversion. Writes the format type and version first and
+   * then lets the concrete format add its own properties.
+   */
+  final private[flink] def addProperties(properties: DescriptorProperties): Unit = {
+    properties.putString(FORMAT_TYPE, tpe)
+    properties.putInt(FORMAT_VERSION, version)
+    addFormatProperties(properties)
+  }
+
+  override def toString: String = getClass.getSimpleName
+
+}