Posted to issues@flink.apache.org by twalthr <gi...@git.apache.org> on 2018/07/16 16:03:00 UTC
[GitHub] flink pull request #6343: [FLINK-9852] [table] Expose descriptor-based sink ...
GitHub user twalthr opened a pull request:
https://github.com/apache/flink/pull/6343
[FLINK-9852] [table] Expose descriptor-based sink creation
## What is the purpose of the change
This commit exposes the new unified sink creation through the table environments and the external catalog table. It introduces a new `update-mode` property in order to distinguish between append, retract, and upsert table sources and sinks. It also refactors the top-level API classes one last time and adds more documentation, completing the unified table sources/sinks story from an API point of view.
## Brief change log
- Introduction of `TableEnvironment.connect()` and corresponding API builder classes (see the sketch below this list)
- Introduction of the `update-mode` property (values: `append`, `retract`, `upsert`) and update of existing connectors
- External catalog support with proper source/sink discovery and API
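For context, a minimal sketch of how the new fluent API fits together, assembled from the descriptors shown later in this thread. `ExternalSystemXYZ` is the placeholder connector descriptor from the PR's own Scaladoc example, and `tableEnv` and `"users"` are illustrative; exact signatures may differ:
```
// Sketch only: ExternalSystemXYZ is a placeholder connector descriptor;
// Json and Schema are the descriptors touched by this PR.
tableEnv
  .connect(
    new ExternalSystemXYZ()
      .version("0.11"))
  .withFormat(
    new Json()
      .jsonSchema("{...}")
      .failOnMissingField(false))
  .withSchema(
    new Schema()
      .field("user-name", "VARCHAR").from("u_name")
      .field("count", "DECIMAL"))
  .inUpsertMode()                      // sets the new update-mode property (append/retract/upsert)
  .registerTableSourceAndSink("users") // a matching factory is discovered on the classpath
```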
## Verifying this change
Existing tests were adapted.
## Does this pull request potentially affect one of the following parts:
- Dependencies (does it add or upgrade a dependency): no
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no
- The serializers: no
- The runtime per-record code paths (performance sensitive): no
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: no
- The S3 file system connector: no
## Documentation
- Does this pull request introduce a new feature? no
- If yes, how is the feature documented? not documented
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/twalthr/flink FLINK-9852
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/flink/pull/6343.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #6343
---
[GitHub] flink pull request #6343: [FLINK-9852] [table] Expose descriptor-based sink ...
Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on a diff in the pull request:
https://github.com/apache/flink/pull/6343#discussion_r202961529
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogTable.scala ---
@@ -18,33 +18,299 @@
package org.apache.flink.table.catalog
+import org.apache.flink.table.descriptors.DescriptorProperties.toScala
+import org.apache.flink.table.descriptors.StatisticsValidator.{STATISTICS_COLUMNS, STATISTICS_ROW_COUNT, readColumnStats}
+import org.apache.flink.table.descriptors.StreamTableDescriptorValidator.{UPDATE_MODE, UPDATE_MODE_VALUE_APPEND, UPDATE_MODE_VALUE_RETRACT, UPDATE_MODE_VALUE_UPSERT}
import org.apache.flink.table.descriptors._
import org.apache.flink.table.plan.stats.TableStats
+import scala.collection.JavaConverters._
+
/**
- * Defines a table in an [[ExternalCatalog]].
+ * Defines a table in an [[ExternalCatalog]]. External catalog tables describe table sources
+ * and/or sinks for both batch and stream environments.
+ *
+ * The catalog table takes descriptors which allow for declaring the communication to external
+ * systems in an implementation-agnostic way. The classpath is scanned for suitable table factories
+ * that match the desired configuration.
+ *
+ * Use the provided builder methods to configure the external catalog table accordingly.
+ *
+ * The following example shows how to read from a connector using a JSON format and
+ * declaring it as a table source:
*
- * @param connectorDesc describes the system to connect to
- * @param formatDesc describes the data format of a connector
- * @param schemaDesc describes the schema of the result table
- * @param statisticsDesc describes the estimated statistics of the result table
- * @param metadataDesc describes additional metadata of a table
+ * {{{
+ * ExternalCatalogTable(
+ * new ExternalSystemXYZ()
+ * .version("0.11"))
+ * .withFormat(
+ * new Json()
+ * .jsonSchema("{...}")
+ * .failOnMissingField(false))
+ * .withSchema(
+ * new Schema()
+ * .field("user-name", "VARCHAR").from("u_name")
+ * .field("count", "DECIMAL")
+ * .asTableSource()
--- End diff --
make `ExternalCatalogTable` and `BatchTableDescriptor` true builders with final fields?
---
[GitHub] flink pull request #6343: [FLINK-9852] [table] Expose descriptor-based sink ...
Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on a diff in the pull request:
https://github.com/apache/flink/pull/6343#discussion_r202964290
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalTableUtil.scala ---
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog
+
+import java.util
+
+import org.apache.flink.table.api._
+import org.apache.flink.table.descriptors.DescriptorProperties
+import org.apache.flink.table.factories._
+import org.apache.flink.table.plan.schema._
+import org.apache.flink.table.plan.stats.FlinkStatistic
+import org.apache.flink.table.util.Logging
+
+
+/**
+ * The utility class is used to convert [[ExternalCatalogTable]] to [[TableSourceSinkTable]].
+ *
+ * It uses [[TableFactoryService]] for discovering.
+ */
+object ExternalTableUtil extends Logging {
+
+ /**
+ * Converts an [[ExternalCatalogTable]] instance to a [[TableSourceTable]] instance
+ *
+ * @param externalCatalogTable the [[ExternalCatalogTable]] instance which to convert
+ * @return converted [[TableSourceTable]] instance from the input catalog table
+ */
+ def fromExternalCatalogTable[T1, T2](
+ tableEnv: TableEnvironment,
+ externalCatalogTable: ExternalCatalogTable)
+ : TableSourceSinkTable[T1, T2] = {
+
+ val properties = new DescriptorProperties()
+ externalCatalogTable.addProperties(properties)
+ val javaMap = properties.asMap
+ val statistics = new FlinkStatistic(externalCatalogTable.getTableStats)
+
+ val source: Option[TableSourceTable[T1]] = tableEnv match {
+ // check for a batch table source in this batch environment
+ case _: BatchTableEnvironment if externalCatalogTable.isBatchTable =>
+ createBatchTableSource(externalCatalogTable, javaMap, statistics)
+
+ // check for a stream table source in this stream environment
+ case _: StreamTableEnvironment if externalCatalogTable.isStreamTable =>
+ createStreamTableSource(externalCatalogTable, javaMap, statistics)
+
+ case _ =>
+ throw new ValidationException(
+ "External catalog table does not support the current environment for a table source.")
+ }
+
+ val sink: Option[TableSinkTable[T2]] = tableEnv match {
+ // check for a batch table sink in this batch environment
+ case _: BatchTableEnvironment if externalCatalogTable.isBatchTable =>
+ createBatchTableSink(externalCatalogTable, javaMap, statistics)
+
+ // check for a stream table sink in this stream environment
+ case _: StreamTableEnvironment if externalCatalogTable.isStreamTable =>
+ createStreamTableSink(externalCatalogTable, javaMap, statistics)
+
+ case _ =>
+ throw new ValidationException(
+ "External catalog table does not support the current environment for a table sink.")
+ }
+
+ new TableSourceSinkTable[T1, T2](source, sink)
+ }
+
+ private def createBatchTableSource[T](
--- End diff --
I still do not like this lack of abstraction between batch and streaming in the form of `createBatchTableSource`/`createStreamTableSource`.
Instead of writing if/elses everywhere in our code, there should be some common layer that handles such logic (see the sketch below). Here, half of the problem boils down to factories with the methods `createBatchTableSource` and `createStreamTableSource`.
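One possible shape of such a common layer; a sketch with invented names (`UnifiedTableFactory`) and the Flink types reduced to stubs, not code from this PR:
```
// Stubs standing in for the real Flink types, for illustration only.
trait TableSource
trait TableSink

// The batch/stream decision is made once, behind a single interface;
// call sites then work against UnifiedTableFactory and never
// pattern-match on the environment again.
trait UnifiedTableFactory {
  def createTableSource(properties: Map[String, String]): TableSource
  def createTableSink(properties: Map[String, String]): TableSink
}

class BatchFactory extends UnifiedTableFactory {
  override def createTableSource(properties: Map[String, String]): TableSource = new TableSource {}
  override def createTableSink(properties: Map[String, String]): TableSink = new TableSink {}
}

class StreamFactory extends UnifiedTableFactory {
  override def createTableSource(properties: Map[String, String]): TableSource = new TableSource {}
  override def createTableSink(properties: Map[String, String]): TableSink = new TableSink {}
}
```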
---
[GitHub] flink pull request #6343: [FLINK-9852] [table] Expose descriptor-based sink ...
Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:
https://github.com/apache/flink/pull/6343#discussion_r202982499
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/SchematicDescriptor.scala ---
@@ -19,14 +19,17 @@
package org.apache.flink.table.descriptors
/**
- * Common class for all descriptors describing a table sink.
+ * A trait for descriptors that allow to define a format and schema.
*/
-abstract class TableSinkDescriptor extends TableDescriptor {
+trait SchematicDescriptor extends Descriptor {
--- End diff --
`SchematicDescriptor` is used for `ExternalCatalogTable`, `StreamTableDescriptor`, and `BatchTableDescriptor`. If we add a new level next to `connector` and `format` (which may happen in the future), we would immediately get a compile error there (see the sketch below).
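To make the compile-error argument concrete, a standalone sketch with the Flink types reduced to stubs:
```
// Stubs for illustration; not the real Flink classes.
trait Descriptor
class FormatDescriptor
class Schema

trait SchematicDescriptor extends Descriptor {
  def withFormat(format: FormatDescriptor): SchematicDescriptor
  def withSchema(schema: Schema): SchematicDescriptor
  // Adding a new abstract method here (say, a hypothetical withPartitioning)
  // immediately breaks compilation of every class that mixes this trait in,
  // until each of them implements the new level.
}

class BatchTableDescriptor extends SchematicDescriptor {
  override def withFormat(format: FormatDescriptor): SchematicDescriptor = this
  override def withSchema(schema: Schema): SchematicDescriptor = this
}
```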
---
[GitHub] flink pull request #6343: [FLINK-9852] [table] Expose descriptor-based sink ...
Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:
https://github.com/apache/flink/pull/6343#discussion_r202978881
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogTable.scala ---
@@ -18,33 +18,299 @@
package org.apache.flink.table.catalog
+import org.apache.flink.table.descriptors.DescriptorProperties.toScala
+import org.apache.flink.table.descriptors.StatisticsValidator.{STATISTICS_COLUMNS, STATISTICS_ROW_COUNT, readColumnStats}
+import org.apache.flink.table.descriptors.StreamTableDescriptorValidator.{UPDATE_MODE, UPDATE_MODE_VALUE_APPEND, UPDATE_MODE_VALUE_RETRACT, UPDATE_MODE_VALUE_UPSERT}
import org.apache.flink.table.descriptors._
import org.apache.flink.table.plan.stats.TableStats
+import scala.collection.JavaConverters._
+
/**
- * Defines a table in an [[ExternalCatalog]].
+ * Defines a table in an [[ExternalCatalog]]. External catalog tables describe table sources
+ * and/or sinks for both batch and stream environments.
+ *
+ * The catalog table takes descriptors which allow for declaring the communication to external
+ * systems in an implementation-agnostic way. The classpath is scanned for suitable table factories
+ * that match the desired configuration.
+ *
+ * Use the provided builder methods to configure the external catalog table accordingly.
+ *
+ * The following example shows how to read from a connector using a JSON format and
+ * declaring it as a table source:
*
- * @param connectorDesc describes the system to connect to
- * @param formatDesc describes the data format of a connector
- * @param schemaDesc describes the schema of the result table
- * @param statisticsDesc describes the estimated statistics of the result table
- * @param metadataDesc describes additional metadata of a table
+ * {{{
+ * ExternalCatalogTable(
+ * new ExternalSystemXYZ()
+ * .version("0.11"))
+ * .withFormat(
+ * new Json()
+ * .jsonSchema("{...}")
+ * .failOnMissingField(false))
+ * .withSchema(
+ * new Schema()
+ * .field("user-name", "VARCHAR").from("u_name")
+ * .field("count", "DECIMAL")
+ * .asTableSource()
--- End diff --
I also thought about that, but descriptors don't actually "build" anything. The only final representation would be the properties, but we don't expose them to the user (see the sketch below).
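In other words, every descriptor only writes into a flat property map internally. A sketch with simplified stand-in types (only the `update-mode` key is from this PR; everything else is illustrative):
```
import scala.collection.mutable

// Simplified stand-in for DescriptorProperties: descriptors only write
// into this map; it is never handed back to the user.
class Props {
  private val map = mutable.LinkedHashMap[String, String]()
  def putString(key: String, value: String): Unit = map += key -> value
  def asMap: Map[String, String] = map.toMap
}

trait Descriptor {
  def addProperties(properties: Props): Unit
}

class UpdateModeDescriptor(mode: String) extends Descriptor {
  override def addProperties(properties: Props): Unit =
    properties.putString("update-mode", mode) // "append", "retract", or "upsert"
}
```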
---
[GitHub] flink pull request #6343: [FLINK-9852] [table] Expose descriptor-based sink ...
Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:
https://github.com/apache/flink/pull/6343#discussion_r202986793
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/BatchTableDescriptor.scala ---
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+import org.apache.flink.table.api.{BatchTableEnvironment, ValidationException}
+import org.apache.flink.table.factories.{BatchTableSinkFactory, BatchTableSourceFactory, TableFactoryService}
+
+/**
+ * Descriptor for specifying a table source and/or sink in a batch environment.
+ */
+class BatchTableDescriptor(
+ private val tableEnv: BatchTableEnvironment,
+ private val connectorDescriptor: ConnectorDescriptor)
+ extends TableDescriptor
+ with SchematicDescriptor
+ with RegistrableDescriptor {
+
+ private var formatDescriptor: Option[FormatDescriptor] = None
+ private var schemaDescriptor: Option[Schema] = None
+
+ /**
+ * Internal method for properties conversion.
+ */
+ override private[flink] def addProperties(properties: DescriptorProperties): Unit = {
--- End diff --
Depends on the language you are using. This method is public when seen from Java ;) (see the sketch below)
But I will move it down.
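For readers unfamiliar with this Scala quirk, a standalone sketch with a hypothetical class (not PR code): `private[flink]` restricts access only for Scala callers, while the compiled member stays public.
```
package org.apache.flink.table.example

// Hypothetical class for illustration; not part of the PR.
class PropsHolder {
  // Scala enforces the package restriction at Scala compile time only;
  // in the generated bytecode this method is public, so Java code in any
  // package can call it directly.
  private[flink] def addProperties(): Unit = ()
}
```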
---
[GitHub] flink pull request #6343: [FLINK-9852] [table] Expose descriptor-based sink ...
Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on a diff in the pull request:
https://github.com/apache/flink/pull/6343#discussion_r202966786
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/CsvAppendTableSinkFactory.scala ---
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sinks
+
+import java.util
+
+import org.apache.flink.table.descriptors.SchemaValidator.{SCHEMA, SCHEMA_PROCTIME}
+import org.apache.flink.table.descriptors.StreamTableDescriptorValidator.{UPDATE_MODE, UPDATE_MODE_VALUE_APPEND}
+import org.apache.flink.table.factories.StreamTableSinkFactory
+import org.apache.flink.types.Row
+
+/**
+ * Factory base for creating configured instances of [[CsvTableSink]] in a stream environment.
+ */
+class CsvAppendTableSinkFactory
--- End diff --
This should be done in a separate commit.
---
[GitHub] flink pull request #6343: [FLINK-9852] [table] Expose descriptor-based sink ...
Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on a diff in the pull request:
https://github.com/apache/flink/pull/6343#discussion_r202963074
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalTableUtil.scala ---
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog
+
+import java.util
+
+import org.apache.flink.table.api._
+import org.apache.flink.table.descriptors.DescriptorProperties
+import org.apache.flink.table.factories._
+import org.apache.flink.table.plan.schema._
+import org.apache.flink.table.plan.stats.FlinkStatistic
+import org.apache.flink.table.util.Logging
+
+
+/**
+ * The utility class is used to convert [[ExternalCatalogTable]] to [[TableSourceSinkTable]].
+ *
+ * It uses [[TableFactoryService]] for discovering.
+ */
+object ExternalTableUtil extends Logging {
+
+ /**
+ * Converts an [[ExternalCatalogTable]] instance to a [[TableSourceTable]] instance
+ *
+ * @param externalCatalogTable the [[ExternalCatalogTable]] instance which to convert
+ * @return converted [[TableSourceTable]] instance from the input catalog table
+ */
+ def fromExternalCatalogTable[T1, T2](
+ tableEnv: TableEnvironment,
+ externalCatalogTable: ExternalCatalogTable)
+ : TableSourceSinkTable[T1, T2] = {
+
+ val properties = new DescriptorProperties()
+ externalCatalogTable.addProperties(properties)
+ val javaMap = properties.asMap
+ val statistics = new FlinkStatistic(externalCatalogTable.getTableStats)
+
+ val source: Option[TableSourceTable[T1]] = tableEnv match {
+ // check for a batch table source in this batch environment
--- End diff --
drop those comments, the code is already self-explanatory
---
[GitHub] flink pull request #6343: [FLINK-9852] [table] Expose descriptor-based sink ...
Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on a diff in the pull request:
https://github.com/apache/flink/pull/6343#discussion_r203303655
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalTableUtil.scala ---
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog
+
+import java.util
+
+import org.apache.flink.table.api._
+import org.apache.flink.table.descriptors.DescriptorProperties
+import org.apache.flink.table.factories._
+import org.apache.flink.table.plan.schema._
+import org.apache.flink.table.plan.stats.FlinkStatistic
+import org.apache.flink.table.util.Logging
+
+
+/**
+ * The utility class is used to convert [[ExternalCatalogTable]] to [[TableSourceSinkTable]].
+ *
+ * It uses [[TableFactoryService]] for discovering.
+ */
+object ExternalTableUtil extends Logging {
+
+ /**
+ * Converts an [[ExternalCatalogTable]] instance to a [[TableSourceTable]] instance
+ *
+ * @param externalCatalogTable the [[ExternalCatalogTable]] instance which to convert
+ * @return converted [[TableSourceTable]] instance from the input catalog table
+ */
+ def fromExternalCatalogTable[T1, T2](
+ tableEnv: TableEnvironment,
+ externalCatalogTable: ExternalCatalogTable)
+ : TableSourceSinkTable[T1, T2] = {
+
+ val properties = new DescriptorProperties()
+ externalCatalogTable.addProperties(properties)
+ val javaMap = properties.asMap
+ val statistics = new FlinkStatistic(externalCatalogTable.getTableStats)
+
+ val source: Option[TableSourceTable[T1]] = tableEnv match {
+ // check for a batch table source in this batch environment
+ case _: BatchTableEnvironment if externalCatalogTable.isBatchTable =>
+ createBatchTableSource(externalCatalogTable, javaMap, statistics)
+
+ // check for a stream table source in this stream environment
+ case _: StreamTableEnvironment if externalCatalogTable.isStreamTable =>
+ createStreamTableSource(externalCatalogTable, javaMap, statistics)
+
+ case _ =>
+ throw new ValidationException(
+ "External catalog table does not support the current environment for a table source.")
+ }
+
+ val sink: Option[TableSinkTable[T2]] = tableEnv match {
+ // check for a batch table sink in this batch environment
+ case _: BatchTableEnvironment if externalCatalogTable.isBatchTable =>
+ createBatchTableSink(externalCatalogTable, javaMap, statistics)
+
+ // check for a stream table sink in this stream environment
+ case _: StreamTableEnvironment if externalCatalogTable.isStreamTable =>
+ createStreamTableSink(externalCatalogTable, javaMap, statistics)
+
+ case _ =>
+ throw new ValidationException(
+ "External catalog table does not support the current environment for a table sink.")
+ }
+
+ new TableSourceSinkTable[T1, T2](source, sink)
+ }
+
+ private def createBatchTableSource[T](
+ externalCatalogTable: ExternalCatalogTable,
+ javaMap: util.Map[String, String],
+ statistics: FlinkStatistic)
+ : Option[TableSourceTable[T]] = if (externalCatalogTable.isTableSource) {
--- End diff --
Is that a good enough reason to force us both to write code that we do not like? Apparently we are not the only ones who dislike it: https://stackoverflow.com/a/33425307/8149051
After reading your link I understand why you shouldn't use `return` in lambda functions, but the author doesn't make a case against using it in plain methods. At least I do not see one.
However, if you are not sure, at least reverse the if/else branches; the simpler branch should always come first.
---
[GitHub] flink pull request #6343: [FLINK-9852] [table] Expose descriptor-based sink ...
Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:
https://github.com/apache/flink/pull/6343#discussion_r202980228
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalTableUtil.scala ---
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog
+
+import java.util
+
+import org.apache.flink.table.api._
+import org.apache.flink.table.descriptors.DescriptorProperties
+import org.apache.flink.table.factories._
+import org.apache.flink.table.plan.schema._
+import org.apache.flink.table.plan.stats.FlinkStatistic
+import org.apache.flink.table.util.Logging
+
+
+/**
+ * The utility class is used to convert [[ExternalCatalogTable]] to [[TableSourceSinkTable]].
+ *
+ * It uses [[TableFactoryService]] for discovering.
+ */
+object ExternalTableUtil extends Logging {
+
+ /**
+ * Converts an [[ExternalCatalogTable]] instance to a [[TableSourceTable]] instance
+ *
+ * @param externalCatalogTable the [[ExternalCatalogTable]] instance which to convert
+ * @return converted [[TableSourceTable]] instance from the input catalog table
+ */
+ def fromExternalCatalogTable[T1, T2](
+ tableEnv: TableEnvironment,
+ externalCatalogTable: ExternalCatalogTable)
+ : TableSourceSinkTable[T1, T2] = {
+
+ val properties = new DescriptorProperties()
+ externalCatalogTable.addProperties(properties)
+ val javaMap = properties.asMap
+ val statistics = new FlinkStatistic(externalCatalogTable.getTableStats)
+
+ val source: Option[TableSourceTable[T1]] = tableEnv match {
+ // check for a batch table source in this batch environment
+ case _: BatchTableEnvironment if externalCatalogTable.isBatchTable =>
+ createBatchTableSource(externalCatalogTable, javaMap, statistics)
+
+ // check for a stream table source in this stream environment
+ case _: StreamTableEnvironment if externalCatalogTable.isStreamTable =>
+ createStreamTableSource(externalCatalogTable, javaMap, statistics)
+
+ case _ =>
+ throw new ValidationException(
+ "External catalog table does not support the current environment for a table source.")
+ }
+
+ val sink: Option[TableSinkTable[T2]] = tableEnv match {
+ // check for a batch table sink in this batch environment
+ case _: BatchTableEnvironment if externalCatalogTable.isBatchTable =>
+ createBatchTableSink(externalCatalogTable, javaMap, statistics)
+
+ // check for a stream table sink in this stream environment
+ case _: StreamTableEnvironment if externalCatalogTable.isStreamTable =>
+ createStreamTableSink(externalCatalogTable, javaMap, statistics)
+
+ case _ =>
+ throw new ValidationException(
+ "External catalog table does not support the current environment for a table sink.")
+ }
+
+ new TableSourceSinkTable[T1, T2](source, sink)
+ }
+
+ private def createBatchTableSource[T](
+ externalCatalogTable: ExternalCatalogTable,
+ javaMap: util.Map[String, String],
+ statistics: FlinkStatistic)
+ : Option[TableSourceTable[T]] = if (externalCatalogTable.isTableSource) {
--- End diff --
I also like this pattern more, but it is not very Scala-like (see the sketch below the link):
https://tpolecat.github.io/2014/05/09/return.html
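The core hazard the linked post describes, as a standalone sketch: `return` inside a closure is a non-local return, implemented by throwing `scala.runtime.NonLocalReturnControl` and catching it in the enclosing method.
```
def firstPositive(xs: List[Int]): Option[Int] = {
  xs.foreach { x =>
    // This jumps out of firstPositive via an exception thrown inside the
    // closure; if the closure outlives the method (e.g. runs asynchronously),
    // the exception escapes and the return is silently lost.
    if (x > 0) return Some(x)
  }
  None
}
```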
---
[GitHub] flink pull request #6343: [FLINK-9852] [table] Expose descriptor-based sink ...
Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:
https://github.com/apache/flink/pull/6343#discussion_r202989785
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/StreamTableDescriptor.scala ---
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+import org.apache.flink.table.api.{StreamTableEnvironment, ValidationException}
+import org.apache.flink.table.descriptors.StreamTableDescriptorValidator._
+import org.apache.flink.table.factories.{StreamTableSinkFactory, StreamTableSourceFactory, TableFactoryService}
+
+/**
+ * Descriptor for specifying a table source and/or sink in a streaming environment.
+ */
+class StreamTableDescriptor(
--- End diff --
I don't see an alternative if we don't want to have an ugly API.
---
[GitHub] flink pull request #6343: [FLINK-9852] [table] Expose descriptor-based sink ...
Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on a diff in the pull request:
https://github.com/apache/flink/pull/6343#discussion_r202965060
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/BatchTableDescriptor.scala ---
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+import org.apache.flink.table.api.{BatchTableEnvironment, ValidationException}
+import org.apache.flink.table.factories.{BatchTableSinkFactory, BatchTableSourceFactory, TableFactoryService}
+
+/**
+ * Descriptor for specifying a table source and/or sink in a batch environment.
+ */
+class BatchTableDescriptor(
+ private val tableEnv: BatchTableEnvironment,
+ private val connectorDescriptor: ConnectorDescriptor)
+ extends TableDescriptor
+ with SchematicDescriptor
+ with RegistrableDescriptor {
+
+ private var formatDescriptor: Option[FormatDescriptor] = None
+ private var schemaDescriptor: Option[Schema] = None
+
+ /**
+ * Internal method for properties conversion.
+ */
+ override private[flink] def addProperties(properties: DescriptorProperties): Unit = {
+ connectorDescriptor.addProperties(properties)
+ formatDescriptor.foreach(_.addProperties(properties))
+ schemaDescriptor.foreach(_.addProperties(properties))
+ }
+
+ /**
+ * Searches for the specified table source, configures it accordingly, and registers it as
+ * a table under the given name.
+ *
+ * @param name table name to be registered in the table environment
+ */
+ override def registerTableSource(name: String): Unit = {
+ val javaMap = getValidProperties.asMap
+ val tableSource = TableFactoryService
+ .find(classOf[BatchTableSourceFactory[_]], javaMap)
+ .createBatchTableSource(javaMap)
+ tableEnv.registerTableSource(name, tableSource)
+ }
+
+ /**
+ * Searches for the specified table sink, configures it accordingly, and registers it as
+ * a table under the given name.
+ *
+ * @param name table name to be registered in the table environment
+ */
+ override def registerTableSink(name: String): Unit = {
+ val javaMap = getValidProperties.asMap
+ val tableSink = TableFactoryService
+ .find(classOf[BatchTableSinkFactory[_]], javaMap)
+ .createBatchTableSink(javaMap)
+ tableEnv.registerTableSink(name, tableSink)
+ }
+
+ /**
+ * Searches for the specified table source and sink, configures them accordingly, and registers
+ * them as a table under the given name.
+ *
+ * @param name table name to be registered in the table environment
+ */
+ override def registerTableSourceAndSink(name: String): Unit = {
+ registerTableSource(name)
+ registerTableSink(name)
+ }
+
+ /**
+ * Specifies the format that defines how to read data from a connector.
+ */
+ override def withFormat(format: FormatDescriptor): BatchTableDescriptor = {
+ formatDescriptor = Some(format)
+ this
+ }
+
+ /**
+ * Specifies the resulting table schema.
+ */
+ override def withSchema(schema: Schema): BatchTableDescriptor = {
+ schemaDescriptor = Some(schema)
+ this
+ }
+
+ override def toString: String = {
+ getValidProperties.toString
+ }
+
+ // ----------------------------------------------------------------------------------------------
+
+ private def getValidProperties: DescriptorProperties = {
--- End diff --
duplicated code with `StreamTableDescriptor.getValidProperties` (see the sketch below)
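One way to remove the duplication; a sketch with simplified types and invented names, not PR code: pull the shared validation into a common base class and let each descriptor contribute only its extra checks.
```
abstract class TableDescriptorBase {
  protected def toProperties: Map[String, String]
  protected def connectorNeedsFormat: Boolean
  protected def hasFormat: Boolean
  // The stream descriptor would add its update-mode check here.
  protected def validateExtra(properties: Map[String, String]): Unit = ()

  // Shared between the batch and stream descriptors.
  final def getValidProperties: Map[String, String] = {
    val properties = toProperties
    if (connectorNeedsFormat && !hasFormat) {
      throw new IllegalArgumentException("The connector requires a format description.")
    }
    if (!connectorNeedsFormat && hasFormat) {
      throw new IllegalArgumentException("The connector does not require a format description.")
    }
    validateExtra(properties)
    properties
  }
}
```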
---
[GitHub] flink pull request #6343: [FLINK-9852] [table] Expose descriptor-based sink ...
Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on a diff in the pull request:
https://github.com/apache/flink/pull/6343#discussion_r202964952
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/StreamTableDescriptor.scala ---
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+import org.apache.flink.table.api.{StreamTableEnvironment, ValidationException}
+import org.apache.flink.table.descriptors.StreamTableDescriptorValidator._
+import org.apache.flink.table.factories.{StreamTableSinkFactory, StreamTableSourceFactory, TableFactoryService}
+
+/**
+ * Descriptor for specifying a table source and/or sink in a streaming environment.
+ */
+class StreamTableDescriptor(
+ private val tableEnv: StreamTableEnvironment,
+ private val connectorDescriptor: ConnectorDescriptor)
+ extends TableDescriptor
+ with SchematicDescriptor
+ with RegistrableDescriptor
+ with StreamableDescriptor {
+
+ private var formatDescriptor: Option[FormatDescriptor] = None
+ private var schemaDescriptor: Option[Schema] = None
+ private var updateMode: Option[String] = None
+
+ /**
+ * Internal method for properties conversion.
+ */
+ override private[flink] def addProperties(properties: DescriptorProperties): Unit = {
+ connectorDescriptor.addProperties(properties)
+ formatDescriptor.foreach(_.addProperties(properties))
+ schemaDescriptor.foreach(_.addProperties(properties))
+ updateMode.foreach(mode => properties.putString(UPDATE_MODE, mode))
+ }
+
+ /**
+ * Searches for the specified table source, configures it accordingly, and registers it as
+ * a table under the given name.
+ *
+ * @param name table name to be registered in the table environment
+ */
+ override def registerTableSource(name: String): Unit = {
+ val javaMap = getValidProperties.asMap
+ val tableSource = TableFactoryService
+ .find(classOf[StreamTableSourceFactory[_]], javaMap)
+ .createStreamTableSource(javaMap)
+ tableEnv.registerTableSource(name, tableSource)
+ }
+
+ /**
+ * Searches for the specified table sink, configures it accordingly, and registers it as
+ * a table under the given name.
+ *
+ * @param name table name to be registered in the table environment
+ */
+ override def registerTableSink(name: String): Unit = {
+ val javaMap = getValidProperties.asMap
+ val tableSink = TableFactoryService
+ .find(classOf[StreamTableSinkFactory[_]], javaMap)
+ .createStreamTableSink(javaMap)
+ tableEnv.registerTableSink(name, tableSink)
+ }
+
+ /**
+ * Searches for the specified table source and sink, configures them accordingly, and registers
+ * them as a table under the given name.
+ *
+ * @param name table name to be registered in the table environment
+ */
+ override def registerTableSourceAndSink(name: String): Unit = {
+ registerTableSource(name)
+ registerTableSink(name)
+ }
+
+ /**
+ * Specifies the format that defines how to read data from a connector.
+ */
+ override def withFormat(format: FormatDescriptor): StreamTableDescriptor = {
+ formatDescriptor = Some(format)
+ this
+ }
+
+ /**
+ * Specifies the resulting table schema.
+ */
+ override def withSchema(schema: Schema): StreamTableDescriptor = {
+ schemaDescriptor = Some(schema)
+ this
+ }
+
+ /**
+ * Declares how to perform the conversion between a dynamic table and an external connector.
+ *
+ * In append mode, a dynamic table and an external connector only exchange INSERT messages.
+ *
+ * @see See also [[inRetractMode()]] and [[inUpsertMode()]].
+ */
+ override def inAppendMode(): StreamTableDescriptor = {
+ updateMode = Some(UPDATE_MODE_VALUE_APPEND)
+ this
+ }
+
+ /**
+ * Declares how to perform the conversion between a dynamic table and an external connector.
+ *
+ * In retract mode, a dynamic table and an external connector exchange ADD and RETRACT messages.
+ *
+ * An INSERT change is encoded as an ADD message, a DELETE change as a RETRACT message, and an
+ * UPDATE change as a RETRACT message for the updated (previous) row and an ADD message for
+ * the updating (new) row.
+ *
+ * In this mode, a key must not be defined as opposed to upsert mode. However, every update
+ * consists of two messages which is less efficient.
+ *
+ * @see See also [[inAppendMode()]] and [[inUpsertMode()]].
+ */
+ override def inRetractMode(): StreamTableDescriptor = {
+ updateMode = Some(UPDATE_MODE_VALUE_RETRACT)
+ this
+ }
+
+ /**
+ * Declares how to perform the conversion between a dynamic table and an external connector.
+ *
+ * In upsert mode, a dynamic table and an external connector exchange UPSERT and DELETE messages.
+ *
+ * This mode requires a (possibly composite) unique key by which updates can be propagated. The
+ * external connector needs to be aware of the unique key attribute in order to apply messages
+ * correctly. INSERT and UPDATE changes are encoded as UPSERT messages. DELETE changes as
+ * DELETE messages.
+ *
+ * The main difference to a retract stream is that UPDATE changes are encoded with a single
+ * message and are therefore more efficient.
+ *
+ * @see See also [[inAppendMode()]] and [[inRetractMode()]].
+ */
+ override def inUpsertMode(): StreamTableDescriptor = {
+ updateMode = Some(UPDATE_MODE_VALUE_UPSERT)
+ this
+ }
+
+ override def toString: String = {
+ getValidProperties.toString
+ }
+
+ // ----------------------------------------------------------------------------------------------
+
+ private def getValidProperties: DescriptorProperties = {
+ val properties = new DescriptorProperties()
+ addProperties(properties)
+
+ // check for a format
+ if (connectorDescriptor.needsFormat() && formatDescriptor.isEmpty) {
+ throw new ValidationException(
+ s"The connector '$connectorDescriptor' requires a format description.")
+ } else if (!connectorDescriptor.needsFormat() && formatDescriptor.isDefined) {
+ throw new ValidationException(
+ s"The connector '$connectorDescriptor' does not require a format description " +
+ s"but '${formatDescriptor.get}' found.")
+ }
+
+ // basic validation
--- End diff --
drop the comment
---
[GitHub] flink pull request #6343: [FLINK-9852] [table] Expose descriptor-based sink ...
Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on a diff in the pull request:
https://github.com/apache/flink/pull/6343#discussion_r203301509
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogTable.scala ---
@@ -18,33 +18,299 @@
package org.apache.flink.table.catalog
+import org.apache.flink.table.descriptors.DescriptorProperties.toScala
+import org.apache.flink.table.descriptors.StatisticsValidator.{STATISTICS_COLUMNS, STATISTICS_ROW_COUNT, readColumnStats}
+import org.apache.flink.table.descriptors.StreamTableDescriptorValidator.{UPDATE_MODE, UPDATE_MODE_VALUE_APPEND, UPDATE_MODE_VALUE_RETRACT, UPDATE_MODE_VALUE_UPSERT}
import org.apache.flink.table.descriptors._
import org.apache.flink.table.plan.stats.TableStats
+import scala.collection.JavaConverters._
+
/**
- * Defines a table in an [[ExternalCatalog]].
+ * Defines a table in an [[ExternalCatalog]]. External catalog tables describe table sources
+ * and/or sinks for both batch and stream environments.
+ *
+ * The catalog table takes descriptors which allow for declaring the communication to external
+ * systems in an implementation-agnostic way. The classpath is scanned for suitable table factories
+ * that match the desired configuration.
+ *
+ * Use the provided builder methods to configure the external catalog table accordingly.
+ *
+ * The following example shows how to read from a connector using a JSON format and
+ * declaring it as a table source:
*
- * @param connectorDesc describes the system to connect to
- * @param formatDesc describes the data format of a connector
- * @param schemaDesc describes the schema of the result table
- * @param statisticsDesc describes the estimated statistics of the result table
- * @param metadataDesc describes additional metadata of a table
+ * {{{
+ * ExternalCatalogTable(
+ * new ExternalSystemXYZ()
+ * .version("0.11"))
+ * .withFormat(
+ * new Json()
+ * .jsonSchema("{...}")
+ * .failOnMissingField(false))
+ * .withSchema(
+ * new Schema()
+ * .field("user-name", "VARCHAR").from("u_name")
+ * .field("count", "DECIMAL")
+ * .asTableSource()
+ * }}}
+ *
+ * Note: For backwards-compatibility, the table is declared as a table source for batch and
+ * streaming environment by default.
+ *
+ * See also [[org.apache.flink.table.factories.TableFactory]] for more information about how
+ * to target suitable factories.
+ *
+ * @param connectorDescriptor describes the system to connect to
*/
-class ExternalCatalogTable(
- connectorDesc: ConnectorDescriptor,
- formatDesc: Option[FormatDescriptor],
- schemaDesc: Option[Schema],
- statisticsDesc: Option[Statistics],
- metadataDesc: Option[Metadata])
- extends TableSourceDescriptor {
-
- this.connectorDescriptor = Some(connectorDesc)
- this.formatDescriptor = formatDesc
- this.schemaDescriptor = schemaDesc
- this.statisticsDescriptor = statisticsDesc
- this.metaDescriptor = metadataDesc
-
- // expose statistics for external table source util
- override def getTableStats: Option[TableStats] = super.getTableStats
+class ExternalCatalogTable(val connectorDescriptor: ConnectorDescriptor)
+ extends TableDescriptor
--- End diff --
is it consistent (or at least mostly consistent) with our code base?
---
[GitHub] flink issue #6343: [FLINK-9852] [table] Expose descriptor-based sink creatio...
Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on the issue:
https://github.com/apache/flink/pull/6343
Thank you for your suggestions @pnowojski. I hope I have addressed most of them in the new commits. I reduced the code duplication a bit, and `ExternalCatalogTable` is now immutable with a proper builder.
---
[GitHub] flink pull request #6343: [FLINK-9852] [table] Expose descriptor-based sink ...
Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:
https://github.com/apache/flink/pull/6343#discussion_r203349277
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogTable.scala ---
@@ -18,33 +18,299 @@
package org.apache.flink.table.catalog
+import org.apache.flink.table.descriptors.DescriptorProperties.toScala
+import org.apache.flink.table.descriptors.StatisticsValidator.{STATISTICS_COLUMNS, STATISTICS_ROW_COUNT, readColumnStats}
+import org.apache.flink.table.descriptors.StreamTableDescriptorValidator.{UPDATE_MODE, UPDATE_MODE_VALUE_APPEND, UPDATE_MODE_VALUE_RETRACT, UPDATE_MODE_VALUE_UPSERT}
import org.apache.flink.table.descriptors._
import org.apache.flink.table.plan.stats.TableStats
+import scala.collection.JavaConverters._
+
/**
- * Defines a table in an [[ExternalCatalog]].
+ * Defines a table in an [[ExternalCatalog]]. External catalog tables describe table sources
+ * and/or sinks for both batch and stream environments.
+ *
+ * The catalog table takes descriptors which allow for declaring the communication to external
+ * systems in an implementation-agnostic way. The classpath is scanned for suitable table factories
+ * that match the desired configuration.
+ *
+ * Use the provided builder methods to configure the external catalog table accordingly.
+ *
+ * The following example shows how to read from a connector using a JSON format and
+ * declaring it as a table source:
*
- * @param connectorDesc describes the system to connect to
- * @param formatDesc describes the data format of a connector
- * @param schemaDesc describes the schema of the result table
- * @param statisticsDesc describes the estimated statistics of the result table
- * @param metadataDesc describes additional metadata of a table
+ * {{{
+ * ExternalCatalogTable(
+ * new ExternalSystemXYZ()
+ * .version("0.11"))
+ * .withFormat(
+ * new Json()
+ * .jsonSchema("{...}")
+ * .failOnMissingField(false))
+ * .withSchema(
+ * new Schema()
+ * .field("user-name", "VARCHAR").from("u_name")
+ * .field("count", "DECIMAL")
+ * .asTableSource()
+ * }}}
+ *
+ * Note: For backwards-compatibility, the table is declared as a table source for batch and
+ * streaming environment by default.
+ *
+ * See also [[org.apache.flink.table.factories.TableFactory]] for more information about how
+ * to target suitable factories.
+ *
+ * @param connectorDescriptor describes the system to connect to
*/
-class ExternalCatalogTable(
- connectorDesc: ConnectorDescriptor,
- formatDesc: Option[FormatDescriptor],
- schemaDesc: Option[Schema],
- statisticsDesc: Option[Statistics],
- metadataDesc: Option[Metadata])
- extends TableSourceDescriptor {
-
- this.connectorDescriptor = Some(connectorDesc)
- this.formatDescriptor = formatDesc
- this.schemaDescriptor = schemaDesc
- this.statisticsDescriptor = statisticsDesc
- this.metaDescriptor = metadataDesc
-
- // expose statistics for external table source util
- override def getTableStats: Option[TableStats] = super.getTableStats
+class ExternalCatalogTable(val connectorDescriptor: ConnectorDescriptor)
+ extends TableDescriptor
--- End diff --
This is the code style that we should all comply with.
---
[GitHub] flink pull request #6343: [FLINK-9852] [table] Expose descriptor-based sink ...
Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on a diff in the pull request:
https://github.com/apache/flink/pull/6343#discussion_r203301285
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogTable.scala ---
@@ -18,33 +18,299 @@
package org.apache.flink.table.catalog
+import org.apache.flink.table.descriptors.DescriptorProperties.toScala
+import org.apache.flink.table.descriptors.StatisticsValidator.{STATISTICS_COLUMNS, STATISTICS_ROW_COUNT, readColumnStats}
+import org.apache.flink.table.descriptors.StreamTableDescriptorValidator.{UPDATE_MODE, UPDATE_MODE_VALUE_APPEND, UPDATE_MODE_VALUE_RETRACT, UPDATE_MODE_VALUE_UPSERT}
import org.apache.flink.table.descriptors._
import org.apache.flink.table.plan.stats.TableStats
+import scala.collection.JavaConverters._
+
/**
- * Defines a table in an [[ExternalCatalog]].
+ * Defines a table in an [[ExternalCatalog]]. External catalog tables describe table sources
+ * and/or sinks for both batch and stream environments.
+ *
+ * The catalog table takes descriptors which allow for declaring the communication to external
+ * systems in an implementation-agnostic way. The classpath is scanned for suitable table factories
+ * that match the desired configuration.
+ *
+ * Use the provided builder methods to configure the external catalog table accordingly.
+ *
+ * The following example shows how to read from a connector using a JSON format and
+ * declaring it as a table source:
*
- * @param connectorDesc describes the system to connect to
- * @param formatDesc describes the data format of a connector
- * @param schemaDesc describes the schema of the result table
- * @param statisticsDesc describes the estimated statistics of the result table
- * @param metadataDesc describes additional metadata of a table
+ * {{{
+ * ExternalCatalogTable(
+ * new ExternalSystemXYZ()
+ * .version("0.11"))
+ * .withFormat(
+ * new Json()
+ * .jsonSchema("{...}")
+ * .failOnMissingField(false))
+ * .withSchema(
+ * new Schema()
+ * .field("user-name", "VARCHAR").from("u_name")
+ * .field("count", "DECIMAL")
+ * .asTableSource()
--- End diff --
building could be the time/place to validate the builder state, plus provide an immutable built class with final fields (which is a huge benefit on its own that simplifies reasoning about the code).
https://stackoverflow.com/a/5652870/8149051
https://www.linkedin.com/pulse/20140528113353-16837833-6-benefits-of-programming-with-immutable-objects-in-java/
> “Classes should be immutable unless there’s a very good reason to make them mutable. If a class cannot be made immutable, limit its mutability as much as possible.”
Another issue/question: does it make sense to create an `ExternalCatalogTable` without a defined format/schema/statistics/metadata/updateMode? If not, fail early in the `build` method here. The `build` method could be `asTableSource`/`asTableSink` (see the sketch below).
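A sketch of the immutable-builder variant argued for here, with simplified, invented types (descriptors reduced to strings), not code from this PR:
```
// Each with* call returns a new builder value; asTableSource() validates
// eagerly and yields an object whose fields are all final (vals).
final case class CatalogTableBuilder(
    connector: String,
    format: Option[String] = None,
    schema: Option[String] = None) {

  def withFormat(f: String): CatalogTableBuilder = copy(format = Some(f))
  def withSchema(s: String): CatalogTableBuilder = copy(schema = Some(s))

  // Fail early: a missing schema is rejected at build time,
  // not later during factory discovery.
  def asTableSource(): BuiltCatalogTable = {
    require(schema.isDefined, "A table source requires a schema.")
    BuiltCatalogTable(connector, format, schema.get, isSource = true)
  }
}

final case class BuiltCatalogTable(
    connector: String,
    format: Option[String],
    schema: String,
    isSource: Boolean)
```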
---
[GitHub] flink pull request #6343: [FLINK-9852] [table] Expose descriptor-based sink ...
Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on a diff in the pull request:
https://github.com/apache/flink/pull/6343#discussion_r202965391
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/StreamTableDescriptor.scala ---
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+import org.apache.flink.table.api.{StreamTableEnvironment, ValidationException}
+import org.apache.flink.table.descriptors.StreamTableDescriptorValidator._
+import org.apache.flink.table.factories.{StreamTableSinkFactory, StreamTableSourceFactory, TableFactoryService}
+
+/**
+ * Descriptor for specifying a table source and/or sink in a streaming environment.
+ */
+class StreamTableDescriptor(
--- End diff --
This class duplicates code with `BatchTableDescriptor`
---
[GitHub] flink pull request #6343: [FLINK-9852] [table] Expose descriptor-based sink ...
Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:
https://github.com/apache/flink/pull/6343
---
[GitHub] flink pull request #6343: [FLINK-9852] [table] Expose descriptor-based sink ...
Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on a diff in the pull request:
https://github.com/apache/flink/pull/6343#discussion_r202963234
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalTableUtil.scala ---
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog
+
+import java.util
+
+import org.apache.flink.table.api._
+import org.apache.flink.table.descriptors.DescriptorProperties
+import org.apache.flink.table.factories._
+import org.apache.flink.table.plan.schema._
+import org.apache.flink.table.plan.stats.FlinkStatistic
+import org.apache.flink.table.util.Logging
+
+
+/**
+ * The utility class is used to convert [[ExternalCatalogTable]] to [[TableSourceSinkTable]].
+ *
+ * It uses [[TableFactoryService]] for discovering.
+ */
+object ExternalTableUtil extends Logging {
+
+ /**
+ * Converts an [[ExternalCatalogTable]] instance to a [[TableSourceTable]] instance
+ *
+ * @param externalCatalogTable the [[ExternalCatalogTable]] instance which to convert
+ * @return converted [[TableSourceTable]] instance from the input catalog table
+ */
+ def fromExternalCatalogTable[T1, T2](
+ tableEnv: TableEnvironment,
+ externalCatalogTable: ExternalCatalogTable)
+ : TableSourceSinkTable[T1, T2] = {
+
+ val properties = new DescriptorProperties()
+ externalCatalogTable.addProperties(properties)
+ val javaMap = properties.asMap
+ val statistics = new FlinkStatistic(externalCatalogTable.getTableStats)
+
+ val source: Option[TableSourceTable[T1]] = tableEnv match {
+ // check for a batch table source in this batch environment
+ case _: BatchTableEnvironment if externalCatalogTable.isBatchTable =>
+ createBatchTableSource(externalCatalogTable, javaMap, statistics)
+
+ // check for a stream table source in this stream environment
+ case _: StreamTableEnvironment if externalCatalogTable.isStreamTable =>
+ createStreamTableSource(externalCatalogTable, javaMap, statistics)
+
+ case _ =>
+ throw new ValidationException(
+ "External catalog table does not support the current environment for a table source.")
+ }
+
+ val sink: Option[TableSinkTable[T2]] = tableEnv match {
+ // check for a batch table sink in this batch environment
+ case _: BatchTableEnvironment if externalCatalogTable.isBatchTable =>
+ createBatchTableSink(externalCatalogTable, javaMap, statistics)
+
+ // check for a stream table sink in this stream environment
+ case _: StreamTableEnvironment if externalCatalogTable.isStreamTable =>
+ createStreamTableSink(externalCatalogTable, javaMap, statistics)
+
+ case _ =>
+ throw new ValidationException(
+ "External catalog table does not support the current environment for a table sink.")
+ }
+
+ new TableSourceSinkTable[T1, T2](source, sink)
+ }
+
+ private def createBatchTableSource[T](
+ externalCatalogTable: ExternalCatalogTable,
+ javaMap: util.Map[String, String],
+ statistics: FlinkStatistic)
+ : Option[TableSourceTable[T]] = if (externalCatalogTable.isTableSource) {
--- End diff --
reverse the if/else branches; the simpler case should come first.
Also, if you change it to
```
if (!externalCatalogTable.isTableSource) {
  return None
}
val source = TableFactoryService
  .find(classOf[BatchTableSourceFactory[T]], javaMap)
  .createBatchTableSource(javaMap)
val table = new BatchTableSourceTable(
  source,
  statistics)
Some(table)
```
it would simplify the code even further (the reader wouldn't have to track one extra level of nesting).
Ditto in other places.
---
[GitHub] flink pull request #6343: [FLINK-9852] [table] Expose descriptor-based sink ...
Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:
https://github.com/apache/flink/pull/6343#discussion_r202982848
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/StreamTableDescriptor.scala ---
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+import org.apache.flink.table.api.{StreamTableEnvironment, ValidationException}
+import org.apache.flink.table.descriptors.StreamTableDescriptorValidator._
+import org.apache.flink.table.factories.{StreamTableSinkFactory, StreamTableSourceFactory, TableFactoryService}
+
+/**
+ * Descriptor for specifying a table source and/or sink in a streaming environment.
+ */
+class StreamTableDescriptor(
+ private val tableEnv: StreamTableEnvironment,
+ private val connectorDescriptor: ConnectorDescriptor)
+ extends TableDescriptor
+ with SchematicDescriptor
+ with RegistrableDescriptor
+ with StreamableDescriptor {
+
+ private var formatDescriptor: Option[FormatDescriptor] = None
+ private var schemaDescriptor: Option[Schema] = None
+ private var updateMode: Option[String] = None
+
+ /**
+ * Internal method for properties conversion.
+ */
+ override private[flink] def addProperties(properties: DescriptorProperties): Unit = {
+ connectorDescriptor.addProperties(properties)
+ formatDescriptor.foreach(_.addProperties(properties))
+ schemaDescriptor.foreach(_.addProperties(properties))
+ updateMode.foreach(mode => properties.putString(UPDATE_MODE, mode))
+ }
+
+ /**
+ * Searches for the specified table source, configures it accordingly, and registers it as
+ * a table under the given name.
+ *
+ * @param name table name to be registered in the table environment
+ */
+ override def registerTableSource(name: String): Unit = {
+ val javaMap = getValidProperties.asMap
+ val tableSource = TableFactoryService
+ .find(classOf[StreamTableSourceFactory[_]], javaMap)
+ .createStreamTableSource(javaMap)
+ tableEnv.registerTableSource(name, tableSource)
+ }
+
+ /**
+ * Searches for the specified table sink, configures it accordingly, and registers it as
+ * a table under the given name.
+ *
+ * @param name table name to be registered in the table environment
+ */
+ override def registerTableSink(name: String): Unit = {
+ val javaMap = getValidProperties.asMap
+ val tableSink = TableFactoryService
+ .find(classOf[StreamTableSinkFactory[_]], javaMap)
+ .createStreamTableSink(javaMap)
+ tableEnv.registerTableSink(name, tableSink)
+ }
+
+ /**
+ * Searches for the specified table source and sink, configures them accordingly, and registers
+ * them as a table under the given name.
+ *
+ * @param name table name to be registered in the table environment
+ */
+ override def registerTableSourceAndSink(name: String): Unit = {
+ registerTableSource(name)
+ registerTableSink(name)
+ }
+
+ /**
+ * Specifies the format that defines how to read data from a connector.
+ */
+ override def withFormat(format: FormatDescriptor): StreamTableDescriptor = {
+ formatDescriptor = Some(format)
+ this
+ }
+
+ /**
+ * Specifies the resulting table schema.
+ */
+ override def withSchema(schema: Schema): StreamTableDescriptor = {
+ schemaDescriptor = Some(schema)
+ this
+ }
+
+ /**
+ * Declares how to perform the conversion between a dynamic table and an external connector.
+ *
+ * In append mode, a dynamic table and an external connector only exchange INSERT messages.
+ *
+ * @see [[inRetractMode()]] and [[inUpsertMode()]].
+ */
+ override def inAppendMode(): StreamTableDescriptor = {
+ updateMode = Some(UPDATE_MODE_VALUE_APPEND)
+ this
+ }
+
+ /**
+ * Declares how to perform the conversion between a dynamic table and an external connector.
+ *
+ * In retract mode, a dynamic table and an external connector exchange ADD and RETRACT messages.
+ *
+ * An INSERT change is encoded as an ADD message, a DELETE change as a RETRACT message, and an
+ * UPDATE change as a RETRACT message for the updated (previous) row and an ADD message for
+ * the updating (new) row.
+ *
+ * In this mode, no key needs to be defined, as opposed to upsert mode. However, every update
+ * consists of two messages, which is less efficient.
+ *
+ * @see [[inAppendMode()]] and [[inUpsertMode()]].
+ */
+ override def inRetractMode(): StreamTableDescriptor = {
+ updateMode = Some(UPDATE_MODE_VALUE_RETRACT)
+ this
+ }
+
+ /**
+ * Declares how to perform the conversion between a dynamic table and an external connector.
+ *
+ * In upsert mode, a dynamic table and an external connector exchange UPSERT and DELETE messages.
+ *
+ * This mode requires a (possibly composite) unique key by which updates can be propagated. The
+ * external connector needs to be aware of the unique key attribute in order to apply messages
+ * correctly. INSERT and UPDATE changes are encoded as UPSERT messages, and DELETE changes as
+ * DELETE messages.
+ *
+ * The main difference from a retract stream is that UPDATE changes are encoded with a single
+ * message and are therefore more efficient.
+ *
+ * @see [[inAppendMode()]] and [[inRetractMode()]].
+ */
+ override def inUpsertMode(): StreamTableDescriptor = {
+ updateMode = Some(UPDATE_MODE_VALUE_UPSERT)
+ this
+ }
+
+ override def toString: String = {
+ getValidProperties.toString
+ }
+
+ // ----------------------------------------------------------------------------------------------
+
+ private def getValidProperties: DescriptorProperties = {
+ val properties = new DescriptorProperties()
+ addProperties(properties)
+
+ // check for a format
+ if (connectorDescriptor.needsFormat() && formatDescriptor.isEmpty) {
+ throw new ValidationException(
+ s"The connector '$connectorDescriptor' requires a format description.")
+ } else if (!connectorDescriptor.needsFormat() && formatDescriptor.isDefined) {
+ throw new ValidationException(
+ s"The connector '$connectorDescriptor' does not require a format description " +
+ s"but '${formatDescriptor.get}' found.")
+ }
+
+ // basic validation
--- End diff --
I extended the comment.
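For readers following the diff above, a hedged usage sketch of this descriptor; `ExternalSystemXYZ` is a placeholder connector (as in the `ExternalCatalogTable` docs in this PR), and only methods shown in the diff are used:
```
// assumes a StreamTableEnvironment `tableEnv`; the connector descriptor is
// hypothetical, Json and Schema are the format/schema descriptors of this PR
new StreamTableDescriptor(tableEnv, new ExternalSystemXYZ().version("0.11"))
  .withFormat(
    new Json()
      .jsonSchema("{...}")
      .failOnMissingField(false))
  .withSchema(
    new Schema()
      .field("user-name", "VARCHAR")
      .field("count", "DECIMAL"))
  .inUpsertMode()
  .registerTableSourceAndSink("MyUpsertTable")
```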
---
[GitHub] flink pull request #6343: [FLINK-9852] [table] Expose descriptor-based sink ...
Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:
https://github.com/apache/flink/pull/6343#discussion_r203348914
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogTable.scala ---
@@ -18,33 +18,299 @@
package org.apache.flink.table.catalog
+import org.apache.flink.table.descriptors.DescriptorProperties.toScala
+import org.apache.flink.table.descriptors.StatisticsValidator.{STATISTICS_COLUMNS, STATISTICS_ROW_COUNT, readColumnStats}
+import org.apache.flink.table.descriptors.StreamTableDescriptorValidator.{UPDATE_MODE, UPDATE_MODE_VALUE_APPEND, UPDATE_MODE_VALUE_RETRACT, UPDATE_MODE_VALUE_UPSERT}
import org.apache.flink.table.descriptors._
import org.apache.flink.table.plan.stats.TableStats
+import scala.collection.JavaConverters._
+
/**
- * Defines a table in an [[ExternalCatalog]].
+ * Defines a table in an [[ExternalCatalog]]. External catalog tables describe table sources
+ * and/or sinks for both batch and stream environments.
+ *
+ * The catalog table takes descriptors that declare how to communicate with external
+ * systems in an implementation-agnostic way. The classpath is scanned for suitable table factories
+ * that match the desired configuration.
+ *
+ * Use the provided builder methods to configure the external catalog table accordingly.
+ *
+ * The following example shows how to read from a connector using a JSON format and
+ * declare it as a table source:
*
- * @param connectorDesc describes the system to connect to
- * @param formatDesc describes the data format of a connector
- * @param schemaDesc describes the schema of the result table
- * @param statisticsDesc describes the estimated statistics of the result table
- * @param metadataDesc describes additional metadata of a table
+ * {{{
+ * ExternalCatalogTable(
+ * new ExternalSystemXYZ()
+ * .version("0.11"))
+ * .withFormat(
+ * new Json()
+ * .jsonSchema("{...}")
+ * .failOnMissingField(false))
+ * .withSchema(
+ * new Schema()
+ * .field("user-name", "VARCHAR").from("u_name")
+ * .field("count", "DECIMAL")
+ * .asTableSource()
+ * }}}
+ *
+ * Note: For backwards compatibility, the table is declared as a table source for both batch and
+ * streaming environments by default.
+ *
+ * See also [[org.apache.flink.table.factories.TableFactory]] for more information about how
+ * to target suitable factories.
+ *
+ * @param connectorDescriptor describes the system to connect to
*/
-class ExternalCatalogTable(
- connectorDesc: ConnectorDescriptor,
- formatDesc: Option[FormatDescriptor],
- schemaDesc: Option[Schema],
- statisticsDesc: Option[Statistics],
- metadataDesc: Option[Metadata])
- extends TableSourceDescriptor {
-
- this.connectorDescriptor = Some(connectorDesc)
- this.formatDescriptor = formatDesc
- this.schemaDescriptor = schemaDesc
- this.statisticsDescriptor = statisticsDesc
- this.metaDescriptor = metadataDesc
-
- // expose statistics for external table source util
- override def getTableStats: Option[TableStats] = super.getTableStats
+class ExternalCatalogTable(val connectorDescriptor: ConnectorDescriptor)
+ extends TableDescriptor
--- End diff --
I would say yes. See also `org.apache.flink.runtime.jobmanager.JobManager` as an example.
---
[GitHub] flink pull request #6343: [FLINK-9852] [table] Expose descriptor-based sink ...
Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:
https://github.com/apache/flink/pull/6343#discussion_r202990315
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/CsvAppendTableSinkFactory.scala ---
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sinks
+
+import java.util
+
+import org.apache.flink.table.descriptors.SchemaValidator.{SCHEMA, SCHEMA_PROCTIME}
+import org.apache.flink.table.descriptors.StreamTableDescriptorValidator.{UPDATE_MODE, UPDATE_MODE_VALUE_APPEND}
+import org.apache.flink.table.factories.StreamTableSinkFactory
+import org.apache.flink.types.Row
+
+/**
+ * Factory base for creating configured instances of [[CsvTableSink]] in a stream environment.
+ */
+class CsvAppendTableSinkFactory
--- End diff --
Will put it into a separate commit while merging.
---
[GitHub] flink pull request #6343: [FLINK-9852] [table] Expose descriptor-based sink ...
Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on a diff in the pull request:
https://github.com/apache/flink/pull/6343#discussion_r203341010
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/StreamTableDescriptor.scala ---
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+import org.apache.flink.table.api.{StreamTableEnvironment, ValidationException}
+import org.apache.flink.table.descriptors.StreamTableDescriptorValidator._
+import org.apache.flink.table.factories.{StreamTableSinkFactory, StreamTableSourceFactory, TableFactoryService}
+
+/**
+ * Descriptor for specifying a table source and/or sink in a streaming environment.
+ */
+class StreamTableDescriptor(
--- End diff --
Inherit:
```
class StreamTableDescriptor(...) extends BatchTableDescriptor with StreamableDescriptor
```
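Spelled out, that could look like the sketch below. Note this is only a sketch: `BatchTableDescriptor` currently takes a `BatchTableEnvironment`, so its environment parameter would have to be generalized (e.g. to `TableEnvironment`) before such an inheritance compiles.
```
// sketch only: assumes BatchTableDescriptor is generalized to accept any
// TableEnvironment, since StreamTableEnvironment does not extend
// BatchTableEnvironment
class StreamTableDescriptor(
    tableEnv: StreamTableEnvironment,
    connectorDescriptor: ConnectorDescriptor)
  extends BatchTableDescriptor(tableEnv, connectorDescriptor)
  with StreamableDescriptor {

  private var updateMode: Option[String] = None

  override def inAppendMode(): StreamTableDescriptor = {
    updateMode = Some(UPDATE_MODE_VALUE_APPEND)
    this
  }

  // inRetractMode() and inUpsertMode() follow the same pattern, and
  // addProperties() additionally emits the update-mode property
}
```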
---
[GitHub] flink pull request #6343: [FLINK-9852] [table] Expose descriptor-based sink ...
Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on a diff in the pull request:
https://github.com/apache/flink/pull/6343#discussion_r203304035
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/BatchTableDescriptor.scala ---
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+import org.apache.flink.table.api.{BatchTableEnvironment, ValidationException}
+import org.apache.flink.table.factories.{BatchTableSinkFactory, BatchTableSourceFactory, TableFactoryService}
+
+/**
+ * Descriptor for specifying a table source and/or sink in a batch environment.
+ */
+class BatchTableDescriptor(
+ private val tableEnv: BatchTableEnvironment,
+ private val connectorDescriptor: ConnectorDescriptor)
+ extends TableDescriptor
+ with SchematicDescriptor
+ with RegistrableDescriptor {
+
+ private var formatDescriptor: Option[FormatDescriptor] = None
+ private var schemaDescriptor: Option[Schema] = None
+
+ /**
+ * Internal method for properties conversion.
+ */
+ override private[flink] def addProperties(properties: DescriptorProperties): Unit = {
--- End diff --
It's not about public/private :) It's about the order in which you should be reading the code.
---
[GitHub] flink pull request #6343: [FLINK-9852] [table] Expose descriptor-based sink ...
Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:
https://github.com/apache/flink/pull/6343#discussion_r203373313
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalTableUtil.scala ---
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog
+
+import java.util
+
+import org.apache.flink.table.api._
+import org.apache.flink.table.descriptors.DescriptorProperties
+import org.apache.flink.table.factories._
+import org.apache.flink.table.plan.schema._
+import org.apache.flink.table.plan.stats.FlinkStatistic
+import org.apache.flink.table.util.Logging
+
+
+/**
+ * This utility class converts an [[ExternalCatalogTable]] into a [[TableSourceSinkTable]].
+ *
+ * It uses [[TableFactoryService]] for factory discovery.
+ */
+object ExternalTableUtil extends Logging {
+
+ /**
+ * Converts an [[ExternalCatalogTable]] instance to a [[TableSourceSinkTable]] instance.
+ *
+ * @param tableEnv the table environment that determines whether batch or stream factories are used
+ * @param externalCatalogTable the [[ExternalCatalogTable]] instance to convert
+ * @return the [[TableSourceSinkTable]] created from the input catalog table
+ */
+ def fromExternalCatalogTable[T1, T2](
+ tableEnv: TableEnvironment,
+ externalCatalogTable: ExternalCatalogTable)
+ : TableSourceSinkTable[T1, T2] = {
+
+ val properties = new DescriptorProperties()
+ externalCatalogTable.addProperties(properties)
+ val javaMap = properties.asMap
+ val statistics = new FlinkStatistic(externalCatalogTable.getTableStats)
+
+ val source: Option[TableSourceTable[T1]] = tableEnv match {
+ // check for a batch table source in this batch environment
+ case _: BatchTableEnvironment if externalCatalogTable.isBatchTable =>
+ createBatchTableSource(externalCatalogTable, javaMap, statistics)
+
+ // check for a stream table source in this stream environment
+ case _: StreamTableEnvironment if externalCatalogTable.isStreamTable =>
+ createStreamTableSource(externalCatalogTable, javaMap, statistics)
+
+ case _ =>
+ throw new ValidationException(
+ "External catalog table does not support the current environment for a table source.")
+ }
+
+ val sink: Option[TableSinkTable[T2]] = tableEnv match {
+ // check for a batch table sink in this batch environment
+ case _: BatchTableEnvironment if externalCatalogTable.isBatchTable =>
+ createBatchTableSink(externalCatalogTable, javaMap, statistics)
+
+ // check for a stream table sink in this stream environment
+ case _: StreamTableEnvironment if externalCatalogTable.isStreamTable =>
+ createStreamTableSink(externalCatalogTable, javaMap, statistics)
+
+ case _ =>
+ throw new ValidationException(
+ "External catalog table does not support the current environment for a table sink.")
+ }
+
+ new TableSourceSinkTable[T1, T2](source, sink)
+ }
+
+ private def createBatchTableSource[T](
+ externalCatalogTable: ExternalCatalogTable,
+ javaMap: util.Map[String, String],
+ statistics: FlinkStatistic)
+ : Option[TableSourceTable[T]] = if (externalCatalogTable.isTableSource) {
--- End diff --
I will reverse the branches.
---
[GitHub] flink pull request #6343: [FLINK-9852] [table] Expose descriptor-based sink ...
Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:
https://github.com/apache/flink/pull/6343#discussion_r202977464
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogTable.scala ---
@@ -18,33 +18,299 @@
package org.apache.flink.table.catalog
+import org.apache.flink.table.descriptors.DescriptorProperties.toScala
+import org.apache.flink.table.descriptors.StatisticsValidator.{STATISTICS_COLUMNS, STATISTICS_ROW_COUNT, readColumnStats}
+import org.apache.flink.table.descriptors.StreamTableDescriptorValidator.{UPDATE_MODE, UPDATE_MODE_VALUE_APPEND, UPDATE_MODE_VALUE_RETRACT, UPDATE_MODE_VALUE_UPSERT}
import org.apache.flink.table.descriptors._
import org.apache.flink.table.plan.stats.TableStats
+import scala.collection.JavaConverters._
+
/**
- * Defines a table in an [[ExternalCatalog]].
+ * Defines a table in an [[ExternalCatalog]]. External catalog tables describe table sources
+ * and/or sinks for both batch and stream environments.
+ *
+ * The catalog table takes descriptors that declare how to communicate with external
+ * systems in an implementation-agnostic way. The classpath is scanned for suitable table factories
+ * that match the desired configuration.
+ *
+ * Use the provided builder methods to configure the external catalog table accordingly.
+ *
+ * The following example shows how to read from a connector using a JSON format and
+ * declare it as a table source:
*
- * @param connectorDesc describes the system to connect to
- * @param formatDesc describes the data format of a connector
- * @param schemaDesc describes the schema of the result table
- * @param statisticsDesc describes the estimated statistics of the result table
- * @param metadataDesc describes additional metadata of a table
+ * {{{
+ * ExternalCatalogTable(
+ * new ExternalSystemXYZ()
+ * .version("0.11"))
+ * .withFormat(
+ * new Json()
+ * .jsonSchema("{...}")
+ * .failOnMissingField(false))
+ * .withSchema(
+ * new Schema()
+ * .field("user-name", "VARCHAR").from("u_name")
+ * .field("count", "DECIMAL")
+ * .asTableSource()
+ * }}}
+ *
+ * Note: For backwards compatibility, the table is declared as a table source for both batch and
+ * streaming environments by default.
+ *
+ * See also [[org.apache.flink.table.factories.TableFactory]] for more information about how
+ * to target suitable factories.
+ *
+ * @param connectorDescriptor describes the system to connect to
*/
-class ExternalCatalogTable(
- connectorDesc: ConnectorDescriptor,
- formatDesc: Option[FormatDescriptor],
- schemaDesc: Option[Schema],
- statisticsDesc: Option[Statistics],
- metadataDesc: Option[Metadata])
- extends TableSourceDescriptor {
-
- this.connectorDescriptor = Some(connectorDesc)
- this.formatDescriptor = formatDesc
- this.schemaDescriptor = schemaDesc
- this.statisticsDescriptor = statisticsDesc
- this.metaDescriptor = metadataDesc
-
- // expose statistics for external table source util
- override def getTableStats: Option[TableStats] = super.getTableStats
+class ExternalCatalogTable(val connectorDescriptor: ConnectorDescriptor)
+ extends TableDescriptor
--- End diff --
https://github.com/databricks/scala-style-guide#indent
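Concretely, the linked guide indents constructor parameters with 4 spaces and the extends/with clause with 2 spaces, roughly:
```
class ExternalCatalogTable(
    val connectorDescriptor: ConnectorDescriptor)
  extends TableDescriptor {
  // ...
}
```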
---
[GitHub] flink pull request #6343: [FLINK-9852] [table] Expose descriptor-based sink ...
Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on a diff in the pull request:
https://github.com/apache/flink/pull/6343#discussion_r202966706
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/BatchTableDescriptor.scala ---
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+import org.apache.flink.table.api.{BatchTableEnvironment, ValidationException}
+import org.apache.flink.table.factories.{BatchTableSinkFactory, BatchTableSourceFactory, TableFactoryService}
+
+/**
+ * Descriptor for specifying a table source and/or sink in a batch environment.
+ */
+class BatchTableDescriptor(
+ private val tableEnv: BatchTableEnvironment,
+ private val connectorDescriptor: ConnectorDescriptor)
+ extends TableDescriptor
+ with SchematicDescriptor
+ with RegistrableDescriptor {
+
+ private var formatDescriptor: Option[FormatDescriptor] = None
+ private var schemaDescriptor: Option[Schema] = None
+
+ /**
+ * Internal method for properties conversion.
+ */
+ override private[flink] def addProperties(properties: DescriptorProperties): Unit = {
--- End diff --
in many places (for example all `addProperties` methods) you are ordering methods very weirdly. The rule of thumb should be public methods before private ones. Longer story: https://stackoverflow.com/a/1760877/8149051
Many times during this code review I had to jump up and down.
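In other words, something like this hypothetical skeleton (names are illustrative), read top-down:
```
// public API first - this is what a reader of the class looks for
class ExampleDescriptor {

  def registerTableSource(name: String): Unit = {
    val properties = collectProperties()
    // ... factory discovery and registration would go here
  }

  // internal helpers last, after the public methods they serve
  private def collectProperties(): Map[String, String] = Map.empty
}
```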
---
[GitHub] flink pull request #6343: [FLINK-9852] [table] Expose descriptor-based sink ...
Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on a diff in the pull request:
https://github.com/apache/flink/pull/6343#discussion_r202915534
--- Diff: flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/EnvironmentTest.java ---
@@ -40,10 +41,18 @@
@Test
public void testMerging() throws Exception {
- final Environment env1 = EnvironmentFileUtil.parseUnmodified(DEFAULTS_ENVIRONMENT_FILE);
+ final Map<String, String> replaceVars1 = new HashMap<>();
+ replaceVars1.put("$VAR_UPDATE_MODE", "update-mode: append");
+ final Environment env1 = EnvironmentFileUtil.parseModified(
+ DEFAULTS_ENVIRONMENT_FILE,
+ replaceVars1);
+
+ final Map<String, String> replaceVars2 = new HashMap<>();
--- End diff --
```
final Map<String, String> replaceVars2 = new HashMap<>(replaceVars1);
```
and you can drop the line below
---
[GitHub] flink pull request #6343: [FLINK-9852] [table] Expose descriptor-based sink ...
Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on a diff in the pull request:
https://github.com/apache/flink/pull/6343#discussion_r202964715
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/SchematicDescriptor.scala ---
@@ -19,14 +19,17 @@
package org.apache.flink.table.descriptors
/**
- * Common class for all descriptors describing a table sink.
+ * A trait for descriptors that allow defining a format and a schema.
*/
-abstract class TableSinkDescriptor extends TableDescriptor {
+trait SchematicDescriptor extends Descriptor {
--- End diff --
Are you using it anywhere as an interface? What does extracting it into a separate interface give us? Maybe drop it?
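(For illustration, one thing a shared trait would buy - assuming it declares `withFormat` and `withSchema` like the concrete classes do - is configuration code that doesn't care whether the target is batch or stream:)
```
// sketch under the assumption above; Json and Schema are the descriptors
// from this PR
def applySharedConfig(descriptor: SchematicDescriptor): Unit = {
  descriptor.withFormat(new Json().jsonSchema("{...}"))
  descriptor.withSchema(new Schema().field("count", "DECIMAL"))
}
```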
---
[GitHub] flink pull request #6343: [FLINK-9852] [table] Expose descriptor-based sink ...
Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on a diff in the pull request:
https://github.com/apache/flink/pull/6343#discussion_r202923853
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogTable.scala ---
@@ -18,33 +18,299 @@
package org.apache.flink.table.catalog
+import org.apache.flink.table.descriptors.DescriptorProperties.toScala
+import org.apache.flink.table.descriptors.StatisticsValidator.{STATISTICS_COLUMNS, STATISTICS_ROW_COUNT, readColumnStats}
+import org.apache.flink.table.descriptors.StreamTableDescriptorValidator.{UPDATE_MODE, UPDATE_MODE_VALUE_APPEND, UPDATE_MODE_VALUE_RETRACT, UPDATE_MODE_VALUE_UPSERT}
import org.apache.flink.table.descriptors._
import org.apache.flink.table.plan.stats.TableStats
+import scala.collection.JavaConverters._
+
/**
- * Defines a table in an [[ExternalCatalog]].
+ * Defines a table in an [[ExternalCatalog]]. External catalog tables describe table sources
+ * and/or sinks for both batch and stream environments.
+ *
+ * The catalog table takes descriptors that declare how to communicate with external
+ * systems in an implementation-agnostic way. The classpath is scanned for suitable table factories
+ * that match the desired configuration.
+ *
+ * Use the provided builder methods to configure the external catalog table accordingly.
+ *
+ * The following example shows how to read from a connector using a JSON format and
+ * declare it as a table source:
*
- * @param connectorDesc describes the system to connect to
- * @param formatDesc describes the data format of a connector
- * @param schemaDesc describes the schema of the result table
- * @param statisticsDesc describes the estimated statistics of the result table
- * @param metadataDesc describes additional metadata of a table
+ * {{{
+ * ExternalCatalogTable(
+ * new ExternalSystemXYZ()
+ * .version("0.11"))
+ * .withFormat(
+ * new Json()
+ * .jsonSchema("{...}")
+ * .failOnMissingField(false))
+ * .withSchema(
+ * new Schema()
+ * .field("user-name", "VARCHAR").from("u_name")
+ * .field("count", "DECIMAL")
+ * .asTableSource()
+ * }}}
+ *
+ * Note: For backwards compatibility, the table is declared as a table source for both batch and
+ * streaming environments by default.
+ *
+ * See also [[org.apache.flink.table.factories.TableFactory]] for more information about how
+ * to target suitable factories.
+ *
+ * @param connectorDescriptor describes the system to connect to
*/
-class ExternalCatalogTable(
- connectorDesc: ConnectorDescriptor,
- formatDesc: Option[FormatDescriptor],
- schemaDesc: Option[Schema],
- statisticsDesc: Option[Statistics],
- metadataDesc: Option[Metadata])
- extends TableSourceDescriptor {
-
- this.connectorDescriptor = Some(connectorDesc)
- this.formatDescriptor = formatDesc
- this.schemaDescriptor = schemaDesc
- this.statisticsDescriptor = statisticsDesc
- this.metaDescriptor = metadataDesc
-
- // expose statistics for external table source util
- override def getTableStats: Option[TableStats] = super.getTableStats
+class ExternalCatalogTable(val connectorDescriptor: ConnectorDescriptor)
+ extends TableDescriptor
--- End diff --
Single tab? This is inconsistent with other places.
---
[GitHub] flink pull request #6343: [FLINK-9852] [table] Expose descriptor-based sink ...
Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:
https://github.com/apache/flink/pull/6343#discussion_r203373202
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogTable.scala ---
@@ -18,33 +18,299 @@
package org.apache.flink.table.catalog
+import org.apache.flink.table.descriptors.DescriptorProperties.toScala
+import org.apache.flink.table.descriptors.StatisticsValidator.{STATISTICS_COLUMNS, STATISTICS_ROW_COUNT, readColumnStats}
+import org.apache.flink.table.descriptors.StreamTableDescriptorValidator.{UPDATE_MODE, UPDATE_MODE_VALUE_APPEND, UPDATE_MODE_VALUE_RETRACT, UPDATE_MODE_VALUE_UPSERT}
import org.apache.flink.table.descriptors._
import org.apache.flink.table.plan.stats.TableStats
+import scala.collection.JavaConverters._
+
/**
- * Defines a table in an [[ExternalCatalog]].
+ * Defines a table in an [[ExternalCatalog]]. External catalog tables describe table sources
+ * and/or sinks for both batch and stream environments.
+ *
+ * The catalog table takes descriptors that declare how to communicate with external
+ * systems in an implementation-agnostic way. The classpath is scanned for suitable table factories
+ * that match the desired configuration.
+ *
+ * Use the provided builder methods to configure the external catalog table accordingly.
+ *
+ * The following example shows how to read from a connector using a JSON format and
+ * declare it as a table source:
*
- * @param connectorDesc describes the system to connect to
- * @param formatDesc describes the data format of a connector
- * @param schemaDesc describes the schema of the result table
- * @param statisticsDesc describes the estimated statistics of the result table
- * @param metadataDesc describes additional metadata of a table
+ * {{{
+ * ExternalCatalogTable(
+ * new ExternalSystemXYZ()
+ * .version("0.11"))
+ * .withFormat(
+ * new Json()
+ * .jsonSchema("{...}")
+ * .failOnMissingField(false))
+ * .withSchema(
+ * new Schema()
+ * .field("user-name", "VARCHAR").from("u_name")
+ * .field("count", "DECIMAL")
+ * .asTableSource()
--- End diff --
A descriptor is basically a builder that builds immutable string properties once `addProperties` is called.
"format/schema/statistics/metadata/updateMode" are only concepts that we propose to a user. In the end, a user can implement a custom connector descriptor that adds whatever properties they like, and implement a custom factory that matches whatever properties are needed.
---
[GitHub] flink pull request #6343: [FLINK-9852] [table] Expose descriptor-based sink ...
Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:
https://github.com/apache/flink/pull/6343#discussion_r202983790
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/BatchTableDescriptor.scala ---
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+import org.apache.flink.table.api.{BatchTableEnvironment, ValidationException}
+import org.apache.flink.table.factories.{BatchTableSinkFactory, BatchTableSourceFactory, TableFactoryService}
+
+/**
+ * Descriptor for specifying a table source and/or sink in a batch environment.
+ */
+class BatchTableDescriptor(
+ private val tableEnv: BatchTableEnvironment,
+ private val connectorDescriptor: ConnectorDescriptor)
+ extends TableDescriptor
+ with SchematicDescriptor
+ with RegistrableDescriptor {
+
+ private var formatDescriptor: Option[FormatDescriptor] = None
+ private var schemaDescriptor: Option[Schema] = None
+
+ /**
+ * Internal method for properties conversion.
+ */
+ override private[flink] def addProperties(properties: DescriptorProperties): Unit = {
+ connectorDescriptor.addProperties(properties)
+ formatDescriptor.foreach(_.addProperties(properties))
+ schemaDescriptor.foreach(_.addProperties(properties))
+ }
+
+ /**
+ * Searches for the specified table source, configures it accordingly, and registers it as
+ * a table under the given name.
+ *
+ * @param name table name to be registered in the table environment
+ */
+ override def registerTableSource(name: String): Unit = {
+ val javaMap = getValidProperties.asMap
+ val tableSource = TableFactoryService
+ .find(classOf[BatchTableSourceFactory[_]], javaMap)
+ .createBatchTableSource(javaMap)
+ tableEnv.registerTableSource(name, tableSource)
+ }
+
+ /**
+ * Searches for the specified table sink, configures it accordingly, and registers it as
+ * a table under the given name.
+ *
+ * @param name table name to be registered in the table environment
+ */
+ override def registerTableSink(name: String): Unit = {
+ val javaMap = getValidProperties.asMap
+ val tableSink = TableFactoryService
+ .find(classOf[BatchTableSinkFactory[_]], javaMap)
+ .createBatchTableSink(javaMap)
+ tableEnv.registerTableSink(name, tableSink)
+ }
+
+ /**
+ * Searches for the specified table source and sink, configures them accordingly, and registers
+ * them as a table under the given name.
+ *
+ * @param name table name to be registered in the table environment
+ */
+ override def registerTableSourceAndSink(name: String): Unit = {
+ registerTableSource(name)
+ registerTableSink(name)
+ }
+
+ /**
+ * Specifies the format that defines how to read data from a connector.
+ */
+ override def withFormat(format: FormatDescriptor): BatchTableDescriptor = {
+ formatDescriptor = Some(format)
+ this
+ }
+
+ /**
+ * Specifies the resulting table schema.
+ */
+ override def withSchema(schema: Schema): BatchTableDescriptor = {
+ schemaDescriptor = Some(schema)
+ this
+ }
+
+ override def toString: String = {
+ getValidProperties.toString
+ }
+
+ // ----------------------------------------------------------------------------------------------
+
+ private def getValidProperties: DescriptorProperties = {
--- End diff --
I agree but I cannot move it up the class hierarchy because it would be public in Java. I will create a util class.
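A possible shape for that util (the name `DescriptorUtil` is hypothetical; only the intent to create a util class is stated here):
```
// hypothetical sketch: a shared helper that stays package-private instead
// of leaking into the public Java API of the descriptor classes
private[flink] object DescriptorUtil {

  def getValidProperties(descriptor: Descriptor): DescriptorProperties = {
    val properties = new DescriptorProperties()
    descriptor.addProperties(properties)
    // connector/format consistency checks would run here
    properties
  }
}
```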
---
[GitHub] flink pull request #6343: [FLINK-9852] [table] Expose descriptor-based sink ...
Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:
https://github.com/apache/flink/pull/6343#discussion_r202981771
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalTableUtil.scala ---
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog
+
+import java.util
+
+import org.apache.flink.table.api._
+import org.apache.flink.table.descriptors.DescriptorProperties
+import org.apache.flink.table.factories._
+import org.apache.flink.table.plan.schema._
+import org.apache.flink.table.plan.stats.FlinkStatistic
+import org.apache.flink.table.util.Logging
+
+
+/**
+ * This utility class converts an [[ExternalCatalogTable]] into a [[TableSourceSinkTable]].
+ *
+ * It uses [[TableFactoryService]] for factory discovery.
+ */
+object ExternalTableUtil extends Logging {
+
+ /**
+ * Converts an [[ExternalCatalogTable]] instance to a [[TableSourceSinkTable]] instance.
+ *
+ * @param tableEnv the table environment that determines whether batch or stream factories are used
+ * @param externalCatalogTable the [[ExternalCatalogTable]] instance to convert
+ * @return the [[TableSourceSinkTable]] created from the input catalog table
+ */
+ def fromExternalCatalogTable[T1, T2](
+ tableEnv: TableEnvironment,
+ externalCatalogTable: ExternalCatalogTable)
+ : TableSourceSinkTable[T1, T2] = {
+
+ val properties = new DescriptorProperties()
+ externalCatalogTable.addProperties(properties)
+ val javaMap = properties.asMap
+ val statistics = new FlinkStatistic(externalCatalogTable.getTableStats)
+
+ val source: Option[TableSourceTable[T1]] = tableEnv match {
+ // check for a batch table source in this batch environment
+ case _: BatchTableEnvironment if externalCatalogTable.isBatchTable =>
+ createBatchTableSource(externalCatalogTable, javaMap, statistics)
+
+ // check for a stream table source in this stream environment
+ case _: StreamTableEnvironment if externalCatalogTable.isStreamTable =>
+ createStreamTableSource(externalCatalogTable, javaMap, statistics)
+
+ case _ =>
+ throw new ValidationException(
+ "External catalog table does not support the current environment for a table source.")
+ }
+
+ val sink: Option[TableSinkTable[T2]] = tableEnv match {
+ // check for a batch table sink in this batch environment
+ case _: BatchTableEnvironment if externalCatalogTable.isBatchTable =>
+ createBatchTableSink(externalCatalogTable, javaMap, statistics)
+
+ // check for a stream table sink in this stream environment
+ case _: StreamTableEnvironment if externalCatalogTable.isStreamTable =>
+ createStreamTableSink(externalCatalogTable, javaMap, statistics)
+
+ case _ =>
+ throw new ValidationException(
+ "External catalog table does not support the current environment for a table sink.")
+ }
+
+ new TableSourceSinkTable[T1, T2](source, sink)
+ }
+
+ private def createBatchTableSource[T](
--- End diff --
Then we would have 4 factories that have to be checked with if/else branches. Having those if/else branches in 3 places (SQL Client, external catalog, and descriptors) is acceptable in my opinion.
---