Posted to commits@flink.apache.org by tw...@apache.org on 2018/07/20 07:31:36 UTC
[2/3] flink git commit: [FLINK-9852] [table] Expose descriptor-based sink creation and introduce update mode
http://git-wip-us.apache.org/repos/asf/flink/blob/6fcc1e9a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalTableUtil.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalTableUtil.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalTableUtil.scala
new file mode 100644
index 0000000..ec57c5e
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalTableUtil.scala
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog
+
+import org.apache.flink.table.api._
+import org.apache.flink.table.factories._
+import org.apache.flink.table.plan.schema._
+import org.apache.flink.table.plan.stats.FlinkStatistic
+import org.apache.flink.table.sinks.{BatchTableSink, StreamTableSink}
+import org.apache.flink.table.sources.{BatchTableSource, StreamTableSource}
+import org.apache.flink.table.util.Logging
+
+
+/**
+ * This utility class converts an [[ExternalCatalogTable]] to a [[TableSourceSinkTable]].
+ *
+ * It uses the [[TableFactoryService]] for discovering matching table factories.
+ */
+object ExternalTableUtil extends Logging {
+
+ /**
+ * Converts an [[ExternalCatalogTable]] instance to a [[TableSourceSinkTable]] instance.
+ *
+ * @param externalTable the [[ExternalCatalogTable]] instance to convert
+ * @return the converted [[TableSourceSinkTable]] instance for the given catalog table
+ */
+ def fromExternalCatalogTable[T1, T2](
+ tableEnv: TableEnvironment,
+ externalTable: ExternalCatalogTable)
+ : TableSourceSinkTable[T1, T2] = {
+
+ val statistics = new FlinkStatistic(externalTable.getTableStats)
+
+ val source: Option[TableSourceTable[T1]] = if (externalTable.isTableSource) {
+ Some(createTableSource(tableEnv, externalTable, statistics))
+ } else {
+ None
+ }
+
+ val sink: Option[TableSinkTable[T2]] = if (externalTable.isTableSink) {
+ Some(createTableSink(tableEnv, externalTable, statistics))
+ } else {
+ None
+ }
+
+ new TableSourceSinkTable[T1, T2](source, sink)
+ }
+
+ private def createTableSource[T](
+ tableEnv: TableEnvironment,
+ externalTable: ExternalCatalogTable,
+ statistics: FlinkStatistic)
+ : TableSourceTable[T] = tableEnv match {
+
+ case _: BatchTableEnvironment if externalTable.isBatchTable =>
+ val source = TableFactoryUtil.findAndCreateTableSource(tableEnv, externalTable)
+ new BatchTableSourceTable[T](source.asInstanceOf[BatchTableSource[T]], statistics)
+
+ case _: StreamTableEnvironment if externalTable.isStreamTable =>
+ val source = TableFactoryUtil.findAndCreateTableSource(tableEnv, externalTable)
+ new StreamTableSourceTable[T](source.asInstanceOf[StreamTableSource[T]], statistics)
+
+ case _ =>
+ throw new ValidationException(
+ "External catalog table does not support the current environment for a table source.")
+ }
+
+ private def createTableSink[T](
+ tableEnv: TableEnvironment,
+ externalTable: ExternalCatalogTable,
+ statistics: FlinkStatistic)
+ : TableSinkTable[T] = tableEnv match {
+
+ case _: BatchTableEnvironment if externalTable.isBatchTable =>
+ val sink = TableFactoryUtil.findAndCreateTableSink(tableEnv, externalTable)
+ new TableSinkTable[T](sink.asInstanceOf[BatchTableSink[T]], statistics)
+
+ case _: StreamTableEnvironment if externalTable.isStreamTable =>
+ val sink = TableFactoryUtil.findAndCreateTableSink(tableEnv, externalTable)
+ new TableSinkTable[T](sink.asInstanceOf[StreamTableSink[T]], statistics)
+
+ case _ =>
+ throw new ValidationException(
+ "External catalog table does not support the current environment for a table sink.")
+ }
+}
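A minimal usage sketch (hypothetical names; assumes a table environment and an
external catalog table are already at hand):

    val sourceSink: TableSourceSinkTable[_, _] =
      ExternalTableUtil.fromExternalCatalogTable(tableEnv, externalTable)
    // Depending on whether the catalog table declares itself as a table
    // source, a table sink, or both, one or both wrapped tables are present.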
http://git-wip-us.apache.org/repos/asf/flink/blob/6fcc1e9a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/BatchTableDescriptor.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/BatchTableDescriptor.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/BatchTableDescriptor.scala
new file mode 100644
index 0000000..6bd2a71
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/BatchTableDescriptor.scala
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+import org.apache.flink.table.api.BatchTableEnvironment
+
+/**
+ * Descriptor for specifying a table source and/or sink in a batch environment.
+ */
+class BatchTableDescriptor(
+ tableEnv: BatchTableEnvironment,
+ connectorDescriptor: ConnectorDescriptor)
+ extends ConnectTableDescriptor[BatchTableDescriptor](
+ tableEnv,
+ connectorDescriptor)
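A minimal sketch of obtaining such a descriptor through the batch environment's
connect() method, assuming the built-in FileSystem connector and Csv format
descriptors are on the classpath (path and table name are placeholders):

    import org.apache.flink.table.api.Types
    import org.apache.flink.table.descriptors.{Csv, FileSystem, Schema}

    batchTableEnv
      .connect(new FileSystem().path("/tmp/input.csv"))
      .withFormat(new Csv().field("name", Types.STRING))
      .withSchema(new Schema().field("name", Types.STRING))
      .registerTableSource("csvInput")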
http://git-wip-us.apache.org/repos/asf/flink/blob/6fcc1e9a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/BatchTableSourceDescriptor.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/BatchTableSourceDescriptor.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/BatchTableSourceDescriptor.scala
deleted file mode 100644
index c967291..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/BatchTableSourceDescriptor.scala
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.descriptors
-
-import org.apache.flink.table.api.{BatchTableEnvironment, Table, ValidationException}
-import org.apache.flink.table.factories.{BatchTableSourceFactory, TableFactoryService}
-import org.apache.flink.table.sources.TableSource
-
-class BatchTableSourceDescriptor(tableEnv: BatchTableEnvironment, connector: ConnectorDescriptor)
- extends TableSourceDescriptor {
-
- connectorDescriptor = Some(connector)
-
- override private[flink] def addProperties(properties: DescriptorProperties): Unit = {
- // check for a format
- if (connector.needsFormat() && formatDescriptor.isEmpty) {
- throw new ValidationException(
- s"The connector '$connector' requires a format description.")
- } else if (!connector.needsFormat() && formatDescriptor.isDefined) {
- throw new ValidationException(
- s"The connector '$connector' does not require a format description " +
- s"but '${formatDescriptor.get}' found.")
- }
- super.addProperties(properties)
- }
-
- /**
- * Searches for the specified table source, configures it accordingly, and returns it.
- */
- def toTableSource: TableSource[_] = {
- val properties = new DescriptorProperties()
- addProperties(properties)
- val javaMap = properties.asMap
- TableFactoryService
- .find(classOf[BatchTableSourceFactory[_]], javaMap)
- .createBatchTableSource(javaMap)
- }
-
- /**
- * Searches for the specified table source, configures it accordingly, and returns it as a table.
- */
- def toTable: Table = {
- tableEnv.fromTableSource(toTableSource)
- }
-
- /**
- * Searches for the specified table source, configures it accordingly, and registers it as
- * a table under the given name.
- *
- * @param name table name to be registered in the table environment
- */
- def register(name: String): Unit = {
- tableEnv.registerTableSource(name, toTableSource)
- }
-
- /**
- * Specifies the format that defines how to read data from a connector.
- */
- def withFormat(format: FormatDescriptor): BatchTableSourceDescriptor = {
- formatDescriptor = Some(format)
- this
- }
-
- /**
- * Specifies the resulting table schema.
- */
- def withSchema(schema: Schema): BatchTableSourceDescriptor = {
- schemaDescriptor = Some(schema)
- this
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/6fcc1e9a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/ConnectTableDescriptor.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/ConnectTableDescriptor.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/ConnectTableDescriptor.scala
new file mode 100644
index 0000000..569b825
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/ConnectTableDescriptor.scala
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+import org.apache.flink.table.api.{TableEnvironment, ValidationException}
+import org.apache.flink.table.factories.TableFactoryUtil
+
+/**
+ * Common class for tables created with [[TableEnvironment.connect(ConnectorDescriptor)]].
+ */
+abstract class ConnectTableDescriptor[D <: ConnectTableDescriptor[D]](
+ private val tableEnv: TableEnvironment,
+ private val connectorDescriptor: ConnectorDescriptor)
+ extends TableDescriptor
+ with SchematicDescriptor[D]
+ with RegistrableDescriptor { this: D =>
+
+ private var formatDescriptor: Option[FormatDescriptor] = None
+ private var schemaDescriptor: Option[Schema] = None
+
+ /**
+ * Searches for the specified table source, configures it accordingly, and registers it as
+ * a table under the given name.
+ *
+ * @param name table name to be registered in the table environment
+ */
+ override def registerTableSource(name: String): Unit = {
+ val tableSource = TableFactoryUtil.findAndCreateTableSource(tableEnv, this)
+ tableEnv.registerTableSource(name, tableSource)
+ }
+
+ /**
+ * Searches for the specified table sink, configures it accordingly, and registers it as
+ * a table under the given name.
+ *
+ * @param name table name to be registered in the table environment
+ */
+ override def registerTableSink(name: String): Unit = {
+ val tableSink = TableFactoryUtil.findAndCreateTableSink(tableEnv, this)
+ tableEnv.registerTableSink(name, tableSink)
+ }
+
+ /**
+ * Searches for the specified table source and sink, configures them accordingly, and registers
+ * them as a table under the given name.
+ *
+ * @param name table name to be registered in the table environment
+ */
+ override def registerTableSourceAndSink(name: String): Unit = {
+ registerTableSource(name)
+ registerTableSink(name)
+ }
+
+ /**
+ * Specifies the format that defines how to read data from a connector.
+ */
+ override def withFormat(format: FormatDescriptor): D = {
+ formatDescriptor = Some(format)
+ this
+ }
+
+ /**
+ * Specifies the resulting table schema.
+ */
+ override def withSchema(schema: Schema): D = {
+ schemaDescriptor = Some(schema)
+ this
+ }
+
+ // ----------------------------------------------------------------------------------------------
+
+ /**
+ * Internal method for properties conversion.
+ */
+ override private[flink] def addProperties(properties: DescriptorProperties): Unit = {
+
+ // this performs only basic validation
+ // more validation can only happen within a factory
+ if (connectorDescriptor.needsFormat() && formatDescriptor.isEmpty) {
+ throw new ValidationException(
+ s"The connector '$connectorDescriptor' requires a format description.")
+ } else if (!connectorDescriptor.needsFormat() && formatDescriptor.isDefined) {
+ throw new ValidationException(
+ s"The connector '$connectorDescriptor' does not require a format description " +
+ s"but '${formatDescriptor.get}' found.")
+ }
+
+ connectorDescriptor.addProperties(properties)
+ formatDescriptor.foreach(_.addProperties(properties))
+ schemaDescriptor.foreach(_.addProperties(properties))
+ }
+}
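A minimal sketch of registering one connector as both source and sink, under
the same assumptions as the batch example above (names are placeholders):

    val descriptor = batchTableEnv
      .connect(new FileSystem().path("/tmp/data.csv"))
      .withFormat(new Csv().field("v", Types.STRING))
      .withSchema(new Schema().field("v", Types.STRING))

    // registers a table source and a table sink under the same name
    descriptor.registerTableSourceAndSink("csvTable")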
http://git-wip-us.apache.org/repos/asf/flink/blob/6fcc1e9a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/Descriptor.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/Descriptor.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/Descriptor.scala
index e21527b..aa96bc4 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/Descriptor.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/Descriptor.scala
@@ -19,18 +19,23 @@
package org.apache.flink.table.descriptors
/**
- * A class that adds a set of string-based, normalized properties for describing DDL information.
+ * A trait that adds a set of string-based, normalized properties for describing DDL information.
*
* Typical characteristics of a descriptor are:
* - descriptors have a default constructor and a default 'apply()' method for Scala
* - descriptors themselves contain very little logic
* - corresponding validators validate the correctness (goal: have a single point of validation)
*/
-abstract class Descriptor {
+trait Descriptor {
/**
* Internal method for properties conversion.
*/
private[flink] def addProperties(properties: DescriptorProperties): Unit
+ override def toString: String = {
+ val descriptorProperties = new DescriptorProperties()
+ addProperties(descriptorProperties)
+ descriptorProperties.toString
+ }
}
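With this change, any descriptor can be printed as its normalized properties.
A sketch, assuming the built-in FileSystem connector descriptor (the exact key
set is indicative only):

    println(new FileSystem().path("/tmp/input.csv"))
    // prints key=value lines such as:
    // connector.type=filesystem
    // connector.path=/tmp/input.csv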
http://git-wip-us.apache.org/repos/asf/flink/blob/6fcc1e9a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/DescriptorProperties.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/DescriptorProperties.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/DescriptorProperties.scala
index 3ad3eac..2c88dfd 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/DescriptorProperties.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/DescriptorProperties.scala
@@ -1064,6 +1064,10 @@ class DescriptorProperties(normalizeKeys: Boolean = true) {
properties.toMap.asJava
}
+ override def toString: String = {
+ DescriptorProperties.toString(properties.toMap)
+ }
+
// ----------------------------------------------------------------------------------------------
/**
@@ -1283,6 +1287,12 @@ object DescriptorProperties {
.mkString("\n")
}
+ def toJavaMap(descriptor: Descriptor): util.Map[String, String] = {
+ val descriptorProperties = new DescriptorProperties()
+ descriptor.addProperties(descriptorProperties)
+ descriptorProperties.asMap
+ }
+
// the following methods help for Scala <-> Java interfaces
// most of these methods are not necessary once we upgraded to Scala 2.12
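A sketch of the new toJavaMap() helper, which turns any descriptor into the
normalized string properties that factory discovery matches against (keys shown
are indicative):

    val javaMap: java.util.Map[String, String] =
      DescriptorProperties.toJavaMap(new FileSystem().path("/tmp/input.csv"))
    // javaMap now contains entries such as "connector.type" -> "filesystem"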
http://git-wip-us.apache.org/repos/asf/flink/blob/6fcc1e9a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/RegistrableDescriptor.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/RegistrableDescriptor.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/RegistrableDescriptor.scala
new file mode 100644
index 0000000..e89ca8c
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/RegistrableDescriptor.scala
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+/**
+ * A trait for descriptors that allow for registering table sources and/or sinks.
+ */
+trait RegistrableDescriptor extends TableDescriptor {
+
+ /**
+ * Searches for the specified table source, configures it accordingly, and registers it as
+ * a table under the given name.
+ *
+ * @param name table name to be registered in the table environment
+ */
+ def registerTableSource(name: String): Unit
+
+ /**
+ * Searches for the specified table sink, configures it accordingly, and registers it as
+ * a table under the given name.
+ *
+ * @param name table name to be registered in the table environment
+ */
+ def registerTableSink(name: String): Unit
+
+ /**
+ * Searches for the specified table source and sink, configures them accordingly, and registers
+ * them as a table under the given name.
+ *
+ * @param name table name to be registered in the table environment
+ */
+ def registerTableSourceAndSink(name: String): Unit
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/6fcc1e9a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/SchematicDescriptor.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/SchematicDescriptor.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/SchematicDescriptor.scala
new file mode 100644
index 0000000..794ff9e
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/SchematicDescriptor.scala
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+/**
+ * A trait for descriptors that allow for defining a format and a schema.
+ */
+trait SchematicDescriptor[D <: SchematicDescriptor[D]] extends TableDescriptor {
+
+ /**
+ * Specifies the format that defines how to read data from a connector.
+ */
+ def withFormat(format: FormatDescriptor): D
+
+ /**
+ * Specifies the resulting table schema.
+ */
+ def withSchema(schema: Schema): D
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/6fcc1e9a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/StreamTableDescriptor.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/StreamTableDescriptor.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/StreamTableDescriptor.scala
new file mode 100644
index 0000000..b9e64f9
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/StreamTableDescriptor.scala
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+import org.apache.flink.table.api.StreamTableEnvironment
+import org.apache.flink.table.descriptors.StreamTableDescriptorValidator._
+
+/**
+ * Descriptor for specifying a table source and/or sink in a streaming environment.
+ */
+class StreamTableDescriptor(
+ tableEnv: StreamTableEnvironment,
+ connectorDescriptor: ConnectorDescriptor)
+ extends ConnectTableDescriptor[StreamTableDescriptor](
+ tableEnv,
+ connectorDescriptor)
+ with StreamableDescriptor[StreamTableDescriptor] {
+
+ private var updateMode: Option[String] = None
+
+ /**
+ * Declares how to perform the conversion between a dynamic table and an external connector.
+ *
+ * In append mode, a dynamic table and an external connector only exchange INSERT messages.
+ *
+ * @see [[inRetractMode()]] and [[inUpsertMode()]].
+ */
+ override def inAppendMode(): StreamTableDescriptor = {
+ updateMode = Some(UPDATE_MODE_VALUE_APPEND)
+ this
+ }
+
+ /**
+ * Declares how to perform the conversion between a dynamic table and an external connector.
+ *
+ * In retract mode, a dynamic table and an external connector exchange ADD and RETRACT messages.
+ *
+ * An INSERT change is encoded as an ADD message, a DELETE change as a RETRACT message, and an
+ * UPDATE change as a RETRACT message for the updated (previous) row and an ADD message for
+ * the updating (new) row.
+ *
+ * Unlike in upsert mode, no key may be defined in this mode. However, every update
+ * consists of two messages, which is less efficient.
+ *
+ * @see [[inAppendMode()]] and [[inUpsertMode()]].
+ */
+ override def inRetractMode(): StreamTableDescriptor = {
+ updateMode = Some(UPDATE_MODE_VALUE_RETRACT)
+ this
+ }
+
+ /**
+ * Declares how to perform the conversion between a dynamic table and an external connector.
+ *
+ * In upsert mode, a dynamic table and an external connector exchange UPSERT and DELETE messages.
+ *
+ * This mode requires a (possibly composite) unique key by which updates can be propagated. The
+ * external connector needs to be aware of the unique key attribute in order to apply messages
+ * correctly. INSERT and UPDATE changes are encoded as UPSERT messages, and DELETE
+ * changes as DELETE messages.
+ *
+ * The main difference to a retract stream is that UPDATE changes are encoded with a single
+ * message and are therefore more efficient.
+ *
+ * @see [[inAppendMode()]] and [[inRetractMode()]].
+ */
+ override def inUpsertMode(): StreamTableDescriptor = {
+ updateMode = Some(UPDATE_MODE_VALUE_UPSERT)
+ this
+ }
+
+ // ----------------------------------------------------------------------------------------------
+
+ /**
+ * Internal method for properties conversion.
+ */
+ override private[flink] def addProperties(properties: DescriptorProperties): Unit = {
+ super.addProperties(properties)
+ updateMode.foreach(mode => properties.putString(UPDATE_MODE, mode))
+
+ // this performs only basic validation
+ // more validation can only happen within a factory
+ new StreamTableDescriptorValidator().validate(properties)
+ }
+}
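A minimal sketch of declaring the update mode in a streaming environment, under
the same FileSystem/Csv assumptions as the batch examples above:

    streamTableEnv
      .connect(new FileSystem().path("/tmp/output.csv"))
      .withFormat(new Csv().field("v", Types.STRING))
      .withSchema(new Schema().field("v", Types.STRING))
      .inAppendMode() // adds "update-mode" -> "append" to the properties
      .registerTableSink("csvSink")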
http://git-wip-us.apache.org/repos/asf/flink/blob/6fcc1e9a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/StreamTableDescriptorValidator.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/StreamTableDescriptorValidator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/StreamTableDescriptorValidator.scala
new file mode 100644
index 0000000..5a6a946
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/StreamTableDescriptorValidator.scala
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+import java.util
+
+import org.apache.flink.table.descriptors.StreamTableDescriptorValidator._
+
+/**
+ * Validator for [[StreamTableDescriptor]].
+ */
+class StreamTableDescriptorValidator extends DescriptorValidator {
+
+ override def validate(properties: DescriptorProperties): Unit = {
+ properties.validateEnumValues(
+ UPDATE_MODE,
+ isOptional = false,
+ util.Arrays.asList(
+ UPDATE_MODE_VALUE_APPEND,
+ UPDATE_MODE_VALUE_RETRACT,
+ UPDATE_MODE_VALUE_UPSERT)
+ )
+ }
+}
+
+object StreamTableDescriptorValidator {
+
+ val UPDATE_MODE = "update-mode"
+ val UPDATE_MODE_VALUE_APPEND = "append"
+ val UPDATE_MODE_VALUE_RETRACT = "retract"
+ val UPDATE_MODE_VALUE_UPSERT = "upsert"
+}
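A sketch of the validator's behavior; "replace" is an arbitrary invalid value:

    val good = new DescriptorProperties()
    good.putString(UPDATE_MODE, UPDATE_MODE_VALUE_APPEND)
    new StreamTableDescriptorValidator().validate(good) // passes

    val bad = new DescriptorProperties()
    bad.putString(UPDATE_MODE, "replace")
    new StreamTableDescriptorValidator().validate(bad) // throws ValidationException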
http://git-wip-us.apache.org/repos/asf/flink/blob/6fcc1e9a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/StreamTableSourceDescriptor.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/StreamTableSourceDescriptor.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/StreamTableSourceDescriptor.scala
deleted file mode 100644
index 6ade2d6..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/StreamTableSourceDescriptor.scala
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.descriptors
-
-import org.apache.flink.table.api.{StreamTableEnvironment, Table, ValidationException}
-import org.apache.flink.table.factories.{StreamTableSourceFactory, TableFactoryService}
-import org.apache.flink.table.sources.TableSource
-
-/**
- * Descriptor for specifying a table source in a streaming environment.
- */
-class StreamTableSourceDescriptor(tableEnv: StreamTableEnvironment, connector: ConnectorDescriptor)
- extends TableSourceDescriptor {
-
- connectorDescriptor = Some(connector)
-
- override private[flink] def addProperties(properties: DescriptorProperties): Unit = {
- // check for a format
- if (connector.needsFormat() && formatDescriptor.isEmpty) {
- throw new ValidationException(
- s"The connector '$connector' requires a format description.")
- } else if (!connector.needsFormat() && formatDescriptor.isDefined) {
- throw new ValidationException(
- s"The connector '$connector' does not require a format description " +
- s"but '${formatDescriptor.get}' found.")
- }
- super.addProperties(properties)
- }
-
- /**
- * Searches for the specified table source, configures it accordingly, and returns it.
- */
- def toTableSource: TableSource[_] = {
- val properties = new DescriptorProperties()
- addProperties(properties)
- val javaMap = properties.asMap
- TableFactoryService
- .find(classOf[StreamTableSourceFactory[_]], javaMap)
- .createStreamTableSource(javaMap)
- }
-
- /**
- * Searches for the specified table source, configures it accordingly, and returns it as a table.
- */
- def toTable: Table = {
- tableEnv.fromTableSource(toTableSource)
- }
-
- /**
- * Searches for the specified table source, configures it accordingly, and registers it as
- * a table under the given name.
- *
- * @param name table name to be registered in the table environment
- */
- def register(name: String): Unit = {
- tableEnv.registerTableSource(name, toTableSource)
- }
-
- /**
- * Specifies the format that defines how to read data from a connector.
- */
- def withFormat(format: FormatDescriptor): StreamTableSourceDescriptor = {
- formatDescriptor = Some(format)
- this
- }
-
- /**
- * Specifies the resulting table schema.
- */
- def withSchema(schema: Schema): StreamTableSourceDescriptor = {
- schemaDescriptor = Some(schema)
- this
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/6fcc1e9a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/StreamableDescriptor.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/StreamableDescriptor.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/StreamableDescriptor.scala
new file mode 100644
index 0000000..0d424bd
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/StreamableDescriptor.scala
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+/**
+ * A trait for descriptors that define how to convert between a dynamic table and an
+ * external connector.
+ */
+trait StreamableDescriptor[D <: StreamableDescriptor[D]] extends TableDescriptor {
+
+ /**
+ * Declares how to perform the conversion between a dynamic table and an external connector.
+ *
+ * In append mode, a dynamic table and an external connector only exchange INSERT messages.
+ *
+ * @see [[inRetractMode()]] and [[inUpsertMode()]].
+ */
+ def inAppendMode(): D
+
+ /**
+ * Declares how to perform the conversion between a dynamic table and an external connector.
+ *
+ * In retract mode, a dynamic table and an external connector exchange ADD and RETRACT messages.
+ *
+ * An INSERT change is encoded as an ADD message, a DELETE change as a RETRACT message, and an
+ * UPDATE change as a RETRACT message for the updated (previous) row and an ADD message for
+ * the updating (new) row.
+ *
+ * Unlike in upsert mode, no key may be defined in this mode. However, every update
+ * consists of two messages, which is less efficient.
+ *
+ * @see [[inAppendMode()]] and [[inUpsertMode()]].
+ */
+ def inRetractMode(): D
+
+ /**
+ * Declares how to perform the conversion between a dynamic table and an external connector.
+ *
+ * In upsert mode, a dynamic table and an external connector exchange UPSERT and DELETE messages.
+ *
+ * This mode requires a (possibly composite) unique key by which updates can be propagated. The
+ * external connector needs to be aware of the unique key attribute in order to apply messages
+ * correctly. INSERT and UPDATE changes are encoded as UPSERT messages, and DELETE
+ * changes as DELETE messages.
+ *
+ * The main difference to a retract stream is that UPDATE changes are encoded with a single
+ * message and are therefore more efficient.
+ *
+ * @see [[inAppendMode()]] and [[inRetractMode()]].
+ */
+ def inUpsertMode(): D
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/6fcc1e9a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/TableDescriptor.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/TableDescriptor.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/TableDescriptor.scala
index 7b864d8..b14a310 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/TableDescriptor.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/TableDescriptor.scala
@@ -19,22 +19,6 @@
package org.apache.flink.table.descriptors
/**
- * Common class for all descriptors describing table sources and sinks.
+ * Common trait for all descriptors describing table sources and sinks.
*/
-abstract class TableDescriptor extends Descriptor {
-
- protected var connectorDescriptor: Option[ConnectorDescriptor] = None
- protected var formatDescriptor: Option[FormatDescriptor] = None
- protected var schemaDescriptor: Option[Schema] = None
- protected var metaDescriptor: Option[Metadata] = None
-
- /**
- * Internal method for properties conversion.
- */
- override private[flink] def addProperties(properties: DescriptorProperties): Unit = {
- connectorDescriptor.foreach(_.addProperties(properties))
- formatDescriptor.foreach(_.addProperties(properties))
- schemaDescriptor.foreach(_.addProperties(properties))
- metaDescriptor.foreach(_.addProperties(properties))
- }
-}
+trait TableDescriptor extends Descriptor
http://git-wip-us.apache.org/repos/asf/flink/blob/6fcc1e9a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/TableDescriptorValidator.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/TableDescriptorValidator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/TableDescriptorValidator.scala
deleted file mode 100644
index e0fa602..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/TableDescriptorValidator.scala
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.descriptors
-
-/**
- * Validator for [[TableDescriptor]].
- */
-class TableDescriptorValidator extends DescriptorValidator {
-
- override def validate(properties: DescriptorProperties): Unit = {
- // nothing to do
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/6fcc1e9a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/TableSinkDescriptor.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/TableSinkDescriptor.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/TableSinkDescriptor.scala
deleted file mode 100644
index 0a4d504..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/TableSinkDescriptor.scala
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.descriptors
-
-/**
- * Common class for all descriptors describing a table sink.
- */
-abstract class TableSinkDescriptor extends TableDescriptor {
-
- /**
- * Internal method for properties conversion.
- */
- override private[flink] def addProperties(properties: DescriptorProperties): Unit = {
- super.addProperties(properties)
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/6fcc1e9a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/TableSourceDescriptor.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/TableSourceDescriptor.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/TableSourceDescriptor.scala
deleted file mode 100644
index 3ca39c2..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/TableSourceDescriptor.scala
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.descriptors
-
-import org.apache.flink.table.descriptors.DescriptorProperties.toScala
-import org.apache.flink.table.descriptors.StatisticsValidator.{STATISTICS_COLUMNS, STATISTICS_ROW_COUNT, readColumnStats}
-import org.apache.flink.table.plan.stats.TableStats
-
-import scala.collection.JavaConverters._
-
-/**
- * Common class for all descriptors describing a table source.
- */
-abstract class TableSourceDescriptor extends TableDescriptor {
-
- protected var statisticsDescriptor: Option[Statistics] = None
-
- /**
- * Internal method for properties conversion.
- */
- override private[flink] def addProperties(properties: DescriptorProperties): Unit = {
- super.addProperties(properties)
- statisticsDescriptor.foreach(_.addProperties(properties))
- }
-
- /**
- * Reads table statistics from the descriptors properties.
- */
- protected def getTableStats: Option[TableStats] = {
- val normalizedProps = new DescriptorProperties()
- addProperties(normalizedProps)
- val rowCount = toScala(normalizedProps.getOptionalLong(STATISTICS_ROW_COUNT))
- rowCount match {
- case Some(cnt) =>
- val columnStats = readColumnStats(normalizedProps, STATISTICS_COLUMNS)
- Some(TableStats(cnt, columnStats.asJava))
- case None =>
- None
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/6fcc1e9a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/factories/TableFactory.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/factories/TableFactory.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/factories/TableFactory.scala
index cc99ecc..a5dcc60 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/factories/TableFactory.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/factories/TableFactory.scala
@@ -29,6 +29,12 @@ import java.util
* Classes that implement this interface can be added to the
* "META_INF/services/org.apache.flink.table.factories.TableFactory" file of a JAR file in
* the current classpath to be found.
+ *
+ * @see [[BatchTableSourceFactory]]
+ * @see [[BatchTableSinkFactory]]
+ * @see [[StreamTableSourceFactory]]
+ * @see [[StreamTableSinkFactory]]
+ * @see [[TableFormatFactory]]
*/
trait TableFactory {
http://git-wip-us.apache.org/repos/asf/flink/blob/6fcc1e9a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/factories/TableFactoryService.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/factories/TableFactoryService.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/factories/TableFactoryService.scala
index 3baff8e..26b7c6d 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/factories/TableFactoryService.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/factories/TableFactoryService.scala
@@ -50,9 +50,7 @@ object TableFactoryService extends Logging {
def find[T](factoryClass: Class[T], descriptor: Descriptor): T = {
Preconditions.checkNotNull(descriptor)
- val descriptorProperties = new DescriptorProperties()
- descriptor.addProperties(descriptorProperties)
- findInternal(factoryClass, descriptorProperties.asMap, None)
+ findInternal(factoryClass, DescriptorProperties.toJavaMap(descriptor), None)
}
/**
@@ -68,9 +66,7 @@ object TableFactoryService extends Logging {
Preconditions.checkNotNull(descriptor)
Preconditions.checkNotNull(classLoader)
- val descriptorProperties = new DescriptorProperties()
- descriptor.addProperties(descriptorProperties)
- findInternal(factoryClass, descriptorProperties.asMap, Some(classLoader))
+ findInternal(factoryClass, DescriptorProperties.toJavaMap(descriptor), Some(classLoader))
}
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/6fcc1e9a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/factories/TableFactoryUtil.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/factories/TableFactoryUtil.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/factories/TableFactoryUtil.scala
new file mode 100644
index 0000000..9989ebc
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/factories/TableFactoryUtil.scala
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.factories
+
+import org.apache.flink.table.api.{BatchTableEnvironment, StreamTableEnvironment, TableEnvironment, TableException}
+import org.apache.flink.table.descriptors.{Descriptor, DescriptorProperties}
+import org.apache.flink.table.sinks.TableSink
+import org.apache.flink.table.sources.TableSource
+
+/**
+ * Utility for dealing with [[TableFactory]] using the [[TableFactoryService]].
+ */
+object TableFactoryUtil {
+
+ /**
+ * Discovers and creates a table source for the given table environment and descriptor.
+ */
+ def findAndCreateTableSource[T](
+ tableEnvironment: TableEnvironment,
+ descriptor: Descriptor)
+ : TableSource[T] = {
+
+ val javaMap = DescriptorProperties.toJavaMap(descriptor)
+
+ tableEnvironment match {
+ case _: BatchTableEnvironment =>
+ TableFactoryService
+ .find(classOf[BatchTableSourceFactory[T]], javaMap)
+ .createBatchTableSource(javaMap)
+
+ case _: StreamTableEnvironment =>
+ TableFactoryService
+ .find(classOf[StreamTableSourceFactory[T]], javaMap)
+ .createStreamTableSource(javaMap)
+
+ case e =>
+ throw new TableException(s"Unsupported table environment: ${e.getClass.getName}")
+ }
+ }
+
+ /**
+ * Discovers and creates a table sink for the given table environment and descriptor.
+ */
+ def findAndCreateTableSink[T](
+ tableEnvironment: TableEnvironment,
+ descriptor: Descriptor)
+ : TableSink[T] = {
+
+ val javaMap = DescriptorProperties.toJavaMap(descriptor)
+
+ tableEnvironment match {
+ case _: BatchTableEnvironment =>
+ TableFactoryService
+ .find(classOf[BatchTableSinkFactory[T]], javaMap)
+ .createBatchTableSink(javaMap)
+
+ case _: StreamTableEnvironment =>
+ TableFactoryService
+ .find(classOf[StreamTableSinkFactory[T]], javaMap)
+ .createStreamTableSink(javaMap)
+
+ case e =>
+ throw new TableException(s"Unsupported table environment: ${e.getClass.getName}")
+ }
+ }
+}
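A sketch from the caller's perspective; the concrete environment decides
whether a batch or stream factory is looked up (names are placeholders):

    val source: TableSource[Row] =
      TableFactoryUtil.findAndCreateTableSource(tableEnv, descriptor)
    val sink: TableSink[Row] =
      TableFactoryUtil.findAndCreateTableSink(tableEnv, descriptor)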
http://git-wip-us.apache.org/repos/asf/flink/blob/6fcc1e9a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/factories/TableFormatFactory.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/factories/TableFormatFactory.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/factories/TableFormatFactory.scala
index 8fa6fad..9e42a15 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/factories/TableFormatFactory.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/factories/TableFormatFactory.scala
@@ -25,6 +25,8 @@ import java.util
* also [[TableFactory]] for more information.
*
* @tparam T record type that the format produces or consumes
+ * @see [[DeserializationSchemaFactory]]
+ * @see [[SerializationSchemaFactory]]
*/
trait TableFormatFactory[T] extends TableFactory {
http://git-wip-us.apache.org/repos/asf/flink/blob/6fcc1e9a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/CsvAppendTableSinkFactory.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/CsvAppendTableSinkFactory.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/CsvAppendTableSinkFactory.scala
new file mode 100644
index 0000000..65a41bb
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/CsvAppendTableSinkFactory.scala
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sinks
+
+import java.util
+
+import org.apache.flink.table.descriptors.StreamTableDescriptorValidator.{UPDATE_MODE, UPDATE_MODE_VALUE_APPEND}
+import org.apache.flink.table.factories.StreamTableSinkFactory
+import org.apache.flink.types.Row
+
+/**
+ * Factory for creating configured instances of [[CsvTableSink]] in a stream environment.
+ */
+class CsvAppendTableSinkFactory
+ extends CsvTableSinkFactoryBase
+ with StreamTableSinkFactory[Row] {
+
+ override def requiredContext(): util.Map[String, String] = {
+ val context = new util.HashMap[String, String](super.requiredContext())
+ context.put(UPDATE_MODE, UPDATE_MODE_VALUE_APPEND)
+ context
+ }
+
+ override def createStreamTableSink(
+ properties: util.Map[String, String])
+ : StreamTableSink[Row] = {
+ createTableSink(isStreaming = true, properties)
+ }
+}
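For illustration, the additional context entry restricts matching to append
mode on top of the keys inherited from the base class:

    val context = new CsvAppendTableSinkFactory().requiredContext()
    // inherits the connector/format keys of CsvTableSinkFactoryBase and
    // additionally requires "update-mode" -> "append"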
http://git-wip-us.apache.org/repos/asf/flink/blob/6fcc1e9a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/CsvBatchTableSinkFactory.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/CsvBatchTableSinkFactory.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/CsvBatchTableSinkFactory.scala
new file mode 100644
index 0000000..2687ed2
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/CsvBatchTableSinkFactory.scala
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sinks
+
+import java.util
+
+import org.apache.flink.table.factories.BatchTableSinkFactory
+import org.apache.flink.types.Row
+
+/**
+ * Factory for creating configured instances of [[CsvTableSink]] in a batch environment.
+ */
+class CsvBatchTableSinkFactory
+ extends CsvTableSinkFactoryBase
+ with BatchTableSinkFactory[Row] {
+
+ override def createBatchTableSink(
+ properties: util.Map[String, String])
+ : BatchTableSink[Row] = {
+ createTableSink(isStreaming = false, properties)
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/6fcc1e9a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/CsvTableSinkFactory.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/CsvTableSinkFactory.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/CsvTableSinkFactory.scala
deleted file mode 100644
index eb99f02..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/CsvTableSinkFactory.scala
+++ /dev/null
@@ -1,112 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.sinks
-
-import java.util
-
-import org.apache.flink.table.api.TableException
-import org.apache.flink.table.descriptors.ConnectorDescriptorValidator._
-import org.apache.flink.table.descriptors.CsvValidator._
-import org.apache.flink.table.descriptors.DescriptorProperties._
-import org.apache.flink.table.descriptors.FileSystemValidator._
-import org.apache.flink.table.descriptors.FormatDescriptorValidator._
-import org.apache.flink.table.descriptors.SchemaValidator._
-import org.apache.flink.table.descriptors._
-import org.apache.flink.table.factories.{BatchTableSinkFactory, StreamTableSinkFactory, TableFactory}
-import org.apache.flink.types.Row
-
-/**
- * Factory for creating configured instances of [[CsvTableSink]].
- */
-class CsvTableSinkFactory
- extends TableFactory
- with StreamTableSinkFactory[Row]
- with BatchTableSinkFactory[Row] {
-
- override def requiredContext(): util.Map[String, String] = {
- val context = new util.HashMap[String, String]()
- context.put(CONNECTOR_TYPE, CONNECTOR_TYPE_VALUE)
- context.put(FORMAT_TYPE, FORMAT_TYPE_VALUE)
- context.put(CONNECTOR_PROPERTY_VERSION, "1")
- context.put(FORMAT_PROPERTY_VERSION, "1")
- context
- }
-
- override def supportedProperties(): util.List[String] = {
- val properties = new util.ArrayList[String]()
- // connector
- properties.add(CONNECTOR_PATH)
- // format
- properties.add(s"$FORMAT_FIELDS.#.${DescriptorProperties.TYPE}")
- properties.add(s"$FORMAT_FIELDS.#.${DescriptorProperties.NAME}")
- properties.add(FORMAT_FIELD_DELIMITER)
- properties.add(CONNECTOR_PATH)
- // schema
- properties.add(s"$SCHEMA.#.${DescriptorProperties.TYPE}")
- properties.add(s"$SCHEMA.#.${DescriptorProperties.NAME}")
- properties
- }
-
- override def createStreamTableSink(
- properties: util.Map[String, String])
- : StreamTableSink[Row] = {
- createTableSink(isStreaming = true, properties)
- }
-
- override def createBatchTableSink(
- properties: util.Map[String, String])
- : BatchTableSink[Row] = {
- createTableSink(isStreaming = false, properties)
- }
-
- private def createTableSink(
- isStreaming: Boolean,
- properties: util.Map[String, String])
- : CsvTableSink = {
-
- val params = new DescriptorProperties()
- params.putProperties(properties)
-
- // validate
- new FileSystemValidator().validate(params)
- new CsvValidator().validate(params)
- new SchemaValidator(
- isStreaming,
- supportsSourceTimestamps = false,
- supportsSourceWatermarks = false).validate(params)
-
- // build
- val formatSchema = params.getTableSchema(FORMAT_FIELDS)
- val tableSchema = SchemaValidator.deriveTableSinkSchema(params)
-
- if (!formatSchema.equals(tableSchema)) {
- throw new TableException(
- "Encodings that differ from the schema are not supported yet for CsvTableSink.")
- }
-
- val path = params.getString(CONNECTOR_PATH)
- val fieldDelimiter = toScala(params.getOptionalString(FORMAT_FIELD_DELIMITER)).getOrElse(",")
-
- val csvTableSink = new CsvTableSink(path, fieldDelimiter)
-
- csvTableSink
- .configure(formatSchema.getColumnNames, formatSchema.getTypes)
- .asInstanceOf[CsvTableSink]
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/6fcc1e9a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/CsvTableSinkFactoryBase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/CsvTableSinkFactoryBase.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/CsvTableSinkFactoryBase.scala
new file mode 100644
index 0000000..6ceba4c
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/CsvTableSinkFactoryBase.scala
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sinks
+
+import java.util
+
+import org.apache.flink.table.api.TableException
+import org.apache.flink.table.descriptors.ConnectorDescriptorValidator._
+import org.apache.flink.table.descriptors.CsvValidator._
+import org.apache.flink.table.descriptors.DescriptorProperties._
+import org.apache.flink.table.descriptors.FileSystemValidator._
+import org.apache.flink.table.descriptors.FormatDescriptorValidator._
+import org.apache.flink.table.descriptors.SchemaValidator._
+import org.apache.flink.table.descriptors._
+import org.apache.flink.table.factories.TableFactory
+
+/**
+ * Factory base for creating configured instances of [[CsvTableSink]].
+ */
+abstract class CsvTableSinkFactoryBase extends TableFactory {
+
+ override def requiredContext(): util.Map[String, String] = {
+ val context = new util.HashMap[String, String]()
+ context.put(CONNECTOR_TYPE, CONNECTOR_TYPE_VALUE)
+ context.put(FORMAT_TYPE, FORMAT_TYPE_VALUE)
+ context.put(CONNECTOR_PROPERTY_VERSION, "1")
+ context.put(FORMAT_PROPERTY_VERSION, "1")
+ context
+ }
+
+ override def supportedProperties(): util.List[String] = {
+ val properties = new util.ArrayList[String]()
+ // connector
+ properties.add(CONNECTOR_PATH)
+ // format
+ properties.add(s"$FORMAT_FIELDS.#.${DescriptorProperties.TYPE}")
+ properties.add(s"$FORMAT_FIELDS.#.${DescriptorProperties.NAME}")
+ properties.add(FORMAT_FIELD_DELIMITER)
+ // schema
+ properties.add(s"$SCHEMA.#.${DescriptorProperties.TYPE}")
+ properties.add(s"$SCHEMA.#.${DescriptorProperties.NAME}")
+ properties
+ }
+
+ protected def createTableSink(
+ isStreaming: Boolean,
+ properties: util.Map[String, String])
+ : CsvTableSink = {
+
+ val params = new DescriptorProperties()
+ params.putProperties(properties)
+
+ // validate
+ new FileSystemValidator().validate(params)
+ new CsvValidator().validate(params)
+ new SchemaValidator(
+ isStreaming,
+ supportsSourceTimestamps = false,
+ supportsSourceWatermarks = false).validate(params)
+
+ // build
+ val formatSchema = params.getTableSchema(FORMAT_FIELDS)
+ val tableSchema = SchemaValidator.deriveTableSinkSchema(params)
+
+ if (!formatSchema.equals(tableSchema)) {
+ throw new TableException(
+ "Encodings that differ from the schema are not supported yet for CsvTableSink.")
+ }
+
+ val path = params.getString(CONNECTOR_PATH)
+ val fieldDelimiter = toScala(params.getOptionalString(FORMAT_FIELD_DELIMITER)).getOrElse(",")
+
+ val csvTableSink = new CsvTableSink(path, fieldDelimiter)
+
+ csvTableSink
+ .configure(formatSchema.getColumnNames, formatSchema.getTypes)
+ .asInstanceOf[CsvTableSink]
+ }
+}
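A hedged end-to-end sketch of the property map that this base class validates and turns into a CsvTableSink, exercised through the batch subclass above. The literal keys ("connector.path", "format.fields.#.name/type", "format.field-delimiter", "schema.#.name/type") and the "VARCHAR" type string are assumptions derived from the imported validator constants. Note that the schema section must mirror the format section, otherwise the TableException above is thrown, and that the field delimiter falls back to "," when absent.

    import java.util

    import org.apache.flink.table.sinks.CsvBatchTableSinkFactory

    object BuildCsvBatchSink {
      def main(args: Array[String]): Unit = {
        val props = new util.HashMap[String, String]()
        // connector
        props.put("connector.type", "filesystem")
        props.put("connector.path", "/tmp/output.csv")
        // format
        props.put("format.type", "csv")
        props.put("format.field-delimiter", "|") // defaults to "," if absent
        props.put("format.fields.0.name", "word")
        props.put("format.fields.0.type", "VARCHAR")
        // schema: must currently equal the format fields
        props.put("schema.0.name", "word")
        props.put("schema.0.type", "VARCHAR")

        val sink = new CsvBatchTableSinkFactory().createBatchTableSink(props)
        println(sink.getFieldNames.mkString(", "))
      }
    }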
http://git-wip-us.apache.org/repos/asf/flink/blob/6fcc1e9a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/CsvAppendTableSourceFactory.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/CsvAppendTableSourceFactory.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/CsvAppendTableSourceFactory.scala
new file mode 100644
index 0000000..afbe2ea
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/CsvAppendTableSourceFactory.scala
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sources
+
+import java.util
+
+import org.apache.flink.table.descriptors.StreamTableDescriptorValidator.{UPDATE_MODE, UPDATE_MODE_VALUE_APPEND}
+import org.apache.flink.table.factories.StreamTableSourceFactory
+import org.apache.flink.types.Row
+
+/**
+ * Factory for creating configured instances of [[CsvTableSource]] in a stream environment.
+ */
+class CsvAppendTableSourceFactory
+ extends CsvTableSourceFactoryBase
+ with StreamTableSourceFactory[Row] {
+
+ override def requiredContext(): util.Map[String, String] = {
+ val context = new util.HashMap[String, String](super.requiredContext())
+ context.put(UPDATE_MODE, UPDATE_MODE_VALUE_APPEND)
+ context
+ }
+
+ override def createStreamTableSource(
+ properties: util.Map[String, String])
+ : StreamTableSource[Row] = {
+ createTableSource(isStreaming = true, properties)
+ }
+}
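A sketch of how the new update mode drives discovery for stream sources: the same connector/format context keys plus "update-mode" set to "append" (the assumed literals behind UPDATE_MODE and UPDATE_MODE_VALUE_APPEND) resolve to this factory, whereas a map without the entry, or with a different mode, would not match it. As before, the key values are assumptions; building the source additionally requires the path, format, and schema properties from the base class.

    import java.util

    import org.apache.flink.table.factories.{StreamTableSourceFactory, TableFactoryService}
    import org.apache.flink.types.Row

    object DiscoverCsvAppendSource {
      def main(args: Array[String]): Unit = {
        val props = new util.HashMap[String, String]()
        props.put("connector.type", "filesystem")
        props.put("format.type", "csv")
        props.put("update-mode", "append") // required by the overridden context above

        // resolves to CsvAppendTableSourceFactory
        val factory =
          TableFactoryService.find(classOf[StreamTableSourceFactory[Row]], props)
        println(factory.getClass.getName)
      }
    }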
http://git-wip-us.apache.org/repos/asf/flink/blob/6fcc1e9a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/CsvBatchTableSourceFactory.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/CsvBatchTableSourceFactory.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/CsvBatchTableSourceFactory.scala
new file mode 100644
index 0000000..9d8fa40
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/CsvBatchTableSourceFactory.scala
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sources
+
+import java.util
+
+import org.apache.flink.table.factories.BatchTableSourceFactory
+import org.apache.flink.types.Row
+
+/**
+ * Factory for creating configured instances of [[CsvTableSource]] in a batch environment.
+ */
+class CsvBatchTableSourceFactory
+ extends CsvTableSourceFactoryBase
+ with BatchTableSourceFactory[Row] {
+
+ override def createBatchTableSource(
+ properties: util.Map[String, String])
+ : BatchTableSource[Row] = {
+ createTableSource(isStreaming = false, properties)
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/6fcc1e9a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/CsvTableSourceFactory.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/CsvTableSourceFactory.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/CsvTableSourceFactory.scala
deleted file mode 100644
index 96751ec..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/CsvTableSourceFactory.scala
+++ /dev/null
@@ -1,139 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.sources
-
-import java.util
-
-import org.apache.flink.table.api.TableException
-import org.apache.flink.table.descriptors.ConnectorDescriptorValidator.{CONNECTOR_PROPERTY_VERSION, CONNECTOR_TYPE}
-import org.apache.flink.table.descriptors.CsvValidator._
-import org.apache.flink.table.descriptors.DescriptorProperties.toScala
-import org.apache.flink.table.descriptors.FileSystemValidator.{CONNECTOR_PATH, CONNECTOR_TYPE_VALUE}
-import org.apache.flink.table.descriptors.FormatDescriptorValidator.{FORMAT_PROPERTY_VERSION, FORMAT_TYPE}
-import org.apache.flink.table.descriptors.SchemaValidator.SCHEMA
-import org.apache.flink.table.descriptors._
-import org.apache.flink.table.factories.{BatchTableSourceFactory, StreamTableSourceFactory, TableFactory}
-import org.apache.flink.types.Row
-
-/**
- * Factory for creating configured instances of [[CsvTableSource]].
- */
-class CsvTableSourceFactory
- extends TableFactory
- with StreamTableSourceFactory[Row]
- with BatchTableSourceFactory[Row] {
-
- override def requiredContext(): util.Map[String, String] = {
- val context = new util.HashMap[String, String]()
- context.put(CONNECTOR_TYPE, CONNECTOR_TYPE_VALUE)
- context.put(FORMAT_TYPE, FORMAT_TYPE_VALUE)
- context.put(CONNECTOR_PROPERTY_VERSION, "1")
- context.put(FORMAT_PROPERTY_VERSION, "1")
- context
- }
-
- override def supportedProperties(): util.List[String] = {
- val properties = new util.ArrayList[String]()
- // connector
- properties.add(CONNECTOR_PATH)
- // format
- properties.add(s"$FORMAT_FIELDS.#.${DescriptorProperties.TYPE}")
- properties.add(s"$FORMAT_FIELDS.#.${DescriptorProperties.NAME}")
- properties.add(FORMAT_FIELD_DELIMITER)
- properties.add(FORMAT_LINE_DELIMITER)
- properties.add(FORMAT_QUOTE_CHARACTER)
- properties.add(FORMAT_COMMENT_PREFIX)
- properties.add(FORMAT_IGNORE_FIRST_LINE)
- properties.add(FORMAT_IGNORE_PARSE_ERRORS)
- properties.add(CONNECTOR_PATH)
- // schema
- properties.add(s"$SCHEMA.#.${DescriptorProperties.TYPE}")
- properties.add(s"$SCHEMA.#.${DescriptorProperties.NAME}")
- properties
- }
-
- override def createStreamTableSource(
- properties: util.Map[String, String])
- : StreamTableSource[Row] = {
- createTableSource(isStreaming = true, properties)
- }
-
- override def createBatchTableSource(
- properties: util.Map[String, String])
- : BatchTableSource[Row] = {
- createTableSource(isStreaming = false, properties)
- }
-
- private def createTableSource(
- isStreaming: Boolean,
- properties: util.Map[String, String])
- : CsvTableSource = {
-
- val params = new DescriptorProperties()
- params.putProperties(properties)
-
- // validate
- new FileSystemValidator().validate(params)
- new CsvValidator().validate(params)
- new SchemaValidator(
- isStreaming,
- supportsSourceTimestamps = false,
- supportsSourceWatermarks = false).validate(params)
-
- // build
- val csvTableSourceBuilder = new CsvTableSource.Builder
-
- val formatSchema = params.getTableSchema(FORMAT_FIELDS)
- val tableSchema = params.getTableSchema(SCHEMA)
-
- // the CsvTableSource needs some rework first
- // for now the schema must be equal to the encoding
- if (!formatSchema.equals(tableSchema)) {
- throw new TableException(
- "Encodings that differ from the schema are not supported yet for CsvTableSources.")
- }
-
- toScala(params.getOptionalString(CONNECTOR_PATH))
- .foreach(csvTableSourceBuilder.path)
- toScala(params.getOptionalString(FORMAT_FIELD_DELIMITER))
- .foreach(csvTableSourceBuilder.fieldDelimiter)
- toScala(params.getOptionalString(FORMAT_LINE_DELIMITER))
- .foreach(csvTableSourceBuilder.lineDelimiter)
-
- formatSchema.getColumnNames.zip(formatSchema.getTypes).foreach { case (name, tpe) =>
- csvTableSourceBuilder.field(name, tpe)
- }
- toScala(params.getOptionalCharacter(FORMAT_QUOTE_CHARACTER))
- .foreach(csvTableSourceBuilder.quoteCharacter)
- toScala(params.getOptionalString(FORMAT_COMMENT_PREFIX))
- .foreach(csvTableSourceBuilder.commentPrefix)
- toScala(params.getOptionalBoolean(FORMAT_IGNORE_FIRST_LINE)).foreach { flag =>
- if (flag) {
- csvTableSourceBuilder.ignoreFirstLine()
- }
- }
- toScala(params.getOptionalBoolean(FORMAT_IGNORE_PARSE_ERRORS)).foreach { flag =>
- if (flag) {
- csvTableSourceBuilder.ignoreParseErrors()
- }
- }
-
- csvTableSourceBuilder.build()
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/6fcc1e9a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/CsvTableSourceFactoryBase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/CsvTableSourceFactoryBase.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/CsvTableSourceFactoryBase.scala
new file mode 100644
index 0000000..d320220
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/CsvTableSourceFactoryBase.scala
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sources
+
+import java.util
+
+import org.apache.flink.table.api.TableException
+import org.apache.flink.table.descriptors.ConnectorDescriptorValidator.{CONNECTOR_PROPERTY_VERSION, CONNECTOR_TYPE}
+import org.apache.flink.table.descriptors.CsvValidator._
+import org.apache.flink.table.descriptors.DescriptorProperties.toScala
+import org.apache.flink.table.descriptors.FileSystemValidator.{CONNECTOR_PATH, CONNECTOR_TYPE_VALUE}
+import org.apache.flink.table.descriptors.FormatDescriptorValidator.{FORMAT_PROPERTY_VERSION, FORMAT_TYPE}
+import org.apache.flink.table.descriptors.SchemaValidator.SCHEMA
+import org.apache.flink.table.descriptors._
+import org.apache.flink.table.factories.TableFactory
+
+/**
+ * Factory base for creating configured instances of [[CsvTableSource]].
+ */
+abstract class CsvTableSourceFactoryBase extends TableFactory {
+
+ override def requiredContext(): util.Map[String, String] = {
+ val context = new util.HashMap[String, String]()
+ context.put(CONNECTOR_TYPE, CONNECTOR_TYPE_VALUE)
+ context.put(FORMAT_TYPE, FORMAT_TYPE_VALUE)
+ context.put(CONNECTOR_PROPERTY_VERSION, "1")
+ context.put(FORMAT_PROPERTY_VERSION, "1")
+ context
+ }
+
+ override def supportedProperties(): util.List[String] = {
+ val properties = new util.ArrayList[String]()
+ // connector
+ properties.add(CONNECTOR_PATH)
+ // format
+ properties.add(s"$FORMAT_FIELDS.#.${DescriptorProperties.TYPE}")
+ properties.add(s"$FORMAT_FIELDS.#.${DescriptorProperties.NAME}")
+ properties.add(FORMAT_FIELD_DELIMITER)
+ properties.add(FORMAT_LINE_DELIMITER)
+ properties.add(FORMAT_QUOTE_CHARACTER)
+ properties.add(FORMAT_COMMENT_PREFIX)
+ properties.add(FORMAT_IGNORE_FIRST_LINE)
+ properties.add(FORMAT_IGNORE_PARSE_ERRORS)
+ // schema
+ properties.add(s"$SCHEMA.#.${DescriptorProperties.TYPE}")
+ properties.add(s"$SCHEMA.#.${DescriptorProperties.NAME}")
+ properties
+ }
+
+ protected def createTableSource(
+ isStreaming: Boolean,
+ properties: util.Map[String, String])
+ : CsvTableSource = {
+
+ val params = new DescriptorProperties()
+ params.putProperties(properties)
+
+ // validate
+ new FileSystemValidator().validate(params)
+ new CsvValidator().validate(params)
+ new SchemaValidator(
+ isStreaming,
+ supportsSourceTimestamps = false,
+ supportsSourceWatermarks = false).validate(params)
+
+ // build
+ val csvTableSourceBuilder = new CsvTableSource.Builder
+
+ val formatSchema = params.getTableSchema(FORMAT_FIELDS)
+ val tableSchema = params.getTableSchema(SCHEMA)
+
+ // the CsvTableSource needs some rework first
+ // for now the schema must be equal to the encoding
+ if (!formatSchema.equals(tableSchema)) {
+ throw new TableException(
+ "Encodings that differ from the schema are not supported yet for CsvTableSources.")
+ }
+
+ toScala(params.getOptionalString(CONNECTOR_PATH))
+ .foreach(csvTableSourceBuilder.path)
+ toScala(params.getOptionalString(FORMAT_FIELD_DELIMITER))
+ .foreach(csvTableSourceBuilder.fieldDelimiter)
+ toScala(params.getOptionalString(FORMAT_LINE_DELIMITER))
+ .foreach(csvTableSourceBuilder.lineDelimiter)
+
+ formatSchema.getColumnNames.zip(formatSchema.getTypes).foreach { case (name, tpe) =>
+ csvTableSourceBuilder.field(name, tpe)
+ }
+ toScala(params.getOptionalCharacter(FORMAT_QUOTE_CHARACTER))
+ .foreach(csvTableSourceBuilder.quoteCharacter)
+ toScala(params.getOptionalString(FORMAT_COMMENT_PREFIX))
+ .foreach(csvTableSourceBuilder.commentPrefix)
+ toScala(params.getOptionalBoolean(FORMAT_IGNORE_FIRST_LINE)).foreach { flag =>
+ if (flag) {
+ csvTableSourceBuilder.ignoreFirstLine()
+ }
+ }
+ toScala(params.getOptionalBoolean(FORMAT_IGNORE_PARSE_ERRORS)).foreach { flag =>
+ if (flag) {
+ csvTableSourceBuilder.ignoreParseErrors()
+ }
+ }
+
+ csvTableSourceBuilder.build()
+ }
+}
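To round off the source side, a sketch of a complete property map that exercises the optional CSV knobs handled above (header skipping and comment handling), built through the batch subclass. The literal keys and the "VARCHAR" type string are again assumed from the validator constants, and the schema section must match the format section, as the TableException above enforces.

    import java.util

    import org.apache.flink.table.sources.CsvBatchTableSourceFactory

    object BuildCsvBatchSource {
      def main(args: Array[String]): Unit = {
        val props = new util.HashMap[String, String]()
        props.put("connector.type", "filesystem")
        props.put("connector.path", "/tmp/input.csv")
        props.put("format.type", "csv")
        props.put("format.fields.0.name", "word")
        props.put("format.fields.0.type", "VARCHAR")
        props.put("format.ignore-first-line", "true") // skip a header row
        props.put("format.comment-prefix", "#")       // skip commented lines
        props.put("schema.0.name", "word")
        props.put("schema.0.type", "VARCHAR")

        val source = new CsvBatchTableSourceFactory().createBatchTableSource(props)
        println(source.getReturnType)
      }
    }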
http://git-wip-us.apache.org/repos/asf/flink/blob/6fcc1e9a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/ExternalCatalogTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/ExternalCatalogTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/ExternalCatalogTest.scala
index 6df00e7..7f567f9 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/ExternalCatalogTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/ExternalCatalogTest.scala
@@ -39,7 +39,9 @@ class ExternalCatalogTest extends TableTestBase {
val util = batchTestUtil()
val tableEnv = util.tableEnv
- tableEnv.registerExternalCatalog("test", CommonTestData.getInMemoryTestCatalog)
+ tableEnv.registerExternalCatalog(
+ "test",
+ CommonTestData.getInMemoryTestCatalog(isStreaming = false))
val table1 = tableEnv.scan("test", "db1", "tb1")
val table2 = tableEnv.scan("test", "db2", "tb2")
@@ -69,7 +71,9 @@ class ExternalCatalogTest extends TableTestBase {
def testBatchSQL(): Unit = {
val util = batchTestUtil()
- util.tableEnv.registerExternalCatalog("test", CommonTestData.getInMemoryTestCatalog)
+ util.tableEnv.registerExternalCatalog(
+ "test",
+ CommonTestData.getInMemoryTestCatalog(isStreaming = false))
val sqlQuery = "SELECT d * 2, e, g FROM test.db2.tb2 WHERE d < 3 UNION ALL " +
"(SELECT a * 2, b, c FROM test.db1.tb1)"
@@ -96,7 +100,9 @@ class ExternalCatalogTest extends TableTestBase {
val util = streamTestUtil()
val tableEnv = util.tableEnv
- util.tableEnv.registerExternalCatalog("test", CommonTestData.getInMemoryTestCatalog)
+ util.tableEnv.registerExternalCatalog(
+ "test",
+ CommonTestData.getInMemoryTestCatalog(isStreaming = true))
val table1 = tableEnv.scan("test", "db1", "tb1")
val table2 = tableEnv.scan("test", "db2", "tb2")
@@ -128,7 +134,9 @@ class ExternalCatalogTest extends TableTestBase {
def testStreamSQL(): Unit = {
val util = streamTestUtil()
- util.tableEnv.registerExternalCatalog("test", CommonTestData.getInMemoryTestCatalog)
+ util.tableEnv.registerExternalCatalog(
+ "test",
+ CommonTestData.getInMemoryTestCatalog(isStreaming = true))
val sqlQuery = "SELECT d * 2, e, g FROM test.db2.tb2 WHERE d < 3 UNION ALL " +
"(SELECT a * 2, b, c FROM test.db1.tb1)"
@@ -155,7 +163,9 @@ class ExternalCatalogTest extends TableTestBase {
val util = batchTestUtil()
val tableEnv = util.tableEnv
- tableEnv.registerExternalCatalog("test", CommonTestData.getInMemoryTestCatalog)
+ tableEnv.registerExternalCatalog(
+ "test",
+ CommonTestData.getInMemoryTestCatalog(isStreaming = false))
val table1 = tableEnv.scan("test", "tb1")
val table2 = tableEnv.scan("test", "db2", "tb2")
http://git-wip-us.apache.org/repos/asf/flink/blob/6fcc1e9a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/catalog/ExternalCatalogSchemaTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/catalog/ExternalCatalogSchemaTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/catalog/ExternalCatalogSchemaTest.scala
index 58f51f9..c98a7c1 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/catalog/ExternalCatalogSchemaTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/catalog/ExternalCatalogSchemaTest.scala
@@ -47,7 +47,7 @@ class ExternalCatalogSchemaTest extends TableTestBase {
@Before
def setUp(): Unit = {
val rootSchemaPlus: SchemaPlus = CalciteSchema.createRootSchema(true, false).plus()
- val catalog = CommonTestData.getInMemoryTestCatalog
+ val catalog = CommonTestData.getInMemoryTestCatalog(isStreaming = true)
ExternalCatalogSchema.registerCatalog(
streamTestUtil().tableEnv, rootSchemaPlus, schemaName, catalog)
externalCatalogSchema = rootSchemaPlus.getSubSchema("schemaName")
http://git-wip-us.apache.org/repos/asf/flink/blob/6fcc1e9a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/catalog/InMemoryExternalCatalogTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/catalog/InMemoryExternalCatalogTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/catalog/InMemoryExternalCatalogTest.scala
index ac2a729..1f84b3d 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/catalog/InMemoryExternalCatalogTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/catalog/InMemoryExternalCatalogTest.scala
@@ -138,7 +138,9 @@ class InMemoryExternalCatalogTest {
val schemaDesc = new Schema()
.field("first", BasicTypeInfo.STRING_TYPE_INFO)
.field("second", BasicTypeInfo.INT_TYPE_INFO)
- new ExternalCatalogTable(connDesc, None, Some(schemaDesc), None, None)
+ ExternalCatalogTable.builder(connDesc)
+ .withSchema(schemaDesc)
+ .asTableSource()
}
private def createTableInstance(
@@ -149,7 +151,9 @@ class InMemoryExternalCatalogTest {
fieldNames.zipWithIndex.foreach { case (fieldName, index) =>
schemaDesc.field(fieldName, fieldTypes(index))
}
- new ExternalCatalogTable(connDesc, None, Some(schemaDesc), None, None)
+ ExternalCatalogTable.builder(connDesc)
+ .withSchema(schemaDesc)
+ .asTableSource()
}
class TestConnectorDesc extends ConnectorDescriptor("test", version = 1, formatNeeded = false) {