Posted to issues@flink.apache.org by twalthr <gi...@git.apache.org> on 2018/07/16 16:03:00 UTC

[GitHub] flink pull request #6343: [FLINK-9852] [table] Expose descriptor-based sink ...

GitHub user twalthr opened a pull request:

    https://github.com/apache/flink/pull/6343

    [FLINK-9852] [table] Expose descriptor-based sink creation

    ## What is the purpose of the change
    
    This commit exposes the new unified sink creation through the table environments and the external catalog table. It introduces a new `update-mode` property in order to distinguish between append, retract, and upsert table sources and sinks. It also refactors the top-level API classes one last time and adds more documentation. This completes the unified table sources/sinks story from an API point of view (a usage sketch is given after the change log below).
    
    ## Brief change log
    
    - Introduction of `TableEnvironment.connect()` and corresponding API builder classes
    - Introduction of property `update-mode: table` and update of existing connectors
    - External catalog support with proper source/sink discovery and API
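
    A rough usage sketch of the new descriptor-based API (the connector, topic, and field names below are only illustrative and are not part of this PR):

        // Scala; assumes a StreamTableEnvironment and a hypothetical Kafka connector descriptor
        tableEnv
          .connect(
            new Kafka()
              .version("0.11")
              .topic("clicks"))
          .withFormat(
            new Json()
              .failOnMissingField(false))
          .withSchema(
            new Schema()
              .field("user-name", "VARCHAR")
              .field("count", "DECIMAL"))
          .inAppendMode()
          .registerTableSourceAndSink("clicks")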
    
    ## Verifying this change
    
    Existing tests were adapted.
    
    ## Does this pull request potentially affect one of the following parts:
    
      - Dependencies (does it add or upgrade a dependency): no
      - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no
      - The serializers: no
      - The runtime per-record code paths (performance sensitive): no
      - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: no
      - The S3 file system connector: no
    
    ## Documentation
    
      - Does this pull request introduce a new feature? no
      - If yes, how is the feature documented? not documented


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/twalthr/flink FLINK-9852

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/6343.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #6343
    
----

----


---

[GitHub] flink pull request #6343: [FLINK-9852] [table] Expose descriptor-based sink ...

Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6343#discussion_r202961529
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogTable.scala ---
    @@ -18,33 +18,299 @@
     
     package org.apache.flink.table.catalog
     
    +import org.apache.flink.table.descriptors.DescriptorProperties.toScala
    +import org.apache.flink.table.descriptors.StatisticsValidator.{STATISTICS_COLUMNS, STATISTICS_ROW_COUNT, readColumnStats}
    +import org.apache.flink.table.descriptors.StreamTableDescriptorValidator.{UPDATE_MODE, UPDATE_MODE_VALUE_APPEND, UPDATE_MODE_VALUE_RETRACT, UPDATE_MODE_VALUE_UPSERT}
     import org.apache.flink.table.descriptors._
     import org.apache.flink.table.plan.stats.TableStats
     
    +import scala.collection.JavaConverters._
    +
     /**
    -  * Defines a table in an [[ExternalCatalog]].
    +  * Defines a table in an [[ExternalCatalog]]. External catalog tables describe table sources
    +  * and/or sinks for both batch and stream environments.
    +  *
    +  * The catalog table takes descriptors which allow for declaring the communication to external
    +  * systems in an implementation-agnostic way. The classpath is scanned for suitable table factories
    +  * that match the desired configuration.
    +  *
    +  * Use the provided builder methods to configure the external catalog table accordingly.
    +  *
    +  * The following example shows how to read from a connector using a JSON format and
    +  * declaring it as a table source:
       *
    -  * @param connectorDesc describes the system to connect to
    -  * @param formatDesc describes the data format of a connector
    -  * @param schemaDesc describes the schema of the result table
    -  * @param statisticsDesc describes the estimated statistics of the result table
    -  * @param metadataDesc describes additional metadata of a table
    +  * {{{
    +  *   ExternalCatalogTable(
    +  *     new ExternalSystemXYZ()
    +  *       .version("0.11"))
    +  *   .withFormat(
    +  *     new Json()
    +  *       .jsonSchema("{...}")
    +  *       .failOnMissingField(false))
    +  *   .withSchema(
    +  *     new Schema()
    +  *       .field("user-name", "VARCHAR").from("u_name")
    +  *       .field("count", "DECIMAL")
    +  *   .asTableSource()
    --- End diff --
    
    make `ExternalCatalogTable` and `BatchTableDescriptor` true builders with final fields?


---

[GitHub] flink pull request #6343: [FLINK-9852] [table] Expose descriptor-based sink ...

Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6343#discussion_r202964290
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalTableUtil.scala ---
    @@ -0,0 +1,148 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.catalog
    +
    +import java.util
    +
    +import org.apache.flink.table.api._
    +import org.apache.flink.table.descriptors.DescriptorProperties
    +import org.apache.flink.table.factories._
    +import org.apache.flink.table.plan.schema._
    +import org.apache.flink.table.plan.stats.FlinkStatistic
    +import org.apache.flink.table.util.Logging
    +
    +
    +/**
    +  * The utility class is used to convert [[ExternalCatalogTable]] to [[TableSourceSinkTable]].
    +  *
    +  * It uses [[TableFactoryService]] for discovering.
    +  */
    +object ExternalTableUtil extends Logging {
    +
    +  /**
    +    * Converts an [[ExternalCatalogTable]] instance to a [[TableSourceTable]] instance
    +    *
    +    * @param externalCatalogTable the [[ExternalCatalogTable]] instance which to convert
    +    * @return converted [[TableSourceTable]] instance from the input catalog table
    +    */
    +  def fromExternalCatalogTable[T1, T2](
    +      tableEnv: TableEnvironment,
    +      externalCatalogTable: ExternalCatalogTable)
    +    : TableSourceSinkTable[T1, T2] = {
    +
    +    val properties = new DescriptorProperties()
    +    externalCatalogTable.addProperties(properties)
    +    val javaMap = properties.asMap
    +    val statistics = new FlinkStatistic(externalCatalogTable.getTableStats)
    +
    +    val source: Option[TableSourceTable[T1]] = tableEnv match {
    +      // check for a batch table source in this batch environment
    +      case _: BatchTableEnvironment if externalCatalogTable.isBatchTable =>
    +        createBatchTableSource(externalCatalogTable, javaMap, statistics)
    +
    +      // check for a stream table source in this stream environment
    +      case _: StreamTableEnvironment if externalCatalogTable.isStreamTable =>
    +        createStreamTableSource(externalCatalogTable, javaMap, statistics)
    +
    +      case _ =>
    +        throw new ValidationException(
    +          "External catalog table does not support the current environment for a table source.")
    +    }
    +
    +    val sink: Option[TableSinkTable[T2]] = tableEnv match {
    +      // check for a batch table sink in this batch environment
    +      case _: BatchTableEnvironment if externalCatalogTable.isBatchTable =>
    +        createBatchTableSink(externalCatalogTable, javaMap, statistics)
    +
    +      // check for a stream table sink in this stream environment
    +      case _: StreamTableEnvironment if externalCatalogTable.isStreamTable =>
    +        createStreamTableSink(externalCatalogTable, javaMap, statistics)
    +
    +      case _ =>
    +        throw new ValidationException(
    +          "External catalog table does not support the current environment for a table sink.")
    +    }
    +
    +    new TableSourceSinkTable[T1, T2](source, sink)
    +  }
    +
    +  private def createBatchTableSource[T](
    --- End diff --
    
    I still do not like this lack of abstraction between batch and streaming in the form of `createBatchTableSource`/`createStreamTableSource`.

    Instead of writing if/elses everywhere in our code, there should be some common layer that handles such logic. Here, half of the problem boils down to factories with the methods `createBatchTableSource` and `createStreamTableSource`.
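
    For illustration only, a minimal sketch of the kind of shared layer meant here (the trait and method names are made up and not taken from the PR):

        // purely illustrative Scala: one implementation per environment, so that callers
        // such as ExternalTableUtil never have to branch on batch vs. stream themselves
        trait EnvironmentTableFactory {
          def createSource[T](properties: java.util.Map[String, String]): TableSourceTable[T]
          def createSink[T](properties: java.util.Map[String, String]): TableSinkTable[T]
        }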


---

[GitHub] flink pull request #6343: [FLINK-9852] [table] Expose descriptor-based sink ...

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6343#discussion_r202982499
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/SchematicDescriptor.scala ---
    @@ -19,14 +19,17 @@
     package org.apache.flink.table.descriptors
     
     /**
    -  * Common class for all descriptors describing a table sink.
    +  * A trait for descriptors that allow to define a format and schema.
       */
    -abstract class TableSinkDescriptor extends TableDescriptor {
    +trait SchematicDescriptor extends Descriptor {
    --- End diff --
    
    `SchematicDescriptor` is used for `ExternalCatalogTable`, `StreamTableDescriptor`, and `BatchTableDescriptor`. If we add a new level next to `connector` and `format` (which may happen in the future), we would immediately get a compile error there.
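
    For illustration, the shape of such a trait might look like the following (a guess at the signatures, not necessarily the exact ones in this PR):

        // illustrative Scala: every implementor must override both methods, so adding another
        // abstract method here surfaces as a compile error in all implementing classes
        trait SchematicDescriptor extends Descriptor {
          def withFormat(format: FormatDescriptor): SchematicDescriptor
          def withSchema(schema: Schema): SchematicDescriptor
        }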


---

[GitHub] flink pull request #6343: [FLINK-9852] [table] Expose descriptor-based sink ...

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6343#discussion_r202978881
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogTable.scala ---
    @@ -18,33 +18,299 @@
     
     package org.apache.flink.table.catalog
     
    +import org.apache.flink.table.descriptors.DescriptorProperties.toScala
    +import org.apache.flink.table.descriptors.StatisticsValidator.{STATISTICS_COLUMNS, STATISTICS_ROW_COUNT, readColumnStats}
    +import org.apache.flink.table.descriptors.StreamTableDescriptorValidator.{UPDATE_MODE, UPDATE_MODE_VALUE_APPEND, UPDATE_MODE_VALUE_RETRACT, UPDATE_MODE_VALUE_UPSERT}
     import org.apache.flink.table.descriptors._
     import org.apache.flink.table.plan.stats.TableStats
     
    +import scala.collection.JavaConverters._
    +
     /**
    -  * Defines a table in an [[ExternalCatalog]].
    +  * Defines a table in an [[ExternalCatalog]]. External catalog tables describe table sources
    +  * and/or sinks for both batch and stream environments.
    +  *
    +  * The catalog table takes descriptors which allow for declaring the communication to external
    +  * systems in an implementation-agnostic way. The classpath is scanned for suitable table factories
    +  * that match the desired configuration.
    +  *
    +  * Use the provided builder methods to configure the external catalog table accordingly.
    +  *
    +  * The following example shows how to read from a connector using a JSON format and
    +  * declaring it as a table source:
       *
    -  * @param connectorDesc describes the system to connect to
    -  * @param formatDesc describes the data format of a connector
    -  * @param schemaDesc describes the schema of the result table
    -  * @param statisticsDesc describes the estimated statistics of the result table
    -  * @param metadataDesc describes additional metadata of a table
    +  * {{{
    +  *   ExternalCatalogTable(
    +  *     new ExternalSystemXYZ()
    +  *       .version("0.11"))
    +  *   .withFormat(
    +  *     new Json()
    +  *       .jsonSchema("{...}")
    +  *       .failOnMissingField(false))
    +  *   .withSchema(
    +  *     new Schema()
    +  *       .field("user-name", "VARCHAR").from("u_name")
    +  *       .field("count", "DECIMAL")
    +  *   .asTableSource()
    --- End diff --
    
    I also thought about that, but actually descriptors don't "build" anything. The only final representation would be the properties, but we don't expose them to the user.


---

[GitHub] flink pull request #6343: [FLINK-9852] [table] Expose descriptor-based sink ...

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6343#discussion_r202986793
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/BatchTableDescriptor.scala ---
    @@ -0,0 +1,126 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.descriptors
    +
    +import org.apache.flink.table.api.{BatchTableEnvironment, ValidationException}
    +import org.apache.flink.table.factories.{BatchTableSinkFactory, BatchTableSourceFactory, TableFactoryService}
    +
    +/**
    +  * Descriptor for specifying a table source and/or sink in a batch environment.
    +  */
    +class BatchTableDescriptor(
    +    private val tableEnv: BatchTableEnvironment,
    +    private val connectorDescriptor: ConnectorDescriptor)
    +  extends TableDescriptor
    +  with SchematicDescriptor
    +  with RegistrableDescriptor {
    +
    +  private var formatDescriptor: Option[FormatDescriptor] = None
    +  private var schemaDescriptor: Option[Schema] = None
    +
    +  /**
    +    * Internal method for properties conversion.
    +    */
    +  override private[flink] def addProperties(properties: DescriptorProperties): Unit = {
    --- End diff --
    
    Depends on the language you are using. This method is public in Java ;)
    But I will move it down.


---

[GitHub] flink pull request #6343: [FLINK-9852] [table] Expose descriptor-based sink ...

Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6343#discussion_r202966786
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/CsvAppendTableSinkFactory.scala ---
    @@ -0,0 +1,52 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.sinks
    +
    +import java.util
    +
    +import org.apache.flink.table.descriptors.SchemaValidator.{SCHEMA, SCHEMA_PROCTIME}
    +import org.apache.flink.table.descriptors.StreamTableDescriptorValidator.{UPDATE_MODE, UPDATE_MODE_VALUE_APPEND}
    +import org.apache.flink.table.factories.StreamTableSinkFactory
    +import org.apache.flink.types.Row
    +
    +/**
    +  * Factory base for creating configured instances of [[CsvTableSink]] in a stream environment.
    +  */
    +class CsvAppendTableSinkFactory
    --- End diff --
    
    This should be done in a separate commit.


---

[GitHub] flink pull request #6343: [FLINK-9852] [table] Expose descriptor-based sink ...

Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6343#discussion_r202963074
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalTableUtil.scala ---
    @@ -0,0 +1,148 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.catalog
    +
    +import java.util
    +
    +import org.apache.flink.table.api._
    +import org.apache.flink.table.descriptors.DescriptorProperties
    +import org.apache.flink.table.factories._
    +import org.apache.flink.table.plan.schema._
    +import org.apache.flink.table.plan.stats.FlinkStatistic
    +import org.apache.flink.table.util.Logging
    +
    +
    +/**
    +  * The utility class is used to convert [[ExternalCatalogTable]] to [[TableSourceSinkTable]].
    +  *
    +  * It uses [[TableFactoryService]] for discovering.
    +  */
    +object ExternalTableUtil extends Logging {
    +
    +  /**
    +    * Converts an [[ExternalCatalogTable]] instance to a [[TableSourceTable]] instance
    +    *
    +    * @param externalCatalogTable the [[ExternalCatalogTable]] instance which to convert
    +    * @return converted [[TableSourceTable]] instance from the input catalog table
    +    */
    +  def fromExternalCatalogTable[T1, T2](
    +      tableEnv: TableEnvironment,
    +      externalCatalogTable: ExternalCatalogTable)
    +    : TableSourceSinkTable[T1, T2] = {
    +
    +    val properties = new DescriptorProperties()
    +    externalCatalogTable.addProperties(properties)
    +    val javaMap = properties.asMap
    +    val statistics = new FlinkStatistic(externalCatalogTable.getTableStats)
    +
    +    val source: Option[TableSourceTable[T1]] = tableEnv match {
    +      // check for a batch table source in this batch environment
    --- End diff --
    
    drop those comments; the code is already self-explanatory


---

[GitHub] flink pull request #6343: [FLINK-9852] [table] Expose descriptor-based sink ...

Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6343#discussion_r203303655
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalTableUtil.scala ---
    @@ -0,0 +1,148 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.catalog
    +
    +import java.util
    +
    +import org.apache.flink.table.api._
    +import org.apache.flink.table.descriptors.DescriptorProperties
    +import org.apache.flink.table.factories._
    +import org.apache.flink.table.plan.schema._
    +import org.apache.flink.table.plan.stats.FlinkStatistic
    +import org.apache.flink.table.util.Logging
    +
    +
    +/**
    +  * The utility class is used to convert [[ExternalCatalogTable]] to [[TableSourceSinkTable]].
    +  *
    +  * It uses [[TableFactoryService]] for discovering.
    +  */
    +object ExternalTableUtil extends Logging {
    +
    +  /**
    +    * Converts an [[ExternalCatalogTable]] instance to a [[TableSourceTable]] instance
    +    *
    +    * @param externalCatalogTable the [[ExternalCatalogTable]] instance which to convert
    +    * @return converted [[TableSourceTable]] instance from the input catalog table
    +    */
    +  def fromExternalCatalogTable[T1, T2](
    +      tableEnv: TableEnvironment,
    +      externalCatalogTable: ExternalCatalogTable)
    +    : TableSourceSinkTable[T1, T2] = {
    +
    +    val properties = new DescriptorProperties()
    +    externalCatalogTable.addProperties(properties)
    +    val javaMap = properties.asMap
    +    val statistics = new FlinkStatistic(externalCatalogTable.getTableStats)
    +
    +    val source: Option[TableSourceTable[T1]] = tableEnv match {
    +      // check for a batch table source in this batch environment
    +      case _: BatchTableEnvironment if externalCatalogTable.isBatchTable =>
    +        createBatchTableSource(externalCatalogTable, javaMap, statistics)
    +
    +      // check for a stream table source in this stream environment
    +      case _: StreamTableEnvironment if externalCatalogTable.isStreamTable =>
    +        createStreamTableSource(externalCatalogTable, javaMap, statistics)
    +
    +      case _ =>
    +        throw new ValidationException(
    +          "External catalog table does not support the current environment for a table source.")
    +    }
    +
    +    val sink: Option[TableSinkTable[T2]] = tableEnv match {
    +      // check for a batch table sink in this batch environment
    +      case _: BatchTableEnvironment if externalCatalogTable.isBatchTable =>
    +        createBatchTableSink(externalCatalogTable, javaMap, statistics)
    +
    +      // check for a stream table sink in this stream environment
    +      case _: StreamTableEnvironment if externalCatalogTable.isStreamTable =>
    +        createStreamTableSink(externalCatalogTable, javaMap, statistics)
    +
    +      case _ =>
    +        throw new ValidationException(
    +          "External catalog table does not support the current environment for a table sink.")
    +    }
    +
    +    new TableSourceSinkTable[T1, T2](source, sink)
    +  }
    +
    +  private def createBatchTableSource[T](
    +      externalCatalogTable: ExternalCatalogTable,
    +      javaMap: util.Map[String, String],
    +      statistics: FlinkStatistic)
    +    : Option[TableSourceTable[T]] = if (externalCatalogTable.isTableSource) {
    --- End diff --
    
    Is that a good enough reason to force us both to write code that we do not like? Apparently we are not the only ones who don't like it: https://stackoverflow.com/a/33425307/8149051

    After reading your link I get why you shouldn't use `return` in lambda functions, but he doesn't make a point against using it in methods. At least I do not see one.

    However, if you are not sure, at least reverse the if/else branches. The simpler branch should always go first (roughly as sketched below).
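
    For illustration, the reordering could look like this (a sketch only, not the PR's code):

        // illustrative Scala: the simple branch comes first;
        // ??? stands for the factory-discovery logic that the PR performs in the else branch
        private def createBatchTableSource[T](
            externalCatalogTable: ExternalCatalogTable,
            javaMap: java.util.Map[String, String],
            statistics: FlinkStatistic): Option[TableSourceTable[T]] =
          if (!externalCatalogTable.isTableSource) {
            None
          } else {
            ??? // look up the factory and wrap the created source, as in the PR
          }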


---

[GitHub] flink pull request #6343: [FLINK-9852] [table] Expose descriptor-based sink ...

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6343#discussion_r202980228
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalTableUtil.scala ---
    @@ -0,0 +1,148 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.catalog
    +
    +import java.util
    +
    +import org.apache.flink.table.api._
    +import org.apache.flink.table.descriptors.DescriptorProperties
    +import org.apache.flink.table.factories._
    +import org.apache.flink.table.plan.schema._
    +import org.apache.flink.table.plan.stats.FlinkStatistic
    +import org.apache.flink.table.util.Logging
    +
    +
    +/**
    +  * The utility class is used to convert [[ExternalCatalogTable]] to [[TableSourceSinkTable]].
    +  *
    +  * It uses [[TableFactoryService]] for discovering.
    +  */
    +object ExternalTableUtil extends Logging {
    +
    +  /**
    +    * Converts an [[ExternalCatalogTable]] instance to a [[TableSourceTable]] instance
    +    *
    +    * @param externalCatalogTable the [[ExternalCatalogTable]] instance which to convert
    +    * @return converted [[TableSourceTable]] instance from the input catalog table
    +    */
    +  def fromExternalCatalogTable[T1, T2](
    +      tableEnv: TableEnvironment,
    +      externalCatalogTable: ExternalCatalogTable)
    +    : TableSourceSinkTable[T1, T2] = {
    +
    +    val properties = new DescriptorProperties()
    +    externalCatalogTable.addProperties(properties)
    +    val javaMap = properties.asMap
    +    val statistics = new FlinkStatistic(externalCatalogTable.getTableStats)
    +
    +    val source: Option[TableSourceTable[T1]] = tableEnv match {
    +      // check for a batch table source in this batch environment
    +      case _: BatchTableEnvironment if externalCatalogTable.isBatchTable =>
    +        createBatchTableSource(externalCatalogTable, javaMap, statistics)
    +
    +      // check for a stream table source in this stream environment
    +      case _: StreamTableEnvironment if externalCatalogTable.isStreamTable =>
    +        createStreamTableSource(externalCatalogTable, javaMap, statistics)
    +
    +      case _ =>
    +        throw new ValidationException(
    +          "External catalog table does not support the current environment for a table source.")
    +    }
    +
    +    val sink: Option[TableSinkTable[T2]] = tableEnv match {
    +      // check for a batch table sink in this batch environment
    +      case _: BatchTableEnvironment if externalCatalogTable.isBatchTable =>
    +        createBatchTableSink(externalCatalogTable, javaMap, statistics)
    +
    +      // check for a stream table sink in this stream environment
    +      case _: StreamTableEnvironment if externalCatalogTable.isStreamTable =>
    +        createStreamTableSink(externalCatalogTable, javaMap, statistics)
    +
    +      case _ =>
    +        throw new ValidationException(
    +          "External catalog table does not support the current environment for a table sink.")
    +    }
    +
    +    new TableSourceSinkTable[T1, T2](source, sink)
    +  }
    +
    +  private def createBatchTableSource[T](
    +      externalCatalogTable: ExternalCatalogTable,
    +      javaMap: util.Map[String, String],
    +      statistics: FlinkStatistic)
    +    : Option[TableSourceTable[T]] = if (externalCatalogTable.isTableSource) {
    --- End diff --
    
    I also like this pattern more but it is not very Scala-like:
    https://tpolecat.github.io/2014/05/09/return.html


---

[GitHub] flink pull request #6343: [FLINK-9852] [table] Expose descriptor-based sink ...

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6343#discussion_r202989785
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/StreamTableDescriptor.scala ---
    @@ -0,0 +1,182 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.descriptors
    +
    +import org.apache.flink.table.api.{StreamTableEnvironment, ValidationException}
    +import org.apache.flink.table.descriptors.StreamTableDescriptorValidator._
    +import org.apache.flink.table.factories.{StreamTableSinkFactory, StreamTableSourceFactory, TableFactoryService}
    +
    +/**
    +  * Descriptor for specifying a table source and/or sink in a streaming environment.
    +  */
    +class StreamTableDescriptor(
    --- End diff --
    
    I don't see an alternative if we don't want to have an ugly API.


---

[GitHub] flink pull request #6343: [FLINK-9852] [table] Expose descriptor-based sink ...

Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6343#discussion_r202965060
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/BatchTableDescriptor.scala ---
    @@ -0,0 +1,126 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.descriptors
    +
    +import org.apache.flink.table.api.{BatchTableEnvironment, ValidationException}
    +import org.apache.flink.table.factories.{BatchTableSinkFactory, BatchTableSourceFactory, TableFactoryService}
    +
    +/**
    +  * Descriptor for specifying a table source and/or sink in a batch environment.
    +  */
    +class BatchTableDescriptor(
    +    private val tableEnv: BatchTableEnvironment,
    +    private val connectorDescriptor: ConnectorDescriptor)
    +  extends TableDescriptor
    +  with SchematicDescriptor
    +  with RegistrableDescriptor {
    +
    +  private var formatDescriptor: Option[FormatDescriptor] = None
    +  private var schemaDescriptor: Option[Schema] = None
    +
    +  /**
    +    * Internal method for properties conversion.
    +    */
    +  override private[flink] def addProperties(properties: DescriptorProperties): Unit = {
    +    connectorDescriptor.addProperties(properties)
    +    formatDescriptor.foreach(_.addProperties(properties))
    +    schemaDescriptor.foreach(_.addProperties(properties))
    +  }
    +
    +  /**
    +    * Searches for the specified table source, configures it accordingly, and registers it as
    +    * a table under the given name.
    +    *
    +    * @param name table name to be registered in the table environment
    +    */
    +  override def registerTableSource(name: String): Unit = {
    +    val javaMap = getValidProperties.asMap
    +    val tableSource = TableFactoryService
    +      .find(classOf[BatchTableSourceFactory[_]], javaMap)
    +      .createBatchTableSource(javaMap)
    +    tableEnv.registerTableSource(name, tableSource)
    +  }
    +
    +  /**
    +    * Searches for the specified table sink, configures it accordingly, and registers it as
    +    * a table under the given name.
    +    *
    +    * @param name table name to be registered in the table environment
    +    */
    +  override def registerTableSink(name: String): Unit = {
    +    val javaMap = getValidProperties.asMap
    +    val tableSink = TableFactoryService
    +      .find(classOf[BatchTableSinkFactory[_]], javaMap)
    +      .createBatchTableSink(javaMap)
    +    tableEnv.registerTableSink(name, tableSink)
    +  }
    +
    +  /**
    +    * Searches for the specified table source and sink, configures them accordingly, and registers
    +    * them as a table under the given name.
    +    *
    +    * @param name table name to be registered in the table environment
    +    */
    +  override def registerTableSourceAndSink(name: String): Unit = {
    +    registerTableSource(name)
    +    registerTableSink(name)
    +  }
    +
    +  /**
    +    * Specifies the format that defines how to read data from a connector.
    +    */
    +  override def withFormat(format: FormatDescriptor): BatchTableDescriptor = {
    +    formatDescriptor = Some(format)
    +    this
    +  }
    +
    +  /**
    +    * Specifies the resulting table schema.
    +    */
    +  override def withSchema(schema: Schema): BatchTableDescriptor = {
    +    schemaDescriptor = Some(schema)
    +    this
    +  }
    +
    +  override def toString: String = {
    +    getValidProperties.toString
    +  }
    +
    +  // ----------------------------------------------------------------------------------------------
    +
    +  private def getValidProperties: DescriptorProperties = {
    --- End diff --
    
    duplicated code with `StreamTableDescriptor.getValidProperties`
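
    For illustration, the shared validation could move into a common trait (a sketch with assumed names, not code from the PR):

        // illustrative Scala; assumes it lives next to the descriptors so that the existing
        // ConnectorDescriptor, FormatDescriptor, and ValidationException types are in scope
        trait FormatValidation {
          def connectorDescriptor: ConnectorDescriptor
          def formatDescriptor: Option[FormatDescriptor]

          protected def validateFormat(): Unit = {
            if (connectorDescriptor.needsFormat() && formatDescriptor.isEmpty) {
              throw new ValidationException(
                s"The connector '$connectorDescriptor' requires a format description.")
            }
            if (!connectorDescriptor.needsFormat() && formatDescriptor.isDefined) {
              throw new ValidationException(
                s"The connector '$connectorDescriptor' does not require a format description " +
                  s"but '${formatDescriptor.get}' was found.")
            }
          }
        }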


---

[GitHub] flink pull request #6343: [FLINK-9852] [table] Expose descriptor-based sink ...

Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6343#discussion_r202964952
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/StreamTableDescriptor.scala ---
    @@ -0,0 +1,182 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.descriptors
    +
    +import org.apache.flink.table.api.{StreamTableEnvironment, ValidationException}
    +import org.apache.flink.table.descriptors.StreamTableDescriptorValidator._
    +import org.apache.flink.table.factories.{StreamTableSinkFactory, StreamTableSourceFactory, TableFactoryService}
    +
    +/**
    +  * Descriptor for specifying a table source and/or sink in a streaming environment.
    +  */
    +class StreamTableDescriptor(
    +    private val tableEnv: StreamTableEnvironment,
    +    private val connectorDescriptor: ConnectorDescriptor)
    +  extends TableDescriptor
    +  with SchematicDescriptor
    +  with RegistrableDescriptor
    +  with StreamableDescriptor {
    +
    +  private var formatDescriptor: Option[FormatDescriptor] = None
    +  private var schemaDescriptor: Option[Schema] = None
    +  private var updateMode: Option[String] = None
    +
    +  /**
    +    * Internal method for properties conversion.
    +    */
    +  override private[flink] def addProperties(properties: DescriptorProperties): Unit = {
    +    connectorDescriptor.addProperties(properties)
    +    formatDescriptor.foreach(_.addProperties(properties))
    +    schemaDescriptor.foreach(_.addProperties(properties))
    +    updateMode.foreach(mode => properties.putString(UPDATE_MODE, mode))
    +  }
    +
    +  /**
    +    * Searches for the specified table source, configures it accordingly, and registers it as
    +    * a table under the given name.
    +    *
    +    * @param name table name to be registered in the table environment
    +    */
    +  override def registerTableSource(name: String): Unit = {
    +    val javaMap = getValidProperties.asMap
    +    val tableSource = TableFactoryService
    +      .find(classOf[StreamTableSourceFactory[_]], javaMap)
    +      .createStreamTableSource(javaMap)
    +    tableEnv.registerTableSource(name, tableSource)
    +  }
    +
    +  /**
    +    * Searches for the specified table sink, configures it accordingly, and registers it as
    +    * a table under the given name.
    +    *
    +    * @param name table name to be registered in the table environment
    +    */
    +  override def registerTableSink(name: String): Unit = {
    +    val javaMap = getValidProperties.asMap
    +    val tableSink = TableFactoryService
    +      .find(classOf[StreamTableSinkFactory[_]], javaMap)
    +      .createStreamTableSink(javaMap)
    +    tableEnv.registerTableSink(name, tableSink)
    +  }
    +
    +  /**
    +    * Searches for the specified table source and sink, configures them accordingly, and registers
    +    * them as a table under the given name.
    +    *
    +    * @param name table name to be registered in the table environment
    +    */
    +  override def registerTableSourceAndSink(name: String): Unit = {
    +    registerTableSource(name)
    +    registerTableSink(name)
    +  }
    +
    +  /**
    +    * Specifies the format that defines how to read data from a connector.
    +    */
    +  override def withFormat(format: FormatDescriptor): StreamTableDescriptor = {
    +    formatDescriptor = Some(format)
    +    this
    +  }
    +
    +  /**
    +    * Specifies the resulting table schema.
    +    */
    +  override def withSchema(schema: Schema): StreamTableDescriptor = {
    +    schemaDescriptor = Some(schema)
    +    this
    +  }
    +
    +  /**
    +    * Declares how to perform the conversion between a dynamic table and an external connector.
    +    *
    +    * In append mode, a dynamic table and an external connector only exchange INSERT messages.
    +    *
    +    * @see See also [[inRetractMode()]] and [[inUpsertMode()]].
    +    */
    +  override def inAppendMode(): StreamTableDescriptor = {
    +    updateMode = Some(UPDATE_MODE_VALUE_APPEND)
    +    this
    +  }
    +
    +  /**
    +    * Declares how to perform the conversion between a dynamic table and an external connector.
    +    *
    +    * In retract mode, a dynamic table and an external connector exchange ADD and RETRACT messages.
    +    *
    +    * An INSERT change is encoded as an ADD message, a DELETE change as a RETRACT message, and an
    +    * UPDATE change as a RETRACT message for the updated (previous) row and an ADD message for
    +    * the updating (new) row.
    +    *
    +    * In this mode, a key must not be defined as opposed to upsert mode. However, every update
    +    * consists of two messages which is less efficient.
    +    *
    +    * @see See also [[inAppendMode()]] and [[inUpsertMode()]].
    +    */
    +  override def inRetractMode(): StreamTableDescriptor = {
    +    updateMode = Some(UPDATE_MODE_VALUE_RETRACT)
    +    this
    +  }
    +
    +  /**
    +    * Declares how to perform the conversion between a dynamic table and an external connector.
    +    *
    +    * In upsert mode, a dynamic table and an external connector exchange UPSERT and DELETE messages.
    +    *
    +    * This mode requires a (possibly composite) unique key by which updates can be propagated. The
    +    * external connector needs to be aware of the unique key attribute in order to apply messages
    +    * correctly. INSERT and UPDATE changes are encoded as UPSERT messages. DELETE changes as
    +    * DELETE messages.
    +    *
    +    * The main difference to a retract stream is that UPDATE changes are encoded with a single
    +    * message and are therefore more efficient.
    +    *
    +    * @see See also [[inAppendMode()]] and [[inRetractMode()]].
    +    */
    +  override def inUpsertMode(): StreamTableDescriptor = {
    +    updateMode = Some(UPDATE_MODE_VALUE_UPSERT)
    +    this
    +  }
    +
    +  override def toString: String = {
    +    getValidProperties.toString
    +  }
    +
    +  // ----------------------------------------------------------------------------------------------
    +
    +  private def getValidProperties: DescriptorProperties = {
    +    val properties = new DescriptorProperties()
    +    addProperties(properties)
    +
    +    // check for a format
    +    if (connectorDescriptor.needsFormat() && formatDescriptor.isEmpty) {
    +      throw new ValidationException(
    +        s"The connector '$connectorDescriptor' requires a format description.")
    +    } else if (!connectorDescriptor.needsFormat() && formatDescriptor.isDefined) {
    +      throw new ValidationException(
    +        s"The connector '$connectorDescriptor' does not require a format description " +
    +          s"but '${formatDescriptor.get}' found.")
    +    }
    +
    +    // basic validation
    --- End diff --
    
    drop the comment


---

[GitHub] flink pull request #6343: [FLINK-9852] [table] Expose descriptor-based sink ...

Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6343#discussion_r203301509
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogTable.scala ---
    @@ -18,33 +18,299 @@
     
     package org.apache.flink.table.catalog
     
    +import org.apache.flink.table.descriptors.DescriptorProperties.toScala
    +import org.apache.flink.table.descriptors.StatisticsValidator.{STATISTICS_COLUMNS, STATISTICS_ROW_COUNT, readColumnStats}
    +import org.apache.flink.table.descriptors.StreamTableDescriptorValidator.{UPDATE_MODE, UPDATE_MODE_VALUE_APPEND, UPDATE_MODE_VALUE_RETRACT, UPDATE_MODE_VALUE_UPSERT}
     import org.apache.flink.table.descriptors._
     import org.apache.flink.table.plan.stats.TableStats
     
    +import scala.collection.JavaConverters._
    +
     /**
    -  * Defines a table in an [[ExternalCatalog]].
    +  * Defines a table in an [[ExternalCatalog]]. External catalog tables describe table sources
    +  * and/or sinks for both batch and stream environments.
    +  *
    +  * The catalog table takes descriptors which allow for declaring the communication to external
    +  * systems in an implementation-agnostic way. The classpath is scanned for suitable table factories
    +  * that match the desired configuration.
    +  *
    +  * Use the provided builder methods to configure the external catalog table accordingly.
    +  *
    +  * The following example shows how to read from a connector using a JSON format and
    +  * declaring it as a table source:
       *
    -  * @param connectorDesc describes the system to connect to
    -  * @param formatDesc describes the data format of a connector
    -  * @param schemaDesc describes the schema of the result table
    -  * @param statisticsDesc describes the estimated statistics of the result table
    -  * @param metadataDesc describes additional metadata of a table
    +  * {{{
    +  *   ExternalCatalogTable(
    +  *     new ExternalSystemXYZ()
    +  *       .version("0.11"))
    +  *   .withFormat(
    +  *     new Json()
    +  *       .jsonSchema("{...}")
    +  *       .failOnMissingField(false))
    +  *   .withSchema(
    +  *     new Schema()
    +  *       .field("user-name", "VARCHAR").from("u_name")
    +  *       .field("count", "DECIMAL")
    +  *   .asTableSource()
    +  * }}}
    +  *
    +  * Note: For backwards-compatibility, the table is declared as a table source for batch and
    +  * streaming environment by default.
    +  *
    +  * See also [[org.apache.flink.table.factories.TableFactory]] for more information about how
    +  * to target suitable factories.
    +  *
    +  * @param connectorDescriptor describes the system to connect to
       */
    -class ExternalCatalogTable(
    -    connectorDesc: ConnectorDescriptor,
    -    formatDesc: Option[FormatDescriptor],
    -    schemaDesc: Option[Schema],
    -    statisticsDesc: Option[Statistics],
    -    metadataDesc: Option[Metadata])
    -  extends TableSourceDescriptor {
    -
    -  this.connectorDescriptor = Some(connectorDesc)
    -  this.formatDescriptor = formatDesc
    -  this.schemaDescriptor = schemaDesc
    -  this.statisticsDescriptor = statisticsDesc
    -  this.metaDescriptor = metadataDesc
    -
    -  // expose statistics for external table source util
    -  override def getTableStats: Option[TableStats] = super.getTableStats
    +class ExternalCatalogTable(val connectorDescriptor: ConnectorDescriptor)
    +  extends TableDescriptor
    --- End diff --
    
    is it consistent (or at least mostly consistent) with our code base? 


---

[GitHub] flink issue #6343: [FLINK-9852] [table] Expose descriptor-based sink creatio...

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on the issue:

    https://github.com/apache/flink/pull/6343
  
    Thank you for your suggestions @pnowojski. I hope I could address most of them in the new commits. I reduced the code duplication a bit, and `ExternalCatalogTable` is now immutable with a proper builder.


---

[GitHub] flink pull request #6343: [FLINK-9852] [table] Expose descriptor-based sink ...

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6343#discussion_r203349277
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogTable.scala ---
    @@ -18,33 +18,299 @@
     
     package org.apache.flink.table.catalog
     
    +import org.apache.flink.table.descriptors.DescriptorProperties.toScala
    +import org.apache.flink.table.descriptors.StatisticsValidator.{STATISTICS_COLUMNS, STATISTICS_ROW_COUNT, readColumnStats}
    +import org.apache.flink.table.descriptors.StreamTableDescriptorValidator.{UPDATE_MODE, UPDATE_MODE_VALUE_APPEND, UPDATE_MODE_VALUE_RETRACT, UPDATE_MODE_VALUE_UPSERT}
     import org.apache.flink.table.descriptors._
     import org.apache.flink.table.plan.stats.TableStats
     
    +import scala.collection.JavaConverters._
    +
     /**
    -  * Defines a table in an [[ExternalCatalog]].
    +  * Defines a table in an [[ExternalCatalog]]. External catalog tables describe table sources
    +  * and/or sinks for both batch and stream environments.
    +  *
    +  * The catalog table takes descriptors which allow for declaring the communication to external
    +  * systems in an implementation-agnostic way. The classpath is scanned for suitable table factories
    +  * that match the desired configuration.
    +  *
    +  * Use the provided builder methods to configure the external catalog table accordingly.
    +  *
    +  * The following example shows how to read from a connector using a JSON format and
    +  * declaring it as a table source:
       *
    -  * @param connectorDesc describes the system to connect to
    -  * @param formatDesc describes the data format of a connector
    -  * @param schemaDesc describes the schema of the result table
    -  * @param statisticsDesc describes the estimated statistics of the result table
    -  * @param metadataDesc describes additional metadata of a table
    +  * {{{
    +  *   ExternalCatalogTable(
    +  *     new ExternalSystemXYZ()
    +  *       .version("0.11"))
    +  *   .withFormat(
    +  *     new Json()
    +  *       .jsonSchema("{...}")
    +  *       .failOnMissingField(false))
    +  *   .withSchema(
    +  *     new Schema()
    +  *       .field("user-name", "VARCHAR").from("u_name")
    +  *       .field("count", "DECIMAL")
    +  *   .asTableSource()
    +  * }}}
    +  *
    +  * Note: For backwards-compatibility, the table is declared as a table source for batch and
    +  * streaming environment by default.
    +  *
    +  * See also [[org.apache.flink.table.factories.TableFactory]] for more information about how
    +  * to target suitable factories.
    +  *
    +  * @param connectorDescriptor describes the system to connect to
       */
    -class ExternalCatalogTable(
    -    connectorDesc: ConnectorDescriptor,
    -    formatDesc: Option[FormatDescriptor],
    -    schemaDesc: Option[Schema],
    -    statisticsDesc: Option[Statistics],
    -    metadataDesc: Option[Metadata])
    -  extends TableSourceDescriptor {
    -
    -  this.connectorDescriptor = Some(connectorDesc)
    -  this.formatDescriptor = formatDesc
    -  this.schemaDescriptor = schemaDesc
    -  this.statisticsDescriptor = statisticsDesc
    -  this.metaDescriptor = metadataDesc
    -
    -  // expose statistics for external table source util
    -  override def getTableStats: Option[TableStats] = super.getTableStats
    +class ExternalCatalogTable(val connectorDescriptor: ConnectorDescriptor)
    +  extends TableDescriptor
    --- End diff --
    
    This is the code style that we should all comply with.


---

[GitHub] flink pull request #6343: [FLINK-9852] [table] Expose descriptor-based sink ...

Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6343#discussion_r203301285
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogTable.scala ---
    @@ -18,33 +18,299 @@
     
     package org.apache.flink.table.catalog
     
    +import org.apache.flink.table.descriptors.DescriptorProperties.toScala
    +import org.apache.flink.table.descriptors.StatisticsValidator.{STATISTICS_COLUMNS, STATISTICS_ROW_COUNT, readColumnStats}
    +import org.apache.flink.table.descriptors.StreamTableDescriptorValidator.{UPDATE_MODE, UPDATE_MODE_VALUE_APPEND, UPDATE_MODE_VALUE_RETRACT, UPDATE_MODE_VALUE_UPSERT}
     import org.apache.flink.table.descriptors._
     import org.apache.flink.table.plan.stats.TableStats
     
    +import scala.collection.JavaConverters._
    +
     /**
    -  * Defines a table in an [[ExternalCatalog]].
    +  * Defines a table in an [[ExternalCatalog]]. External catalog tables describe table sources
    +  * and/or sinks for both batch and stream environments.
    +  *
    +  * The catalog table takes descriptors which allow for declaring the communication to external
    +  * systems in an implementation-agnostic way. The classpath is scanned for suitable table factories
    +  * that match the desired configuration.
    +  *
    +  * Use the provided builder methods to configure the external catalog table accordingly.
    +  *
    +  * The following example shows how to read from a connector using a JSON format and
    +  * declaring it as a table source:
       *
    -  * @param connectorDesc describes the system to connect to
    -  * @param formatDesc describes the data format of a connector
    -  * @param schemaDesc describes the schema of the result table
    -  * @param statisticsDesc describes the estimated statistics of the result table
    -  * @param metadataDesc describes additional metadata of a table
    +  * {{{
    +  *   ExternalCatalogTable(
    +  *     new ExternalSystemXYZ()
    +  *       .version("0.11"))
    +  *   .withFormat(
    +  *     new Json()
    +  *       .jsonSchema("{...}")
    +  *       .failOnMissingField(false))
    +  *   .withSchema(
    +  *     new Schema()
    +  *       .field("user-name", "VARCHAR").from("u_name")
    +  *       .field("count", "DECIMAL")
    +  *   .asTableSource()
    --- End diff --
    
    Building could be the time/place to validate the builder state, plus it provides an immutable built class with final fields (which is a huge benefit on its own that simplifies reasoning about the code).
    
    https://stackoverflow.com/a/5652870/8149051
    https://www.linkedin.com/pulse/20140528113353-16837833-6-benefits-of-programming-with-immutable-objects-in-java/
    > “Classes should be immutable unless there’s a very good reason to make them mutable. If a class cannot be made immutable, limit its mutability as much as possible.”
    
    Another issue/question: does it make sense to create an `ExternalCatalogTable` without a defined format/schema/statistics/metadata/updateMode? If not, fail early in the `build` method here. The `build` method could be `asTableSource/Sink` (roughly as sketched below).
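
    For illustration, a "true builder" could look roughly like this (a simplified sketch with a reduced constructor, not the PR's final API):

        // illustrative Scala: the built ExternalCatalogTable stays immutable and
        // validation happens exactly once, when asTableSource() is called
        class ExternalCatalogTableBuilder(connector: ConnectorDescriptor) {
          private var format: Option[FormatDescriptor] = None
          private var schema: Option[Schema] = None

          def withFormat(f: FormatDescriptor): ExternalCatalogTableBuilder = { format = Some(f); this }
          def withSchema(s: Schema): ExternalCatalogTableBuilder = { schema = Some(s); this }

          def asTableSource(): ExternalCatalogTable = {
            if (connector.needsFormat() && format.isEmpty) {
              throw new ValidationException(s"The connector '$connector' requires a format description.")
            }
            new ExternalCatalogTable(connector) // simplified: format/schema handling omitted
          }
        }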


---

[GitHub] flink pull request #6343: [FLINK-9852] [table] Expose descriptor-based sink ...

Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6343#discussion_r202965391
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/StreamTableDescriptor.scala ---
    @@ -0,0 +1,182 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.descriptors
    +
    +import org.apache.flink.table.api.{StreamTableEnvironment, ValidationException}
    +import org.apache.flink.table.descriptors.StreamTableDescriptorValidator._
    +import org.apache.flink.table.factories.{StreamTableSinkFactory, StreamTableSourceFactory, TableFactoryService}
    +
    +/**
    +  * Descriptor for specifying a table source and/or sink in a streaming environment.
    +  */
    +class StreamTableDescriptor(
    --- End diff --
    
    This class duplicates code from `BatchTableDescriptor`.


---

[GitHub] flink pull request #6343: [FLINK-9852] [table] Expose descriptor-based sink ...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/6343


---

[GitHub] flink pull request #6343: [FLINK-9852] [table] Expose descriptor-based sink ...

Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6343#discussion_r202963234
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalTableUtil.scala ---
    @@ -0,0 +1,148 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.catalog
    +
    +import java.util
    +
    +import org.apache.flink.table.api._
    +import org.apache.flink.table.descriptors.DescriptorProperties
    +import org.apache.flink.table.factories._
    +import org.apache.flink.table.plan.schema._
    +import org.apache.flink.table.plan.stats.FlinkStatistic
    +import org.apache.flink.table.util.Logging
    +
    +
    +/**
    +  * The utility class is used to convert [[ExternalCatalogTable]] to [[TableSourceSinkTable]].
    +  *
    +  * It uses [[TableFactoryService]] for discovering.
    +  */
    +object ExternalTableUtil extends Logging {
    +
    +  /**
    +    * Converts an [[ExternalCatalogTable]] instance to a [[TableSourceTable]] instance
    +    *
    +    * @param externalCatalogTable the [[ExternalCatalogTable]] instance which to convert
    +    * @return converted [[TableSourceTable]] instance from the input catalog table
    +    */
    +  def fromExternalCatalogTable[T1, T2](
    +      tableEnv: TableEnvironment,
    +      externalCatalogTable: ExternalCatalogTable)
    +    : TableSourceSinkTable[T1, T2] = {
    +
    +    val properties = new DescriptorProperties()
    +    externalCatalogTable.addProperties(properties)
    +    val javaMap = properties.asMap
    +    val statistics = new FlinkStatistic(externalCatalogTable.getTableStats)
    +
    +    val source: Option[TableSourceTable[T1]] = tableEnv match {
    +      // check for a batch table source in this batch environment
    +      case _: BatchTableEnvironment if externalCatalogTable.isBatchTable =>
    +        createBatchTableSource(externalCatalogTable, javaMap, statistics)
    +
    +      // check for a stream table source in this stream environment
    +      case _: StreamTableEnvironment if externalCatalogTable.isStreamTable =>
    +        createStreamTableSource(externalCatalogTable, javaMap, statistics)
    +
    +      case _ =>
    +        throw new ValidationException(
    +          "External catalog table does not support the current environment for a table source.")
    +    }
    +
    +    val sink: Option[TableSinkTable[T2]] = tableEnv match {
    +      // check for a batch table sink in this batch environment
    +      case _: BatchTableEnvironment if externalCatalogTable.isBatchTable =>
    +        createBatchTableSink(externalCatalogTable, javaMap, statistics)
    +
    +      // check for a stream table sink in this stream environment
    +      case _: StreamTableEnvironment if externalCatalogTable.isStreamTable =>
    +        createStreamTableSink(externalCatalogTable, javaMap, statistics)
    +
    +      case _ =>
    +        throw new ValidationException(
    +          "External catalog table does not support the current environment for a table sink.")
    +    }
    +
    +    new TableSourceSinkTable[T1, T2](source, sink)
    +  }
    +
    +  private def createBatchTableSource[T](
    +      externalCatalogTable: ExternalCatalogTable,
    +      javaMap: util.Map[String, String],
    +      statistics: FlinkStatistic)
    +    : Option[TableSourceTable[T]] = if (externalCatalogTable.isTableSource) {
    --- End diff --
    
    Reverse the if/else branches; the simpler case should come first.
    
    Also, if you change it to
    ```
    if (!externalCatalogTable.isTableSource) {
      return None
    }
    val source = TableFactoryService
      .find(classOf[BatchTableSourceFactory[T]], javaMap)
      .createBatchTableSource(javaMap)
    val table = new BatchTableSourceTable(
      source,
      statistics)
    Some(table)
    ```
    it would simplify the code even further (the reader wouldn't have to track one extra level of nesting).
    
    Ditto in other places.
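    
    For reference, the complete method with the early return, keeping the signature from the diff above (just a sketch of the suggested shape, not the merged code):
    ```
    private def createBatchTableSource[T](
        externalCatalogTable: ExternalCatalogTable,
        javaMap: util.Map[String, String],
        statistics: FlinkStatistic)
      : Option[TableSourceTable[T]] = {
    
      // handle the simple case first: nothing to do if the table is not a source
      if (!externalCatalogTable.isTableSource) {
        return None
      }
      // discover a matching factory on the classpath and create the source
      val source = TableFactoryService
        .find(classOf[BatchTableSourceFactory[T]], javaMap)
        .createBatchTableSource(javaMap)
      // wrap the source together with the statistics for the planner
      Some(new BatchTableSourceTable(source, statistics))
    }
    ```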


---

[GitHub] flink pull request #6343: [FLINK-9852] [table] Expose descriptor-based sink ...

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6343#discussion_r202982848
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/StreamTableDescriptor.scala ---
    @@ -0,0 +1,182 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.descriptors
    +
    +import org.apache.flink.table.api.{StreamTableEnvironment, ValidationException}
    +import org.apache.flink.table.descriptors.StreamTableDescriptorValidator._
    +import org.apache.flink.table.factories.{StreamTableSinkFactory, StreamTableSourceFactory, TableFactoryService}
    +
    +/**
    +  * Descriptor for specifying a table source and/or sink in a streaming environment.
    +  */
    +class StreamTableDescriptor(
    +    private val tableEnv: StreamTableEnvironment,
    +    private val connectorDescriptor: ConnectorDescriptor)
    +  extends TableDescriptor
    +  with SchematicDescriptor
    +  with RegistrableDescriptor
    +  with StreamableDescriptor {
    +
    +  private var formatDescriptor: Option[FormatDescriptor] = None
    +  private var schemaDescriptor: Option[Schema] = None
    +  private var updateMode: Option[String] = None
    +
    +  /**
    +    * Internal method for properties conversion.
    +    */
    +  override private[flink] def addProperties(properties: DescriptorProperties): Unit = {
    +    connectorDescriptor.addProperties(properties)
    +    formatDescriptor.foreach(_.addProperties(properties))
    +    schemaDescriptor.foreach(_.addProperties(properties))
    +    updateMode.foreach(mode => properties.putString(UPDATE_MODE, mode))
    +  }
    +
    +  /**
    +    * Searches for the specified table source, configures it accordingly, and registers it as
    +    * a table under the given name.
    +    *
    +    * @param name table name to be registered in the table environment
    +    */
    +  override def registerTableSource(name: String): Unit = {
    +    val javaMap = getValidProperties.asMap
    +    val tableSource = TableFactoryService
    +      .find(classOf[StreamTableSourceFactory[_]], javaMap)
    +      .createStreamTableSource(javaMap)
    +    tableEnv.registerTableSource(name, tableSource)
    +  }
    +
    +  /**
    +    * Searches for the specified table sink, configures it accordingly, and registers it as
    +    * a table under the given name.
    +    *
    +    * @param name table name to be registered in the table environment
    +    */
    +  override def registerTableSink(name: String): Unit = {
    +    val javaMap = getValidProperties.asMap
    +    val tableSink = TableFactoryService
    +      .find(classOf[StreamTableSinkFactory[_]], javaMap)
    +      .createStreamTableSink(javaMap)
    +    tableEnv.registerTableSink(name, tableSink)
    +  }
    +
    +  /**
    +    * Searches for the specified table source and sink, configures them accordingly, and registers
    +    * them as a table under the given name.
    +    *
    +    * @param name table name to be registered in the table environment
    +    */
    +  override def registerTableSourceAndSink(name: String): Unit = {
    +    registerTableSource(name)
    +    registerTableSink(name)
    +  }
    +
    +  /**
    +    * Specifies the format that defines how to read data from a connector.
    +    */
    +  override def withFormat(format: FormatDescriptor): StreamTableDescriptor = {
    +    formatDescriptor = Some(format)
    +    this
    +  }
    +
    +  /**
    +    * Specifies the resulting table schema.
    +    */
    +  override def withSchema(schema: Schema): StreamTableDescriptor = {
    +    schemaDescriptor = Some(schema)
    +    this
    +  }
    +
    +  /**
    +    * Declares how to perform the conversion between a dynamic table and an external connector.
    +    *
    +    * In append mode, a dynamic table and an external connector only exchange INSERT messages.
    +    *
    +    * @see See also [[inRetractMode()]] and [[inUpsertMode()]].
    +    */
    +  override def inAppendMode(): StreamTableDescriptor = {
    +    updateMode = Some(UPDATE_MODE_VALUE_APPEND)
    +    this
    +  }
    +
    +  /**
    +    * Declares how to perform the conversion between a dynamic table and an external connector.
    +    *
    +    * In retract mode, a dynamic table and an external connector exchange ADD and RETRACT messages.
    +    *
    +    * An INSERT change is encoded as an ADD message, a DELETE change as a RETRACT message, and an
    +    * UPDATE change as a RETRACT message for the updated (previous) row and an ADD message for
    +    * the updating (new) row.
    +    *
    +    * In this mode, a key must not be defined as opposed to upsert mode. However, every update
    +    * consists of two messages which is less efficient.
    +    *
    +    * @see See also [[inAppendMode()]] and [[inUpsertMode()]].
    +    */
    +  override def inRetractMode(): StreamTableDescriptor = {
    +    updateMode = Some(UPDATE_MODE_VALUE_RETRACT)
    +    this
    +  }
    +
    +  /**
    +    * Declares how to perform the conversion between a dynamic table and an external connector.
    +    *
    +    * In upsert mode, a dynamic table and an external connector exchange UPSERT and DELETE messages.
    +    *
    +    * This mode requires a (possibly composite) unique key by which updates can be propagated. The
    +    * external connector needs to be aware of the unique key attribute in order to apply messages
    +    * correctly. INSERT and UPDATE changes are encoded as UPSERT messages. DELETE changes as
    +    * DELETE messages.
    +    *
    +    * The main difference to a retract stream is that UPDATE changes are encoded with a single
    +    * message and are therefore more efficient.
    +    *
    +    * @see See also [[inAppendMode()]] and [[inRetractMode()]].
    +    */
    +  override def inUpsertMode(): StreamTableDescriptor = {
    +    updateMode = Some(UPDATE_MODE_VALUE_UPSERT)
    +    this
    +  }
    +
    +  override def toString: String = {
    +    getValidProperties.toString
    +  }
    +
    +  // ----------------------------------------------------------------------------------------------
    +
    +  private def getValidProperties: DescriptorProperties = {
    +    val properties = new DescriptorProperties()
    +    addProperties(properties)
    +
    +    // check for a format
    +    if (connectorDescriptor.needsFormat() && formatDescriptor.isEmpty) {
    +      throw new ValidationException(
    +        s"The connector '$connectorDescriptor' requires a format description.")
    +    } else if (!connectorDescriptor.needsFormat() && formatDescriptor.isDefined) {
    +      throw new ValidationException(
    +        s"The connector '$connectorDescriptor' does not require a format description " +
    +          s"but '${formatDescriptor.get}' found.")
    +    }
    +
    +    // basic validation
    --- End diff --
    
    I extended the comment.


---

[GitHub] flink pull request #6343: [FLINK-9852] [table] Expose descriptor-based sink ...

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6343#discussion_r203348914
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogTable.scala ---
    @@ -18,33 +18,299 @@
     
     package org.apache.flink.table.catalog
     
    +import org.apache.flink.table.descriptors.DescriptorProperties.toScala
    +import org.apache.flink.table.descriptors.StatisticsValidator.{STATISTICS_COLUMNS, STATISTICS_ROW_COUNT, readColumnStats}
    +import org.apache.flink.table.descriptors.StreamTableDescriptorValidator.{UPDATE_MODE, UPDATE_MODE_VALUE_APPEND, UPDATE_MODE_VALUE_RETRACT, UPDATE_MODE_VALUE_UPSERT}
     import org.apache.flink.table.descriptors._
     import org.apache.flink.table.plan.stats.TableStats
     
    +import scala.collection.JavaConverters._
    +
     /**
    -  * Defines a table in an [[ExternalCatalog]].
    +  * Defines a table in an [[ExternalCatalog]]. External catalog tables describe table sources
    +  * and/or sinks for both batch and stream environments.
    +  *
    +  * The catalog table takes descriptors which allow for declaring the communication to external
    +  * systems in an implementation-agnostic way. The classpath is scanned for suitable table factories
    +  * that match the desired configuration.
    +  *
    +  * Use the provided builder methods to configure the external catalog table accordingly.
    +  *
    +  * The following example shows how to read from a connector using a JSON format and
    +  * declaring it as a table source:
       *
    -  * @param connectorDesc describes the system to connect to
    -  * @param formatDesc describes the data format of a connector
    -  * @param schemaDesc describes the schema of the result table
    -  * @param statisticsDesc describes the estimated statistics of the result table
    -  * @param metadataDesc describes additional metadata of a table
    +  * {{{
    +  *   ExternalCatalogTable(
    +  *     new ExternalSystemXYZ()
    +  *       .version("0.11"))
    +  *   .withFormat(
    +  *     new Json()
    +  *       .jsonSchema("{...}")
    +  *       .failOnMissingField(false))
    +  *   .withSchema(
    +  *     new Schema()
    +  *       .field("user-name", "VARCHAR").from("u_name")
    +  *       .field("count", "DECIMAL")
    +  *   .asTableSource()
    +  * }}}
    +  *
    +  * Note: For backwards-compatibility, the table is declared as a table source for batch and
    +  * streaming environment by default.
    +  *
    +  * See also [[org.apache.flink.table.factories.TableFactory]] for more information about how
    +  * to target suitable factories.
    +  *
    +  * @param connectorDescriptor describes the system to connect to
       */
    -class ExternalCatalogTable(
    -    connectorDesc: ConnectorDescriptor,
    -    formatDesc: Option[FormatDescriptor],
    -    schemaDesc: Option[Schema],
    -    statisticsDesc: Option[Statistics],
    -    metadataDesc: Option[Metadata])
    -  extends TableSourceDescriptor {
    -
    -  this.connectorDescriptor = Some(connectorDesc)
    -  this.formatDescriptor = formatDesc
    -  this.schemaDescriptor = schemaDesc
    -  this.statisticsDescriptor = statisticsDesc
    -  this.metaDescriptor = metadataDesc
    -
    -  // expose statistics for external table source util
    -  override def getTableStats: Option[TableStats] = super.getTableStats
    +class ExternalCatalogTable(val connectorDescriptor: ConnectorDescriptor)
    +  extends TableDescriptor
    --- End diff --
    
    I would say yes. See also `org.apache.flink.runtime.jobmanager.JobManager` as an example.


---

[GitHub] flink pull request #6343: [FLINK-9852] [table] Expose descriptor-based sink ...

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6343#discussion_r202990315
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/CsvAppendTableSinkFactory.scala ---
    @@ -0,0 +1,52 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.sinks
    +
    +import java.util
    +
    +import org.apache.flink.table.descriptors.SchemaValidator.{SCHEMA, SCHEMA_PROCTIME}
    +import org.apache.flink.table.descriptors.StreamTableDescriptorValidator.{UPDATE_MODE, UPDATE_MODE_VALUE_APPEND}
    +import org.apache.flink.table.factories.StreamTableSinkFactory
    +import org.apache.flink.types.Row
    +
    +/**
    +  * Factory base for creating configured instances of [[CsvTableSink]] in a stream environment.
    +  */
    +class CsvAppendTableSinkFactory
    --- End diff --
    
    Will put it into a separate commit while merging.


---

[GitHub] flink pull request #6343: [FLINK-9852] [table] Expose descriptor-based sink ...

Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6343#discussion_r203341010
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/StreamTableDescriptor.scala ---
    @@ -0,0 +1,182 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.descriptors
    +
    +import org.apache.flink.table.api.{StreamTableEnvironment, ValidationException}
    +import org.apache.flink.table.descriptors.StreamTableDescriptorValidator._
    +import org.apache.flink.table.factories.{StreamTableSinkFactory, StreamTableSourceFactory, TableFactoryService}
    +
    +/**
    +  * Descriptor for specifying a table source and/or sink in a streaming environment.
    +  */
    +class StreamTableDescriptor(
    --- End diff --
    
    Inherit:
    ```
    class StreamTableDescriptor(...) extends BatchTableDescriptor with StreamableDescriptor
    ```
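    
    A rough sketch of what that could look like, assuming the shared constructor (and the factory lookup inside the registration methods) were generalized so that the stream variant only adds the update-mode handling; this is a hypothetical shape, not the merged code, and only one of the three modes is shown:
    ```
    class StreamTableDescriptor(
        tableEnv: StreamTableEnvironment,
        connectorDescriptor: ConnectorDescriptor)
      extends BatchTableDescriptor(tableEnv, connectorDescriptor)
      with StreamableDescriptor {
    
      private var updateMode: Option[String] = None
    
      override def inAppendMode(): StreamTableDescriptor = {
        updateMode = Some(UPDATE_MODE_VALUE_APPEND)
        this
      }
    
      // format/schema handling and registration are inherited; only the
      // stream-specific property is appended here
      override private[flink] def addProperties(properties: DescriptorProperties): Unit = {
        super.addProperties(properties)
        updateMode.foreach(mode => properties.putString(UPDATE_MODE, mode))
      }
    }
    ```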


---

[GitHub] flink pull request #6343: [FLINK-9852] [table] Expose descriptor-based sink ...

Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6343#discussion_r203304035
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/BatchTableDescriptor.scala ---
    @@ -0,0 +1,126 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.descriptors
    +
    +import org.apache.flink.table.api.{BatchTableEnvironment, ValidationException}
    +import org.apache.flink.table.factories.{BatchTableSinkFactory, BatchTableSourceFactory, TableFactoryService}
    +
    +/**
    +  * Descriptor for specifying a table source and/or sink in a batch environment.
    +  */
    +class BatchTableDescriptor(
    +    private val tableEnv: BatchTableEnvironment,
    +    private val connectorDescriptor: ConnectorDescriptor)
    +  extends TableDescriptor
    +  with SchematicDescriptor
    +  with RegistrableDescriptor {
    +
    +  private var formatDescriptor: Option[FormatDescriptor] = None
    +  private var schemaDescriptor: Option[Schema] = None
    +
    +  /**
    +    * Internal method for properties conversion.
    +    */
    +  override private[flink] def addProperties(properties: DescriptorProperties): Unit = {
    --- End diff --
    
    It's not about public/private :) It's about the order in which you should be reading the code.


---

[GitHub] flink pull request #6343: [FLINK-9852] [table] Expose descriptor-based sink ...

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6343#discussion_r203373313
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalTableUtil.scala ---
    @@ -0,0 +1,148 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.catalog
    +
    +import java.util
    +
    +import org.apache.flink.table.api._
    +import org.apache.flink.table.descriptors.DescriptorProperties
    +import org.apache.flink.table.factories._
    +import org.apache.flink.table.plan.schema._
    +import org.apache.flink.table.plan.stats.FlinkStatistic
    +import org.apache.flink.table.util.Logging
    +
    +
    +/**
    +  * The utility class is used to convert [[ExternalCatalogTable]] to [[TableSourceSinkTable]].
    +  *
    +  * It uses [[TableFactoryService]] for discovering.
    +  */
    +object ExternalTableUtil extends Logging {
    +
    +  /**
    +    * Converts an [[ExternalCatalogTable]] instance to a [[TableSourceTable]] instance
    +    *
    +    * @param externalCatalogTable the [[ExternalCatalogTable]] instance which to convert
    +    * @return converted [[TableSourceTable]] instance from the input catalog table
    +    */
    +  def fromExternalCatalogTable[T1, T2](
    +      tableEnv: TableEnvironment,
    +      externalCatalogTable: ExternalCatalogTable)
    +    : TableSourceSinkTable[T1, T2] = {
    +
    +    val properties = new DescriptorProperties()
    +    externalCatalogTable.addProperties(properties)
    +    val javaMap = properties.asMap
    +    val statistics = new FlinkStatistic(externalCatalogTable.getTableStats)
    +
    +    val source: Option[TableSourceTable[T1]] = tableEnv match {
    +      // check for a batch table source in this batch environment
    +      case _: BatchTableEnvironment if externalCatalogTable.isBatchTable =>
    +        createBatchTableSource(externalCatalogTable, javaMap, statistics)
    +
    +      // check for a stream table source in this stream environment
    +      case _: StreamTableEnvironment if externalCatalogTable.isStreamTable =>
    +        createStreamTableSource(externalCatalogTable, javaMap, statistics)
    +
    +      case _ =>
    +        throw new ValidationException(
    +          "External catalog table does not support the current environment for a table source.")
    +    }
    +
    +    val sink: Option[TableSinkTable[T2]] = tableEnv match {
    +      // check for a batch table sink in this batch environment
    +      case _: BatchTableEnvironment if externalCatalogTable.isBatchTable =>
    +        createBatchTableSink(externalCatalogTable, javaMap, statistics)
    +
    +      // check for a stream table sink in this stream environment
    +      case _: StreamTableEnvironment if externalCatalogTable.isStreamTable =>
    +        createStreamTableSink(externalCatalogTable, javaMap, statistics)
    +
    +      case _ =>
    +        throw new ValidationException(
    +          "External catalog table does not support the current environment for a table sink.")
    +    }
    +
    +    new TableSourceSinkTable[T1, T2](source, sink)
    +  }
    +
    +  private def createBatchTableSource[T](
    +      externalCatalogTable: ExternalCatalogTable,
    +      javaMap: util.Map[String, String],
    +      statistics: FlinkStatistic)
    +    : Option[TableSourceTable[T]] = if (externalCatalogTable.isTableSource) {
    --- End diff --
    
    I will reverse the branches.


---

[GitHub] flink pull request #6343: [FLINK-9852] [table] Expose descriptor-based sink ...

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6343#discussion_r202977464
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogTable.scala ---
    @@ -18,33 +18,299 @@
     
     package org.apache.flink.table.catalog
     
    +import org.apache.flink.table.descriptors.DescriptorProperties.toScala
    +import org.apache.flink.table.descriptors.StatisticsValidator.{STATISTICS_COLUMNS, STATISTICS_ROW_COUNT, readColumnStats}
    +import org.apache.flink.table.descriptors.StreamTableDescriptorValidator.{UPDATE_MODE, UPDATE_MODE_VALUE_APPEND, UPDATE_MODE_VALUE_RETRACT, UPDATE_MODE_VALUE_UPSERT}
     import org.apache.flink.table.descriptors._
     import org.apache.flink.table.plan.stats.TableStats
     
    +import scala.collection.JavaConverters._
    +
     /**
    -  * Defines a table in an [[ExternalCatalog]].
    +  * Defines a table in an [[ExternalCatalog]]. External catalog tables describe table sources
    +  * and/or sinks for both batch and stream environments.
    +  *
    +  * The catalog table takes descriptors which allow for declaring the communication to external
    +  * systems in an implementation-agnostic way. The classpath is scanned for suitable table factories
    +  * that match the desired configuration.
    +  *
    +  * Use the provided builder methods to configure the external catalog table accordingly.
    +  *
    +  * The following example shows how to read from a connector using a JSON format and
    +  * declaring it as a table source:
       *
    -  * @param connectorDesc describes the system to connect to
    -  * @param formatDesc describes the data format of a connector
    -  * @param schemaDesc describes the schema of the result table
    -  * @param statisticsDesc describes the estimated statistics of the result table
    -  * @param metadataDesc describes additional metadata of a table
    +  * {{{
    +  *   ExternalCatalogTable(
    +  *     new ExternalSystemXYZ()
    +  *       .version("0.11"))
    +  *   .withFormat(
    +  *     new Json()
    +  *       .jsonSchema("{...}")
    +  *       .failOnMissingField(false))
    +  *   .withSchema(
    +  *     new Schema()
    +  *       .field("user-name", "VARCHAR").from("u_name")
    +  *       .field("count", "DECIMAL")
    +  *   .asTableSource()
    +  * }}}
    +  *
    +  * Note: For backwards-compatibility, the table is declared as a table source for batch and
    +  * streaming environment by default.
    +  *
    +  * See also [[org.apache.flink.table.factories.TableFactory]] for more information about how
    +  * to target suitable factories.
    +  *
    +  * @param connectorDescriptor describes the system to connect to
       */
    -class ExternalCatalogTable(
    -    connectorDesc: ConnectorDescriptor,
    -    formatDesc: Option[FormatDescriptor],
    -    schemaDesc: Option[Schema],
    -    statisticsDesc: Option[Statistics],
    -    metadataDesc: Option[Metadata])
    -  extends TableSourceDescriptor {
    -
    -  this.connectorDescriptor = Some(connectorDesc)
    -  this.formatDescriptor = formatDesc
    -  this.schemaDescriptor = schemaDesc
    -  this.statisticsDescriptor = statisticsDesc
    -  this.metaDescriptor = metadataDesc
    -
    -  // expose statistics for external table source util
    -  override def getTableStats: Option[TableStats] = super.getTableStats
    +class ExternalCatalogTable(val connectorDescriptor: ConnectorDescriptor)
    +  extends TableDescriptor
    --- End diff --
    
    https://github.com/databricks/scala-style-guide#indent


---

[GitHub] flink pull request #6343: [FLINK-9852] [table] Expose descriptor-based sink ...

Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6343#discussion_r202966706
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/BatchTableDescriptor.scala ---
    @@ -0,0 +1,126 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.descriptors
    +
    +import org.apache.flink.table.api.{BatchTableEnvironment, ValidationException}
    +import org.apache.flink.table.factories.{BatchTableSinkFactory, BatchTableSourceFactory, TableFactoryService}
    +
    +/**
    +  * Descriptor for specifying a table source and/or sink in a batch environment.
    +  */
    +class BatchTableDescriptor(
    +    private val tableEnv: BatchTableEnvironment,
    +    private val connectorDescriptor: ConnectorDescriptor)
    +  extends TableDescriptor
    +  with SchematicDescriptor
    +  with RegistrableDescriptor {
    +
    +  private var formatDescriptor: Option[FormatDescriptor] = None
    +  private var schemaDescriptor: Option[Schema] = None
    +
    +  /**
    +    * Internal method for properties conversion.
    +    */
    +  override private[flink] def addProperties(properties: DescriptorProperties): Unit = {
    --- End diff --
    
    In many places (for example, all of the `addProperties` methods) you are ordering methods in a confusing way. The rule of thumb should be public methods before private ones. Longer story: https://stackoverflow.com/a/1760877/8149051
    
    Many times during this code review I had to jump up and down through the file.


---

[GitHub] flink pull request #6343: [FLINK-9852] [table] Expose descriptor-based sink ...

Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6343#discussion_r202915534
  
    --- Diff: flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/EnvironmentTest.java ---
    @@ -40,10 +41,18 @@
     
     	@Test
     	public void testMerging() throws Exception {
    -		final Environment env1 = EnvironmentFileUtil.parseUnmodified(DEFAULTS_ENVIRONMENT_FILE);
    +		final Map<String, String> replaceVars1 = new HashMap<>();
    +		replaceVars1.put("$VAR_UPDATE_MODE", "update-mode: append");
    +		final Environment env1 = EnvironmentFileUtil.parseModified(
    +			DEFAULTS_ENVIRONMENT_FILE,
    +			replaceVars1);
    +
    +		final Map<String, String> replaceVars2 = new HashMap<>();
    --- End diff --
    
    ```
    final Map<String, String> replaceVars2 = new HashMap<>(replaceVars1);
    ```
    and then you can drop the line below it.


---

[GitHub] flink pull request #6343: [FLINK-9852] [table] Expose descriptor-based sink ...

Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6343#discussion_r202964715
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/SchematicDescriptor.scala ---
    @@ -19,14 +19,17 @@
     package org.apache.flink.table.descriptors
     
     /**
    -  * Common class for all descriptors describing a table sink.
    +  * A trait for descriptors that allow to define a format and schema.
       */
    -abstract class TableSinkDescriptor extends TableDescriptor {
    +trait SchematicDescriptor extends Descriptor {
    --- End diff --
    
    Are you using it anywhere as an interface? What does extracting it into a separate interface give us? Maybe drop it?


---

[GitHub] flink pull request #6343: [FLINK-9852] [table] Expose descriptor-based sink ...

Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6343#discussion_r202923853
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogTable.scala ---
    @@ -18,33 +18,299 @@
     
     package org.apache.flink.table.catalog
     
    +import org.apache.flink.table.descriptors.DescriptorProperties.toScala
    +import org.apache.flink.table.descriptors.StatisticsValidator.{STATISTICS_COLUMNS, STATISTICS_ROW_COUNT, readColumnStats}
    +import org.apache.flink.table.descriptors.StreamTableDescriptorValidator.{UPDATE_MODE, UPDATE_MODE_VALUE_APPEND, UPDATE_MODE_VALUE_RETRACT, UPDATE_MODE_VALUE_UPSERT}
     import org.apache.flink.table.descriptors._
     import org.apache.flink.table.plan.stats.TableStats
     
    +import scala.collection.JavaConverters._
    +
     /**
    -  * Defines a table in an [[ExternalCatalog]].
    +  * Defines a table in an [[ExternalCatalog]]. External catalog tables describe table sources
    +  * and/or sinks for both batch and stream environments.
    +  *
    +  * The catalog table takes descriptors which allow for declaring the communication to external
    +  * systems in an implementation-agnostic way. The classpath is scanned for suitable table factories
    +  * that match the desired configuration.
    +  *
    +  * Use the provided builder methods to configure the external catalog table accordingly.
    +  *
    +  * The following example shows how to read from a connector using a JSON format and
    +  * declaring it as a table source:
       *
    -  * @param connectorDesc describes the system to connect to
    -  * @param formatDesc describes the data format of a connector
    -  * @param schemaDesc describes the schema of the result table
    -  * @param statisticsDesc describes the estimated statistics of the result table
    -  * @param metadataDesc describes additional metadata of a table
    +  * {{{
    +  *   ExternalCatalogTable(
    +  *     new ExternalSystemXYZ()
    +  *       .version("0.11"))
    +  *   .withFormat(
    +  *     new Json()
    +  *       .jsonSchema("{...}")
    +  *       .failOnMissingField(false))
    +  *   .withSchema(
    +  *     new Schema()
    +  *       .field("user-name", "VARCHAR").from("u_name")
    +  *       .field("count", "DECIMAL")
    +  *   .asTableSource()
    +  * }}}
    +  *
    +  * Note: For backwards-compatibility, the table is declared as a table source for batch and
    +  * streaming environment by default.
    +  *
    +  * See also [[org.apache.flink.table.factories.TableFactory]] for more information about how
    +  * to target suitable factories.
    +  *
    +  * @param connectorDescriptor describes the system to connect to
       */
    -class ExternalCatalogTable(
    -    connectorDesc: ConnectorDescriptor,
    -    formatDesc: Option[FormatDescriptor],
    -    schemaDesc: Option[Schema],
    -    statisticsDesc: Option[Statistics],
    -    metadataDesc: Option[Metadata])
    -  extends TableSourceDescriptor {
    -
    -  this.connectorDescriptor = Some(connectorDesc)
    -  this.formatDescriptor = formatDesc
    -  this.schemaDescriptor = schemaDesc
    -  this.statisticsDescriptor = statisticsDesc
    -  this.metaDescriptor = metadataDesc
    -
    -  // expose statistics for external table source util
    -  override def getTableStats: Option[TableStats] = super.getTableStats
    +class ExternalCatalogTable(val connectorDescriptor: ConnectorDescriptor)
    +  extends TableDescriptor
    --- End diff --
    
    Single tab? This is inconsistent with other places.


---

[GitHub] flink pull request #6343: [FLINK-9852] [table] Expose descriptor-based sink ...

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6343#discussion_r203373202
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogTable.scala ---
    @@ -18,33 +18,299 @@
     
     package org.apache.flink.table.catalog
     
    +import org.apache.flink.table.descriptors.DescriptorProperties.toScala
    +import org.apache.flink.table.descriptors.StatisticsValidator.{STATISTICS_COLUMNS, STATISTICS_ROW_COUNT, readColumnStats}
    +import org.apache.flink.table.descriptors.StreamTableDescriptorValidator.{UPDATE_MODE, UPDATE_MODE_VALUE_APPEND, UPDATE_MODE_VALUE_RETRACT, UPDATE_MODE_VALUE_UPSERT}
     import org.apache.flink.table.descriptors._
     import org.apache.flink.table.plan.stats.TableStats
     
    +import scala.collection.JavaConverters._
    +
     /**
    -  * Defines a table in an [[ExternalCatalog]].
    +  * Defines a table in an [[ExternalCatalog]]. External catalog tables describe table sources
    +  * and/or sinks for both batch and stream environments.
    +  *
    +  * The catalog table takes descriptors which allow for declaring the communication to external
    +  * systems in an implementation-agnostic way. The classpath is scanned for suitable table factories
    +  * that match the desired configuration.
    +  *
    +  * Use the provided builder methods to configure the external catalog table accordingly.
    +  *
    +  * The following example shows how to read from a connector using a JSON format and
    +  * declaring it as a table source:
       *
    -  * @param connectorDesc describes the system to connect to
    -  * @param formatDesc describes the data format of a connector
    -  * @param schemaDesc describes the schema of the result table
    -  * @param statisticsDesc describes the estimated statistics of the result table
    -  * @param metadataDesc describes additional metadata of a table
    +  * {{{
    +  *   ExternalCatalogTable(
    +  *     new ExternalSystemXYZ()
    +  *       .version("0.11"))
    +  *   .withFormat(
    +  *     new Json()
    +  *       .jsonSchema("{...}")
    +  *       .failOnMissingField(false))
    +  *   .withSchema(
    +  *     new Schema()
    +  *       .field("user-name", "VARCHAR").from("u_name")
    +  *       .field("count", "DECIMAL")
    +  *   .asTableSource()
    --- End diff --
    
    A descriptor is basically a builder that builds immutable string properties once `addProperties` is called.
    
    "format/schema/statistics/metadata/updateMode" are only concepts that we propose to a user. In the end, a user can implement a custom connector descriptor that adds whatever properties they would like to have, and implement a custom factory that matches whatever properties are needed.

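    As an illustration of that property-based contract, the factory side only has to declare which string properties it matches and supports; everything else is opaque key/value pairs. A rough sketch using the factory hooks from this PR series (all `MySystem*` names and the endpoint property key are made up for the example):
    ```
    import java.util
    import org.apache.flink.table.factories.StreamTableSourceFactory
    import org.apache.flink.table.sources.StreamTableSource
    import org.apache.flink.types.Row
    
    class MySystemTableSourceFactory extends StreamTableSourceFactory[Row] {
    
      // a custom descriptor only has to emit properties that match this context
      override def requiredContext(): util.Map[String, String] = {
        val context = new util.HashMap[String, String]()
        context.put("connector.type", "my-system")
        context.put("update-mode", "append")
        context
      }
    
      override def supportedProperties(): util.List[String] = {
        // connector-specific keys; schema/format keys would be listed here as well
        util.Collections.singletonList("connector.my-system.endpoint")
      }
    
      override def createStreamTableSource(
          properties: util.Map[String, String]): StreamTableSource[Row] = {
        // interpret the plain string properties and build the actual source;
        // MySystemTableSource is a hypothetical user class
        new MySystemTableSource(properties.get("connector.my-system.endpoint"))
      }
    }
    ```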

---

[GitHub] flink pull request #6343: [FLINK-9852] [table] Expose descriptor-based sink ...

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6343#discussion_r202983790
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/BatchTableDescriptor.scala ---
    @@ -0,0 +1,126 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.descriptors
    +
    +import org.apache.flink.table.api.{BatchTableEnvironment, ValidationException}
    +import org.apache.flink.table.factories.{BatchTableSinkFactory, BatchTableSourceFactory, TableFactoryService}
    +
    +/**
    +  * Descriptor for specifying a table source and/or sink in a batch environment.
    +  */
    +class BatchTableDescriptor(
    +    private val tableEnv: BatchTableEnvironment,
    +    private val connectorDescriptor: ConnectorDescriptor)
    +  extends TableDescriptor
    +  with SchematicDescriptor
    +  with RegistrableDescriptor {
    +
    +  private var formatDescriptor: Option[FormatDescriptor] = None
    +  private var schemaDescriptor: Option[Schema] = None
    +
    +  /**
    +    * Internal method for properties conversion.
    +    */
    +  override private[flink] def addProperties(properties: DescriptorProperties): Unit = {
    +    connectorDescriptor.addProperties(properties)
    +    formatDescriptor.foreach(_.addProperties(properties))
    +    schemaDescriptor.foreach(_.addProperties(properties))
    +  }
    +
    +  /**
    +    * Searches for the specified table source, configures it accordingly, and registers it as
    +    * a table under the given name.
    +    *
    +    * @param name table name to be registered in the table environment
    +    */
    +  override def registerTableSource(name: String): Unit = {
    +    val javaMap = getValidProperties.asMap
    +    val tableSource = TableFactoryService
    +      .find(classOf[BatchTableSourceFactory[_]], javaMap)
    +      .createBatchTableSource(javaMap)
    +    tableEnv.registerTableSource(name, tableSource)
    +  }
    +
    +  /**
    +    * Searches for the specified table sink, configures it accordingly, and registers it as
    +    * a table under the given name.
    +    *
    +    * @param name table name to be registered in the table environment
    +    */
    +  override def registerTableSink(name: String): Unit = {
    +    val javaMap = getValidProperties.asMap
    +    val tableSink = TableFactoryService
    +      .find(classOf[BatchTableSinkFactory[_]], javaMap)
    +      .createBatchTableSink(javaMap)
    +    tableEnv.registerTableSink(name, tableSink)
    +  }
    +
    +  /**
    +    * Searches for the specified table source and sink, configures them accordingly, and registers
    +    * them as a table under the given name.
    +    *
    +    * @param name table name to be registered in the table environment
    +    */
    +  override def registerTableSourceAndSink(name: String): Unit = {
    +    registerTableSource(name)
    +    registerTableSink(name)
    +  }
    +
    +  /**
    +    * Specifies the format that defines how to read data from a connector.
    +    */
    +  override def withFormat(format: FormatDescriptor): BatchTableDescriptor = {
    +    formatDescriptor = Some(format)
    +    this
    +  }
    +
    +  /**
    +    * Specifies the resulting table schema.
    +    */
    +  override def withSchema(schema: Schema): BatchTableDescriptor = {
    +    schemaDescriptor = Some(schema)
    +    this
    +  }
    +
    +  override def toString: String = {
    +    getValidProperties.toString
    +  }
    +
    +  // ----------------------------------------------------------------------------------------------
    +
    +  private def getValidProperties: DescriptorProperties = {
    --- End diff --
    
    I agree, but I cannot move it up the class hierarchy because it would become public in Java. I will create a util class.
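    
    A sketch of what such a util could look like, so that the shared check lives in one package-internal place instead of in the public class hierarchy (the object and method names are illustrative; the check itself is taken from the stream descriptor above):
    ```
    import org.apache.flink.table.api.ValidationException
    import org.apache.flink.table.descriptors.{ConnectorDescriptor, FormatDescriptor}
    
    private[flink] object DescriptorUtil {
    
      // shared between BatchTableDescriptor and StreamTableDescriptor
      def checkFormatCompatibility(
          connectorDescriptor: ConnectorDescriptor,
          formatDescriptor: Option[FormatDescriptor]): Unit = {
        if (connectorDescriptor.needsFormat() && formatDescriptor.isEmpty) {
          throw new ValidationException(
            s"The connector '$connectorDescriptor' requires a format description.")
        } else if (!connectorDescriptor.needsFormat() && formatDescriptor.isDefined) {
          throw new ValidationException(
            s"The connector '$connectorDescriptor' does not require a format description " +
              s"but '${formatDescriptor.get}' found.")
        }
      }
    }
    ```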


---

[GitHub] flink pull request #6343: [FLINK-9852] [table] Expose descriptor-based sink ...

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6343#discussion_r202981771
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalTableUtil.scala ---
    @@ -0,0 +1,148 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.catalog
    +
    +import java.util
    +
    +import org.apache.flink.table.api._
    +import org.apache.flink.table.descriptors.DescriptorProperties
    +import org.apache.flink.table.factories._
    +import org.apache.flink.table.plan.schema._
    +import org.apache.flink.table.plan.stats.FlinkStatistic
    +import org.apache.flink.table.util.Logging
    +
    +
    +/**
    +  * The utility class is used to convert [[ExternalCatalogTable]] to [[TableSourceSinkTable]].
    +  *
    +  * It uses [[TableFactoryService]] for discovering.
    +  */
    +object ExternalTableUtil extends Logging {
    +
    +  /**
    +    * Converts an [[ExternalCatalogTable]] instance to a [[TableSourceTable]] instance
    +    *
    +    * @param externalCatalogTable the [[ExternalCatalogTable]] instance which to convert
    +    * @return converted [[TableSourceTable]] instance from the input catalog table
    +    */
    +  def fromExternalCatalogTable[T1, T2](
    +      tableEnv: TableEnvironment,
    +      externalCatalogTable: ExternalCatalogTable)
    +    : TableSourceSinkTable[T1, T2] = {
    +
    +    val properties = new DescriptorProperties()
    +    externalCatalogTable.addProperties(properties)
    +    val javaMap = properties.asMap
    +    val statistics = new FlinkStatistic(externalCatalogTable.getTableStats)
    +
    +    val source: Option[TableSourceTable[T1]] = tableEnv match {
    +      // check for a batch table source in this batch environment
    +      case _: BatchTableEnvironment if externalCatalogTable.isBatchTable =>
    +        createBatchTableSource(externalCatalogTable, javaMap, statistics)
    +
    +      // check for a stream table source in this stream environment
    +      case _: StreamTableEnvironment if externalCatalogTable.isStreamTable =>
    +        createStreamTableSource(externalCatalogTable, javaMap, statistics)
    +
    +      case _ =>
    +        throw new ValidationException(
    +          "External catalog table does not support the current environment for a table source.")
    +    }
    +
    +    val sink: Option[TableSinkTable[T2]] = tableEnv match {
    +      // check for a batch table sink in this batch environment
    +      case _: BatchTableEnvironment if externalCatalogTable.isBatchTable =>
    +        createBatchTableSink(externalCatalogTable, javaMap, statistics)
    +
    +      // check for a stream table sink in this stream environment
    +      case _: StreamTableEnvironment if externalCatalogTable.isStreamTable =>
    +        createStreamTableSink(externalCatalogTable, javaMap, statistics)
    +
    +      case _ =>
    +        throw new ValidationException(
    +          "External catalog table does not support the current environment for a table sink.")
    +    }
    +
    +    new TableSourceSinkTable[T1, T2](source, sink)
    +  }
    +
    +  private def createBatchTableSource[T](
    --- End diff --
    
    Then we would have 4 factories that would have to be checked with if/else branches. Having those if/else checks in 3 places (SQL Client, external catalog, and descriptors) is acceptable in my opinion.


---