Posted to issues@flink.apache.org by twalthr <gi...@git.apache.org> on 2018/07/13 06:52:00 UTC

[GitHub] flink pull request #6323: [FLINK-8558] [FLINK-8866] [table] Finalize unified...

GitHub user twalthr opened a pull request:

    https://github.com/apache/flink/pull/6323

    [FLINK-8558] [FLINK-8866] [table] Finalize unified table source/sink/format interfaces

    ## What is the purpose of the change
    
    This PR finalizes the work started in #6264 and #6201 toward unified interfaces for table sources, table sinks, and table formats. It reduces code duplication and cleans up the code base around factories.
    
    
    ## Brief change log
    
    - Introduction of `org.apache.flink.table.factories.TableFactory`, a common interface for all factories (see the sketch after this list)
    - Introduction of `org.apache.flink.table.factories.TableFormatFactory`, a specific table factory for formats
    - Specific factories for `StreamTableSource`, `StreamTableSink`, `BatchTableSource`, `BatchTableSink`, `DeserializationSchema`, and `SerializationSchema`
    - Deprecation of the old format-specific table sources (sinks will be deprecated in a follow-up PR)
    - Possibility to register a table source and a sink under a common name (table type `both` in the SQL Client YAML)
    
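    The factory contract these items revolve around can be sketched as follows. This is a minimal, hedged Scala sketch: `requiredContext()` carries the signature shown in the review diffs below, `supportedProperties()` is assumed to return a `java.util.List[String]`, and the class name and property keys are purely illustrative.
    
    ```
    import java.util.{Arrays => JArrays, HashMap => JHashMap, List => JList, Map => JMap}
    
    import org.apache.flink.table.factories.TableFactory
    
    // Hypothetical connector factory: it is discovered via Java Service
    // Providers and matches whenever the normalized properties contain
    // 'connector.type' = 'my-connector'.
    class MyConnectorFactory extends TableFactory {
    
      override def requiredContext(): JMap[String, String] = {
        val context = new JHashMap[String, String]()
        context.put("connector.type", "my-connector") // hypothetical key/value
        context
      }
    
      override def supportedProperties(): JList[String] =
        JArrays.asList("connector.path", "format.type") // hypothetical keys
    }
    ```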
    
    ## Verifying this change
    
    - Existing tests verify the implementation
    - Additional ITCases and unit tests have been added
    - (An end-to-end test will follow in a separate PR)
    
    ## Does this pull request potentially affect one of the following parts:
    
      - Dependencies (does it add or upgrade a dependency): no
      - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: yes
      - The serializers: no
      - The runtime per-record code paths (performance sensitive): no
      - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: no
      - The S3 file system connector: no
    
    ## Documentation
    
      - Does this pull request introduce a new feature? yes
      - If yes, how is the feature documented? not documented


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/twalthr/flink UnifiedInterfacesFinal

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/6323.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #6323
    
----
commit 980499f887d72ddf9a405c4ad200d0cab15d889c
Author: Timo Walther <tw...@...>
Date:   2018-06-27T11:16:49Z

    [FLINK-8558] [table] Add unified format interfaces and separate formats from connectors
    
    This PR introduces a format discovery mechanism based on Java Service Providers. The general `TableFormatFactory` is similar to the existing table source discovery mechanism. However, it allows for arbitrary format interfaces that might be introduced in the future. At the moment, a connector can request configured instances of `DeserializationSchema` and `SerializationSchema`. In the future, we can add interfaces such as a `Writer` or `KeyedSerializationSchema` without breaking backwards compatibility.
    
    This PR deprecates the existing strong coupling of connector and format for the Kafka table sources and table source factories. It introduces descriptor-based alternatives.
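    
    A hedged sketch of the resulting registration and lookup: the service file name assumes the post-rename `org.apache.flink.table.factories.TableFactory`, the property values are illustrative, and `TableFactoryService.find` and `DeserializationSchemaFactory` are used as they appear in the review diffs further down.
    
    ```
    # META-INF/services/org.apache.flink.table.factories.TableFactory
    # (one fully qualified factory class per line; the name is illustrative)
    com.example.MyFormatFactory
    ```
    
    ```
    import java.util.{HashMap => JHashMap}
    
    import org.apache.flink.table.factories.{DeserializationSchemaFactory, TableFactoryService}
    
    object FormatLookupSketch extends App {
      // Hypothetical properties describing the desired format.
      val properties = new JHashMap[String, String]()
      properties.put("format.type", "my-format")        // hypothetical value
      properties.put("format.property-version", "1")
    
      // Discovers the registered factory whose required context matches.
      val factory = TableFactoryService.find(
        classOf[DeserializationSchemaFactory[_]], properties)
    }
    ```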

commit 42a8a156d4e6f8f3d119c458350b6c897306fc48
Author: Shuyi Chen <sh...@...>
Date:   2018-06-19T19:00:34Z

    [FLINK-8866] [table] Create unified interfaces to configure and instantiate TableSinks
    
    This closes #6201.

commit 311dc62e59c0e4146c094b73c21b979f31b2e1d9
Author: Timo Walther <tw...@...>
Date:   2018-07-11T11:29:03Z

    Rename to TableFactory and move it to factories package

commit 1c581cba61ba321bb6de6a4d298a881840d11cfe
Author: Timo Walther <tw...@...>
Date:   2018-07-11T11:46:31Z

    Refactor format factories

commit 5c6df7598d1f1c3c698ae9b6b35eb37d7fff8295
Author: Timo Walther <tw...@...>
Date:   2018-07-12T06:35:00Z

    Unify table factories

commit 0cd7c44c006aba21c32d8785d17bfc3dbee03916
Author: Timo Walther <tw...@...>
Date:   2018-07-12T07:05:50Z

    Move table type out of descriptors

commit 6b83f2e1c0e63147f049dc5389c5633077b789a4
Author: Timo Walther <tw...@...>
Date:   2018-07-12T08:50:09Z

    Make source/sink factories environment-dependent

commit 4f1255fd003080f078afe6ef67ffa58f40ffec36
Author: Timo Walther <tw...@...>
Date:   2018-07-12T18:48:45Z

    Clean up and simplify changes

----


---

[GitHub] flink pull request #6323: [FLINK-8558] [FLINK-8866] [table] Finalize unified...

Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6323#discussion_r202282478
  
    --- Diff: flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/SourceSink.java ---
    @@ -51,16 +51,10 @@ public void addProperties(DescriptorProperties properties) {
     	}
     
     	public Source toSource() {
    -		final Map<String, String> newProperties = new HashMap<>(properties);
    -		newProperties.replace(TableDescriptorValidator.TABLE_TYPE(),
    -				TableDescriptorValidator.TABLE_TYPE_VALUE_SOURCE());
    --- End diff --
    
    why did those lines disappear?


---

[GitHub] flink pull request #6323: [FLINK-8558] [FLINK-8866] [table] Finalize unified...

Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6323#discussion_r202352869
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalTableSourceScan.scala ---
    @@ -52,12 +52,9 @@ class FlinkLogicalTableSourceScan(
       override def deriveRowType(): RelDataType = {
         val flinkTypeFactory = cluster.getTypeFactory.asInstanceOf[FlinkTypeFactory]
         val streamingTable = table.unwrap(classOf[TableSourceSinkTable[_, _]]) match {
    -      case t: TableSourceSinkTable[_, _] => t.tableSourceTableOpt match {
    -        case Some(_: StreamTableSourceTable[_]) => true
    -        case Some(_: BatchTableSourceTable[_]) => false
    -        case _ => throw TableException(s"Unknown Table type ${t.getClass}.")
    -      }
    -      case t => throw TableException(s"Unknown Table type ${t.getClass}.")
    +      case t: TableSourceSinkTable[_, _] if t.isStreamSourceTable => true
    +      // null
    --- End diff --
    
    drop `//null` ?


---

[GitHub] flink pull request #6323: [FLINK-8558] [FLINK-8866] [table] Finalize unified...

Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6323#discussion_r202338408
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/factories/TableFactoryService.scala ---
    @@ -18,143 +18,358 @@
     
     package org.apache.flink.table.factories
     
    -import java.util.{ServiceConfigurationError, ServiceLoader}
    +import java.util.{ServiceConfigurationError, ServiceLoader, Map => JMap}
     
     import org.apache.flink.table.api._
     import org.apache.flink.table.descriptors.ConnectorDescriptorValidator._
     import org.apache.flink.table.descriptors.FormatDescriptorValidator._
     import org.apache.flink.table.descriptors.MetadataValidator._
     import org.apache.flink.table.descriptors.StatisticsValidator._
    -import org.apache.flink.table.descriptors.{DescriptorProperties, TableDescriptor, TableDescriptorValidator}
    +import org.apache.flink.table.descriptors._
     import org.apache.flink.table.util.Logging
    +import org.apache.flink.util.Preconditions
     
     import _root_.scala.collection.JavaConverters._
     import _root_.scala.collection.mutable
     
     /**
    -  * Unified interface to search for TableFactoryDiscoverable of provided type and properties.
    +  * Unified interface to search for a [[TableFactory]] of provided type and properties.
       */
     object TableFactoryService extends Logging {
     
       private lazy val defaultLoader = ServiceLoader.load(classOf[TableFactory])
     
    -  def find(clz: Class[_], descriptor: TableDescriptor): TableFactory = {
    -    find(clz, descriptor, null)
    +  /**
    +    * Finds a table factory of the given class and descriptor.
    +    *
    +    * @param factoryClass desired factory class
    +    * @param descriptor descriptor describing the factory configuration
    +    * @tparam T factory class type
    +    * @return the matching factory
    +    */
    +  def find[T](factoryClass: Class[T], descriptor: Descriptor): T = {
    +    Preconditions.checkNotNull(factoryClass)
    +    Preconditions.checkNotNull(descriptor)
    +
    +    val descriptorProperties = new DescriptorProperties()
    +    descriptor.addProperties(descriptorProperties)
    +    findInternal(factoryClass, descriptorProperties.asMap, None)
       }
     
    -  def find(clz: Class[_], descriptor: TableDescriptor, classLoader: ClassLoader)
    -  : TableFactory = {
    +  /**
    +    * Finds a table factory of the given class, descriptor, and classloader.
    +    *
    +    * @param factoryClass desired factory class
    +    * @param descriptor descriptor describing the factory configuration
    +    * @param classLoader classloader for service loading
    +    * @tparam T factory class type
    +    * @return the matching factory
    +    */
    +  def find[T](factoryClass: Class[T], descriptor: Descriptor, classLoader: ClassLoader): T = {
    +    Preconditions.checkNotNull(factoryClass)
    +    Preconditions.checkNotNull(descriptor)
    +    Preconditions.checkNotNull(classLoader)
     
    -    val properties = new DescriptorProperties()
    -    descriptor.addProperties(properties)
    -    find(clz, properties.asMap.asScala.toMap, classLoader)
    +    val descriptorProperties = new DescriptorProperties()
    +    descriptor.addProperties(descriptorProperties)
    +    findInternal(factoryClass, descriptorProperties.asMap, None)
       }
     
    -  def find(clz: Class[_], properties: Map[String, String]): TableFactory = {
    -    find(clz: Class[_], properties, null)
    +  /**
    +    * Finds a table factory of the given class and property map.
    +    *
    +    * @param factoryClass desired factory class
    +    * @param propertyMap properties that describe the factory configuration
    +    * @tparam T factory class type
    +    * @return the matching factory
    +    */
    +  def find[T](factoryClass: Class[T], propertyMap: JMap[String, String]): T = {
    +    Preconditions.checkNotNull(factoryClass)
    +    Preconditions.checkNotNull(propertyMap)
    +
    +    findInternal(factoryClass, propertyMap, None)
       }
     
    -  def find(clz: Class[_], properties: Map[String, String],
    -           classLoader: ClassLoader): TableFactory = {
    +  /**
    +    * Finds a table factory of the given class, property map, and classloader.
    +    *
    +    * @param factoryClass desired factory class
    +    * @param propertyMap properties that describe the factory configuration
    +    * @param classLoader classloader for service loading
    +    * @tparam T factory class type
    +    * @return the matching factory
    +    */
    +  def find[T](
    +      factoryClass: Class[T],
    +      propertyMap: JMap[String, String],
    +      classLoader: ClassLoader)
    +    : T = {
    +    Preconditions.checkNotNull(factoryClass)
    +    Preconditions.checkNotNull(propertyMap)
    +    Preconditions.checkNotNull(classLoader)
    +
    +    findInternal(factoryClass, propertyMap, Some(classLoader))
    +  }
    +
    +  /**
    +    * Finds a table factory of the given class, property map, and classloader.
    +    *
    +    * @param factoryClass desired factory class
    +    * @param propertyMap properties that describe the factory configuration
    +    * @param classLoader classloader for service loading
    +    * @tparam T factory class type
    +    * @return the matching factory
    +    */
    +  private def findInternal[T](
    +      factoryClass: Class[T],
    +      propertyMap: JMap[String, String],
    +      classLoader: Option[ClassLoader])
    +    : T = {
    +
    +    val properties = propertyMap.asScala.toMap
    +
    +    // discover table factories
    +    val foundFactories = discoverFactories(classLoader)
     
    -    var matchingFactory: Option[(TableFactory, Seq[String])] = None
    +    // filter by factory class
    +    val classFactories = filterByFactoryClass(
    +      factoryClass,
    +      properties,
    +      foundFactories)
    +
    +    // find matching context
    +    val contextFactories = filterByContext(
    +      factoryClass,
    +      properties,
    +      foundFactories,
    +      classFactories)
    +
    +    // filter by supported keys
    +    filterBySupportedProperties(
    +      factoryClass,
    +      properties,
    +      foundFactories,
    +      contextFactories)
    +  }
    +
    +  /**
    +    * Searches for factories using Java service providers.
    +    *
    +    * @return all factories in the classpath
    +    */
    +  private def discoverFactories[T](classLoader: Option[ClassLoader]): Seq[TableFactory] = {
    +    val foundFactories = mutable.ArrayBuffer[TableFactory]()
         try {
    -      val iter = if (classLoader == null) {
    -        defaultLoader.iterator()
    -      } else {
    -        val customLoader = ServiceLoader.load(classOf[TableFactory], classLoader)
    -        customLoader.iterator()
    +      val iterator = classLoader match {
    +        case Some(customClassLoader) =>
    +          val customLoader = ServiceLoader.load(classOf[TableFactory], customClassLoader)
    +          customLoader.iterator()
    +        case None =>
    +          defaultLoader.iterator()
           }
    -      while (iter.hasNext) {
    -        val factory = iter.next()
    -
    -        if (clz.isAssignableFrom(factory.getClass)) {
    -          val requiredContextJava = try {
    -            factory.requiredContext()
    -          } catch {
    -            case t: Throwable =>
    -              throw new TableException(
    -                s"Table source factory '${factory.getClass.getCanonicalName}' caused an exception.",
    -                t)
    -          }
    -
    -          val requiredContext = if (requiredContextJava != null) {
    -            // normalize properties
    -            requiredContextJava.asScala.map(e => (e._1.toLowerCase, e._2))
    -          } else {
    -            Map[String, String]()
    -          }
    -
    -          val plainContext = mutable.Map[String, String]()
    -          plainContext ++= requiredContext
    -          // we remove the versions for now until we have the first backwards compatibility case
    -          // with the version we can provide mappings in case the format changes
    -          plainContext.remove(CONNECTOR_PROPERTY_VERSION)
    -          plainContext.remove(FORMAT_PROPERTY_VERSION)
    -          plainContext.remove(METADATA_PROPERTY_VERSION)
    -          plainContext.remove(STATISTICS_PROPERTY_VERSION)
    -
    -          if (plainContext.forall(e => properties.contains(e._1) && properties(e._1) == e._2)) {
    -            matchingFactory match {
    -              case Some(_) => throw new AmbiguousTableFactoryException(properties)
    -              case None => matchingFactory =
    -                Some((factory.asInstanceOf[TableFactory], requiredContext.keys.toSeq))
    -            }
    -          }
    -        }
    +
    +      while (iterator.hasNext) {
    +        val factory = iterator.next()
    +        foundFactories += factory
           }
    +
    +      foundFactories
         } catch {
           case e: ServiceConfigurationError =>
             LOG.error("Could not load service provider for table factories.", e)
             throw new TableException("Could not load service provider for table factories.", e)
         }
    +  }
    +
    +  /**
    +    * Filters for factories with matching context.
    +    *
    +    * @return all matching factories
    +    */
    +  private def filterByContext[T](
    +      factoryClass: Class[T],
    +      properties: Map[String, String],
    +      foundFactories: Seq[TableFactory],
    +      classFactories: Seq[TableFactory])
    +    : Seq[TableFactory] = {
    +
    +    val matchingFactories = mutable.ArrayBuffer[TableFactory]()
    +
    +    classFactories.foreach { factory =>
    --- End diff --
    
    ```
    val matchingFactories = classFactories.filter(...)
    ```
    ?
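    
    Spelled out, the suggested filter-based form might look like this (a sketch only; `normalizeContext`, the version constants, and `properties` all come from the surrounding diff):
    
    ```
    // Replace the mutable buffer with a declarative filter over the
    // class-compatible factories.
    val matchingFactories = classFactories.filter { factory =>
      val requestedContext = normalizeContext(factory)
      // drop the version keys, as in the original loop
      val plainContext = requestedContext -- Seq(
        CONNECTOR_PROPERTY_VERSION,
        FORMAT_PROPERTY_VERSION,
        METADATA_PROPERTY_VERSION,
        STATISTICS_PROPERTY_VERSION)
      plainContext.forall { case (k, v) => properties.get(k).contains(v) }
    }
    ```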


---

[GitHub] flink pull request #6323: [FLINK-8558] [FLINK-8866] [table] Finalize unified...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/6323


---

[GitHub] flink pull request #6323: [FLINK-8558] [FLINK-8866] [table] Finalize unified...

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6323#discussion_r202506574
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalTableSourceUtil.scala ---
    @@ -44,32 +43,27 @@ object ExternalTableSourceUtil extends Logging {
         val properties = new DescriptorProperties()
         externalCatalogTable.addProperties(properties)
         val javaMap = properties.asMap
    -    val source = TableFactoryService.find(classOf[TableSourceFactory[_]], javaMap)
    -      .asInstanceOf[TableSourceFactory[_]]
    -      .createTableSource(javaMap)
         tableEnv match {
           // check for a batch table source in this batch environment
           case _: BatchTableEnvironment =>
    -        source match {
    -          case bts: BatchTableSource[_] =>
    -            new TableSourceSinkTable(Some(new BatchTableSourceTable(
    -              bts,
    -              new FlinkStatistic(externalCatalogTable.getTableStats))), None)
    -          case _ => throw new TableException(
    -            s"Found table source '${source.getClass.getCanonicalName}' is not applicable " +
    -              s"in a batch environment.")
    -        }
    +        val source = TableFactoryService
    --- End diff --
    
    Usually it is very uncommon to define both a batch and a streaming source in the same factory. Your proposed change would require all future sources to implement an environment check beforehand, which is unnecessary in 80% of the cases. Separating by environment is a concept that can be found throughout the entire `flink-table` module because batch and streaming sources/sinks behave quite differently.
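    
    A hedged sketch of that separation, mirroring the diff above; the `createBatchTableSource`/`createStreamTableSource` method names are assumed from the per-environment factories listed in the PR description, and `javaMap` is the property map from the diff:
    
    ```
    // Batch environment: only batch factories are consulted.
    val batchSource = TableFactoryService
      .find(classOf[BatchTableSourceFactory[_]], javaMap)
      .createBatchTableSource(javaMap)
    
    // Streaming environment: only streaming factories are consulted.
    val streamSource = TableFactoryService
      .find(classOf[StreamTableSourceFactory[_]], javaMap)
      .createStreamTableSource(javaMap)
    ```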


---

[GitHub] flink pull request #6323: [FLINK-8558] [FLINK-8866] [table] Finalize unified...

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6323#discussion_r202506740
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/factories/TableFactoryService.scala ---
    @@ -18,143 +18,358 @@
     
     package org.apache.flink.table.factories
     
    -import java.util.{ServiceConfigurationError, ServiceLoader}
    +import java.util.{ServiceConfigurationError, ServiceLoader, Map => JMap}
     
     import org.apache.flink.table.api._
     import org.apache.flink.table.descriptors.ConnectorDescriptorValidator._
     import org.apache.flink.table.descriptors.FormatDescriptorValidator._
     import org.apache.flink.table.descriptors.MetadataValidator._
     import org.apache.flink.table.descriptors.StatisticsValidator._
    -import org.apache.flink.table.descriptors.{DescriptorProperties, TableDescriptor, TableDescriptorValidator}
    +import org.apache.flink.table.descriptors._
     import org.apache.flink.table.util.Logging
    +import org.apache.flink.util.Preconditions
     
     import _root_.scala.collection.JavaConverters._
     import _root_.scala.collection.mutable
     
     /**
    -  * Unified interface to search for TableFactoryDiscoverable of provided type and properties.
    +  * Unified interface to search for a [[TableFactory]] of provided type and properties.
       */
     object TableFactoryService extends Logging {
     
       private lazy val defaultLoader = ServiceLoader.load(classOf[TableFactory])
     
    -  def find(clz: Class[_], descriptor: TableDescriptor): TableFactory = {
    -    find(clz, descriptor, null)
    +  /**
    +    * Finds a table factory of the given class and descriptor.
    +    *
    +    * @param factoryClass desired factory class
    +    * @param descriptor descriptor describing the factory configuration
    +    * @tparam T factory class type
    +    * @return the matching factory
    +    */
    +  def find[T](factoryClass: Class[T], descriptor: Descriptor): T = {
    +    Preconditions.checkNotNull(factoryClass)
    --- End diff --
    
    The variable is only passed once. The internal method does not check for null again.


---

[GitHub] flink pull request #6323: [FLINK-8558] [FLINK-8866] [table] Finalize unified...

Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6323#discussion_r202268969
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/DefinedFieldMapping.scala ---
    @@ -50,7 +50,8 @@ trait DefinedFieldMapping {
         * type. It can also provide a mapping for fields which are not in the [[TableSchema]] to make
         * fields in the physical [[TypeInformation]] accessible for a [[TimestampExtractor]].
         *
    -    * @return A mapping from [[TableSchema]] fields to [[TypeInformation]] fields.
    +    * @return A mapping from [[TableSchema]] fields to [[TypeInformation]] fields or
    +    *         null if no mapping is necessary.
         */
       def getFieldMapping: JMap[String, String]
    --- End diff --
    
    Annotate it `@Nullable` or change the return type to `Optional`.
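    
    For instance, the first option could look like this (a minimal sketch; the trait name is illustrative and `javax.annotation.Nullable` is assumed as the annotation in use):
    
    ```
    import java.util.{Map => JMap}
    
    import javax.annotation.Nullable
    
    trait DefinedFieldMappingSketch {
    
      /** Returns the mapping, or null if no mapping is necessary. */
      @Nullable
      def getFieldMapping: JMap[String, String]
    }
    ```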


---

[GitHub] flink pull request #6323: [FLINK-8558] [FLINK-8866] [table] Finalize unified...

Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6323#discussion_r202332281
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/factories/TableFormatFactory.scala ---
    @@ -16,42 +16,17 @@
      * limitations under the License.
      */
     
    -package org.apache.flink.table.formats
    +package org.apache.flink.table.factories
     
     import java.util
     
    -import org.apache.flink.api.common.serialization.{DeserializationSchema, SerializationSchema}
    -
     /**
    -  * A factory to create different table format instances. This factory is used with Java's Service
    -  * Provider Interfaces (SPI) for discovering. A factory is called with a set of normalized
    -  * properties that describe the desired format. The factory allows for matching to the given set of
    -  * properties. See also [[SerializationSchemaFactory]] and [[DeserializationSchemaFactory]] for
    -  * creating configured instances of format classes accordingly.
    -  *
    -  * Classes that implement this interface need to be added to the
    -  * "META_INF/services/org.apache.flink.table.formats.TableFormatFactory' file of a JAR file in
    -  * the current classpath to be found.
    +  * A factory to create configured table format instances based on string-based properties. See
    +  * also [[TableFactory]] for more information.
       *
       * @tparam T record type that the format produces or consumes
       */
    -trait TableFormatFactory[T] {
    -
    -  /**
    -    * Specifies the context that this factory has been implemented for. The framework guarantees
    -    * to only use the factory if the specified set of properties and values are met.
    -    *
    -    * Typical properties might be:
    -    *   - format.type
    -    *   - format.version
    -    *
    -    * Specified property versions allow the framework to provide backwards compatible properties
    -    * in case of string format changes:
    -    *   - format.property-version
    -    *
    -    * An empty context means that the factory matches for all requests.
    -    */
    -  def requiredContext(): util.Map[String, String]
    +trait TableFormatFactory[T] extends TableFactory {
    --- End diff --
    
    why haven't you removed `supportedProperties` as well?
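    
    For reference, a format factory's context as described by the removed Javadoc above might be sketched like this (hedged; the `my-format` value and the `ignore-errors` key are hypothetical):
    
    ```
    import java.util.{Collections, HashMap => JHashMap, List => JList, Map => JMap}
    
    class MyFormatFactorySketch {
    
      // Matches only requests for this format; the property version enables
      // backwards-compatible key mappings later on.
      def requiredContext(): JMap[String, String] = {
        val context = new JHashMap[String, String]()
        context.put("format.type", "my-format")
        context.put("format.property-version", "1")
        context
      }
    
      // Keys this format understands beyond the context.
      def supportedProperties(): JList[String] =
        Collections.singletonList("my-format.ignore-errors")
    }
    ```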


---

[GitHub] flink pull request #6323: [FLINK-8558] [FLINK-8866] [table] Finalize unified...

Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6323#discussion_r202521122
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/factories/TableFactoryService.scala ---
    @@ -18,143 +18,358 @@
     
     package org.apache.flink.table.factories
     
    -import java.util.{ServiceConfigurationError, ServiceLoader}
    +import java.util.{ServiceConfigurationError, ServiceLoader, Map => JMap}
     
     import org.apache.flink.table.api._
     import org.apache.flink.table.descriptors.ConnectorDescriptorValidator._
     import org.apache.flink.table.descriptors.FormatDescriptorValidator._
     import org.apache.flink.table.descriptors.MetadataValidator._
     import org.apache.flink.table.descriptors.StatisticsValidator._
    -import org.apache.flink.table.descriptors.{DescriptorProperties, TableDescriptor, TableDescriptorValidator}
    +import org.apache.flink.table.descriptors._
     import org.apache.flink.table.util.Logging
    +import org.apache.flink.util.Preconditions
     
     import _root_.scala.collection.JavaConverters._
     import _root_.scala.collection.mutable
     
     /**
    -  * Unified interface to search for TableFactoryDiscoverable of provided type and properties.
    +  * Unified interface to search for a [[TableFactory]] of provided type and properties.
       */
     object TableFactoryService extends Logging {
     
       private lazy val defaultLoader = ServiceLoader.load(classOf[TableFactory])
     
    -  def find(clz: Class[_], descriptor: TableDescriptor): TableFactory = {
    -    find(clz, descriptor, null)
    +  /**
    +    * Finds a table factory of the given class and descriptor.
    +    *
    +    * @param factoryClass desired factory class
    +    * @param descriptor descriptor describing the factory configuration
    +    * @tparam T factory class type
    +    * @return the matching factory
    +    */
    +  def find[T](factoryClass: Class[T], descriptor: Descriptor): T = {
    +    Preconditions.checkNotNull(factoryClass)
    --- End diff --
    
    But this leads to quite some unnecessary code duplication: `checkNotNull(factoryClass)` appears 5 times here, and the same applies to the other params. Doing the check only in the place where the value is actually used would solve, or at least limit, this issue.
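    
    One way to limit the duplication would be to validate only at the single internal entry point (a sketch of the idea, not the PR's code; the discovery body is stubbed out):
    
    ```
    import java.util.{Map => JMap}
    
    import org.apache.flink.util.Preconditions
    
    object FactoryLookupSketch {
    
      // Public overloads stay thin...
      def find[T](factoryClass: Class[T], propertyMap: JMap[String, String]): T =
        findInternal(factoryClass, propertyMap, None)
    
      // ...and the shared method checks each argument exactly once.
      private def findInternal[T](
          factoryClass: Class[T],
          propertyMap: JMap[String, String],
          classLoader: Option[ClassLoader]): T = {
        Preconditions.checkNotNull(factoryClass)
        Preconditions.checkNotNull(propertyMap)
        Preconditions.checkNotNull(classLoader)
        throw new UnsupportedOperationException("discovery omitted in sketch")
      }
    }
    ```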


---

[GitHub] flink pull request #6323: [FLINK-8558] [FLINK-8866] [table] Finalize unified...

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6323#discussion_r202506625
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/factories/TableFormatFactory.scala ---
    @@ -16,42 +16,17 @@
      * limitations under the License.
      */
     
    -package org.apache.flink.table.formats
    +package org.apache.flink.table.factories
     
     import java.util
     
    -import org.apache.flink.api.common.serialization.{DeserializationSchema, SerializationSchema}
    -
     /**
    -  * A factory to create different table format instances. This factory is used with Java's Service
    -  * Provider Interfaces (SPI) for discovering. A factory is called with a set of normalized
    -  * properties that describe the desired format. The factory allows for matching to the given set of
    -  * properties. See also [[SerializationSchemaFactory]] and [[DeserializationSchemaFactory]] for
    -  * creating configured instances of format classes accordingly.
    -  *
    -  * Classes that implement this interface need to be added to the
    -  * "META_INF/services/org.apache.flink.table.formats.TableFormatFactory' file of a JAR file in
    -  * the current classpath to be found.
    +  * A factory to create configured table format instances based on string-based properties. See
    +  * also [[TableFactory]] for more information.
       *
       * @tparam T record type that the format produces or consumes
       */
    -trait TableFormatFactory[T] {
    -
    -  /**
    -    * Specifies the context that this factory has been implemented for. The framework guarantees
    -    * to only use the factory if the specified set of properties and values are met.
    -    *
    -    * Typical properties might be:
    -    *   - format.type
    -    *   - format.version
    -    *
    -    * Specified property versions allow the framework to provide backwards compatible properties
    -    * in case of string format changes:
    -    *   - format.property-version
    -    *
    -    * An empty context means that the factory matches for all requests.
    -    */
    -  def requiredContext(): util.Map[String, String]
    +trait TableFormatFactory[T] extends TableFactory {
    --- End diff --
    
    Because the Javadoc explains the specific way in which the supported format properties are handled.


---

[GitHub] flink pull request #6323: [FLINK-8558] [FLINK-8866] [table] Finalize unified...

Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6323#discussion_r202274951
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/factories/TableFactory.scala ---
    @@ -16,14 +16,14 @@
      * limitations under the License.
      */
     
    -package org.apache.flink.table.connectors
    +package org.apache.flink.table.factories
     
     import java.util
     
     /**
       * Common trait for all properties-based discoverable table factories.
       */
    -trait DiscoverableTableFactory {
    +trait TableFactory {
    --- End diff --
    
    The name `TableFactory` is kind of strange: `Table` here reads more like a package name and `Factory` is generic. I liked the previous name better, although it wasn't very good either; it's hard for me to come up with a better one.


---

[GitHub] flink pull request #6323: [FLINK-8558] [FLINK-8866] [table] Finalize unified...

Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6323#discussion_r202351647
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala ---
    @@ -465,14 +465,14 @@ abstract class TableEnvironment(val config: TableConfig) {
           tableSink: TableSink[_]): Unit
     
       /**
    -    * Registers an external [[TableSink]] which is already configured in this
    -    * [[TableEnvironment]]'s catalog.
    +    * Registers an external [[TableSink]] with already configured field names and field types in
    --- End diff --
    
    ditto?


---

[GitHub] flink pull request #6323: [FLINK-8558] [FLINK-8866] [table] Finalize unified...

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6323#discussion_r202506861
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/factories/TableFactoryService.scala ---
    @@ -18,143 +18,358 @@
     
     package org.apache.flink.table.factories
     
    -import java.util.{ServiceConfigurationError, ServiceLoader}
    +import java.util.{ServiceConfigurationError, ServiceLoader, Map => JMap}
     
     import org.apache.flink.table.api._
     import org.apache.flink.table.descriptors.ConnectorDescriptorValidator._
     import org.apache.flink.table.descriptors.FormatDescriptorValidator._
     import org.apache.flink.table.descriptors.MetadataValidator._
     import org.apache.flink.table.descriptors.StatisticsValidator._
    -import org.apache.flink.table.descriptors.{DescriptorProperties, TableDescriptor, TableDescriptorValidator}
    +import org.apache.flink.table.descriptors._
     import org.apache.flink.table.util.Logging
    +import org.apache.flink.util.Preconditions
     
     import _root_.scala.collection.JavaConverters._
     import _root_.scala.collection.mutable
     
     /**
    -  * Unified interface to search for TableFactoryDiscoverable of provided type and properties.
    +  * Unified interface to search for a [[TableFactory]] of provided type and properties.
       */
     object TableFactoryService extends Logging {
     
       private lazy val defaultLoader = ServiceLoader.load(classOf[TableFactory])
     
    -  def find(clz: Class[_], descriptor: TableDescriptor): TableFactory = {
    -    find(clz, descriptor, null)
    +  /**
    +    * Finds a table factory of the given class and descriptor.
    +    *
    +    * @param factoryClass desired factory class
    +    * @param descriptor descriptor describing the factory configuration
    +    * @tparam T factory class type
    +    * @return the matching factory
    +    */
    +  def find[T](factoryClass: Class[T], descriptor: Descriptor): T = {
    +    Preconditions.checkNotNull(factoryClass)
    +    Preconditions.checkNotNull(descriptor)
    +
    +    val descriptorProperties = new DescriptorProperties()
    +    descriptor.addProperties(descriptorProperties)
    +    findInternal(factoryClass, descriptorProperties.asMap, None)
       }
     
    -  def find(clz: Class[_], descriptor: TableDescriptor, classLoader: ClassLoader)
    -  : TableFactory = {
    +  /**
    +    * Finds a table factory of the given class, descriptor, and classloader.
    +    *
    +    * @param factoryClass desired factory class
    +    * @param descriptor descriptor describing the factory configuration
    +    * @param classLoader classloader for service loading
    +    * @tparam T factory class type
    +    * @return the matching factory
    +    */
    +  def find[T](factoryClass: Class[T], descriptor: Descriptor, classLoader: ClassLoader): T = {
    +    Preconditions.checkNotNull(factoryClass)
    +    Preconditions.checkNotNull(descriptor)
    +    Preconditions.checkNotNull(classLoader)
     
    -    val properties = new DescriptorProperties()
    -    descriptor.addProperties(properties)
    -    find(clz, properties.asMap.asScala.toMap, classLoader)
    +    val descriptorProperties = new DescriptorProperties()
    +    descriptor.addProperties(descriptorProperties)
    +    findInternal(factoryClass, descriptorProperties.asMap, None)
       }
     
    -  def find(clz: Class[_], properties: Map[String, String]): TableFactory = {
    -    find(clz: Class[_], properties, null)
    +  /**
    +    * Finds a table factory of the given class and property map.
    +    *
    +    * @param factoryClass desired factory class
    +    * @param propertyMap properties that describe the factory configuration
    +    * @tparam T factory class type
    +    * @return the matching factory
    +    */
    +  def find[T](factoryClass: Class[T], propertyMap: JMap[String, String]): T = {
    +    Preconditions.checkNotNull(factoryClass)
    +    Preconditions.checkNotNull(propertyMap)
    +
    +    findInternal(factoryClass, propertyMap, None)
       }
     
    -  def find(clz: Class[_], properties: Map[String, String],
    -           classLoader: ClassLoader): TableFactory = {
    +  /**
    +    * Finds a table factory of the given class, property map, and classloader.
    +    *
    +    * @param factoryClass desired factory class
    +    * @param propertyMap properties that describe the factory configuration
    +    * @param classLoader classloader for service loading
    +    * @tparam T factory class type
    +    * @return the matching factory
    +    */
    +  def find[T](
    +      factoryClass: Class[T],
    +      propertyMap: JMap[String, String],
    +      classLoader: ClassLoader)
    +    : T = {
    +    Preconditions.checkNotNull(factoryClass)
    +    Preconditions.checkNotNull(propertyMap)
    +    Preconditions.checkNotNull(classLoader)
    +
    +    findInternal(factoryClass, propertyMap, Some(classLoader))
    +  }
    +
    +  /**
    +    * Finds a table factory of the given class, property map, and classloader.
    +    *
    +    * @param factoryClass desired factory class
    +    * @param propertyMap properties that describe the factory configuration
    +    * @param classLoader classloader for service loading
    +    * @tparam T factory class type
    +    * @return the matching factory
    +    */
    +  private def findInternal[T](
    +      factoryClass: Class[T],
    +      propertyMap: JMap[String, String],
    +      classLoader: Option[ClassLoader])
    +    : T = {
    +
    +    val properties = propertyMap.asScala.toMap
    +
    +    // discover table factories
    +    val foundFactories = discoverFactories(classLoader)
     
    -    var matchingFactory: Option[(TableFactory, Seq[String])] = None
    +    // filter by factory class
    +    val classFactories = filterByFactoryClass(
    +      factoryClass,
    +      properties,
    +      foundFactories)
    +
    +    // find matching context
    +    val contextFactories = filterByContext(
    +      factoryClass,
    +      properties,
    +      foundFactories,
    +      classFactories)
    +
    +    // filter by supported keys
    +    filterBySupportedProperties(
    +      factoryClass,
    +      properties,
    +      foundFactories,
    +      contextFactories)
    +  }
    +
    +  /**
    +    * Searches for factories using Java service providers.
    +    *
    +    * @return all factories in the classpath
    +    */
    +  private def discoverFactories[T](classLoader: Option[ClassLoader]): Seq[TableFactory] = {
    +    val foundFactories = mutable.ArrayBuffer[TableFactory]()
         try {
    -      val iter = if (classLoader == null) {
    -        defaultLoader.iterator()
    -      } else {
    -        val customLoader = ServiceLoader.load(classOf[TableFactory], classLoader)
    -        customLoader.iterator()
    +      val iterator = classLoader match {
    +        case Some(customClassLoader) =>
    +          val customLoader = ServiceLoader.load(classOf[TableFactory], customClassLoader)
    +          customLoader.iterator()
    +        case None =>
    +          defaultLoader.iterator()
           }
    -      while (iter.hasNext) {
    -        val factory = iter.next()
    -
    -        if (clz.isAssignableFrom(factory.getClass)) {
    -          val requiredContextJava = try {
    -            factory.requiredContext()
    -          } catch {
    -            case t: Throwable =>
    -              throw new TableException(
    -                s"Table source factory '${factory.getClass.getCanonicalName}' caused an exception.",
    -                t)
    -          }
    -
    -          val requiredContext = if (requiredContextJava != null) {
    -            // normalize properties
    -            requiredContextJava.asScala.map(e => (e._1.toLowerCase, e._2))
    -          } else {
    -            Map[String, String]()
    -          }
    -
    -          val plainContext = mutable.Map[String, String]()
    -          plainContext ++= requiredContext
    -          // we remove the versions for now until we have the first backwards compatibility case
    -          // with the version we can provide mappings in case the format changes
    -          plainContext.remove(CONNECTOR_PROPERTY_VERSION)
    -          plainContext.remove(FORMAT_PROPERTY_VERSION)
    -          plainContext.remove(METADATA_PROPERTY_VERSION)
    -          plainContext.remove(STATISTICS_PROPERTY_VERSION)
    -
    -          if (plainContext.forall(e => properties.contains(e._1) && properties(e._1) == e._2)) {
    -            matchingFactory match {
    -              case Some(_) => throw new AmbiguousTableFactoryException(properties)
    -              case None => matchingFactory =
    -                Some((factory.asInstanceOf[TableFactory], requiredContext.keys.toSeq))
    -            }
    -          }
    -        }
    +
    +      while (iterator.hasNext) {
    +        val factory = iterator.next()
    +        foundFactories += factory
           }
    +
    +      foundFactories
         } catch {
           case e: ServiceConfigurationError =>
             LOG.error("Could not load service provider for table factories.", e)
             throw new TableException("Could not load service provider for table factories.", e)
         }
    +  }
    +
    +  /**
    +    * Filters for factories with matching context.
    +    *
    +    * @return all matching factories
    +    */
    +  private def filterByContext[T](
    +      factoryClass: Class[T],
    +      properties: Map[String, String],
    +      foundFactories: Seq[TableFactory],
    +      classFactories: Seq[TableFactory])
    +    : Seq[TableFactory] = {
    +
    +    val matchingFactories = mutable.ArrayBuffer[TableFactory]()
    +
    +    classFactories.foreach { factory =>
    +      val requestedContext = normalizeContext(factory)
    +
    +      val plainContext = mutable.Map[String, String]()
    +      plainContext ++= requestedContext
    +      // we remove the version for now until we have the first backwards compatibility case
    +      // with the version we can provide mappings in case the format changes
    --- End diff --
    
    You are right. This could lead to confusion. I will remove these lines entirely.


---

[GitHub] flink pull request #6323: [FLINK-8558] [FLINK-8866] [table] Finalize unified...

Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6323#discussion_r202335205
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/factories/TableFactoryService.scala ---
    @@ -18,143 +18,358 @@
     
     package org.apache.flink.table.factories
     
    -import java.util.{ServiceConfigurationError, ServiceLoader}
    +import java.util.{ServiceConfigurationError, ServiceLoader, Map => JMap}
     
     import org.apache.flink.table.api._
     import org.apache.flink.table.descriptors.ConnectorDescriptorValidator._
     import org.apache.flink.table.descriptors.FormatDescriptorValidator._
     import org.apache.flink.table.descriptors.MetadataValidator._
     import org.apache.flink.table.descriptors.StatisticsValidator._
    -import org.apache.flink.table.descriptors.{DescriptorProperties, TableDescriptor, TableDescriptorValidator}
    +import org.apache.flink.table.descriptors._
     import org.apache.flink.table.util.Logging
    +import org.apache.flink.util.Preconditions
     
     import _root_.scala.collection.JavaConverters._
     import _root_.scala.collection.mutable
     
     /**
    -  * Unified interface to search for TableFactoryDiscoverable of provided type and properties.
    +  * Unified interface to search for a [[TableFactory]] of provided type and properties.
       */
     object TableFactoryService extends Logging {
     
       private lazy val defaultLoader = ServiceLoader.load(classOf[TableFactory])
     
    -  def find(clz: Class[_], descriptor: TableDescriptor): TableFactory = {
    -    find(clz, descriptor, null)
    +  /**
    +    * Finds a table factory of the given class and descriptor.
    +    *
    +    * @param factoryClass desired factory class
    +    * @param descriptor descriptor describing the factory configuration
    +    * @tparam T factory class type
    +    * @return the matching factory
    +    */
    +  def find[T](factoryClass: Class[T], descriptor: Descriptor): T = {
    +    Preconditions.checkNotNull(factoryClass)
    +    Preconditions.checkNotNull(descriptor)
    +
    +    val descriptorProperties = new DescriptorProperties()
    +    descriptor.addProperties(descriptorProperties)
    +    findInternal(factoryClass, descriptorProperties.asMap, None)
       }
     
    -  def find(clz: Class[_], descriptor: TableDescriptor, classLoader: ClassLoader)
    -  : TableFactory = {
    +  /**
    +    * Finds a table factory of the given class, descriptor, and classloader.
    +    *
    +    * @param factoryClass desired factory class
    +    * @param descriptor descriptor describing the factory configuration
    +    * @param classLoader classloader for service loading
    +    * @tparam T factory class type
    +    * @return the matching factory
    +    */
    +  def find[T](factoryClass: Class[T], descriptor: Descriptor, classLoader: ClassLoader): T = {
    +    Preconditions.checkNotNull(factoryClass)
    +    Preconditions.checkNotNull(descriptor)
    +    Preconditions.checkNotNull(classLoader)
     
    -    val properties = new DescriptorProperties()
    -    descriptor.addProperties(properties)
    -    find(clz, properties.asMap.asScala.toMap, classLoader)
    +    val descriptorProperties = new DescriptorProperties()
    +    descriptor.addProperties(descriptorProperties)
    +    findInternal(factoryClass, descriptorProperties.asMap, None)
       }
     
    -  def find(clz: Class[_], properties: Map[String, String]): TableFactory = {
    -    find(clz: Class[_], properties, null)
    +  /**
    +    * Finds a table factory of the given class and property map.
    +    *
    +    * @param factoryClass desired factory class
    +    * @param propertyMap properties that describe the factory configuration
    +    * @tparam T factory class type
    +    * @return the matching factory
    +    */
    +  def find[T](factoryClass: Class[T], propertyMap: JMap[String, String]): T = {
    +    Preconditions.checkNotNull(factoryClass)
    +    Preconditions.checkNotNull(propertyMap)
    +
    +    findInternal(factoryClass, propertyMap, None)
       }
     
    -  def find(clz: Class[_], properties: Map[String, String],
    -           classLoader: ClassLoader): TableFactory = {
    +  /**
    +    * Finds a table factory of the given class, property map, and classloader.
    +    *
    +    * @param factoryClass desired factory class
    +    * @param propertyMap properties that describe the factory configuration
    +    * @param classLoader classloader for service loading
    +    * @tparam T factory class type
    +    * @return the matching factory
    +    */
    +  def find[T](
    +      factoryClass: Class[T],
    +      propertyMap: JMap[String, String],
    +      classLoader: ClassLoader)
    +    : T = {
    +    Preconditions.checkNotNull(factoryClass)
    +    Preconditions.checkNotNull(propertyMap)
    +    Preconditions.checkNotNull(classLoader)
    +
    +    findInternal(factoryClass, propertyMap, Some(classLoader))
    +  }
    +
    +  /**
    +    * Finds a table factory of the given class, property map, and classloader.
    +    *
    +    * @param factoryClass desired factory class
    +    * @param propertyMap properties that describe the factory configuration
    +    * @param classLoader classloader for service loading
    +    * @tparam T factory class type
    +    * @return the matching factory
    +    */
    +  private def findInternal[T](
    --- End diff --
    
    Huge +1. This method/class looks so much better now :)


---

[GitHub] flink pull request #6323: [FLINK-8558] [FLINK-8866] [table] Finalize unified...

Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6323#discussion_r202334688
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/factories/TableFactoryService.scala ---
    @@ -18,143 +18,358 @@
     
     package org.apache.flink.table.factories
     
    -import java.util.{ServiceConfigurationError, ServiceLoader}
    +import java.util.{ServiceConfigurationError, ServiceLoader, Map => JMap}
     
     import org.apache.flink.table.api._
     import org.apache.flink.table.descriptors.ConnectorDescriptorValidator._
     import org.apache.flink.table.descriptors.FormatDescriptorValidator._
     import org.apache.flink.table.descriptors.MetadataValidator._
     import org.apache.flink.table.descriptors.StatisticsValidator._
    -import org.apache.flink.table.descriptors.{DescriptorProperties, TableDescriptor, TableDescriptorValidator}
    +import org.apache.flink.table.descriptors._
     import org.apache.flink.table.util.Logging
    +import org.apache.flink.util.Preconditions
     
     import _root_.scala.collection.JavaConverters._
     import _root_.scala.collection.mutable
     
     /**
    -  * Unified interface to search for TableFactoryDiscoverable of provided type and properties.
    +  * Unified interface to search for a [[TableFactory]] of provided type and properties.
       */
     object TableFactoryService extends Logging {
     
       private lazy val defaultLoader = ServiceLoader.load(classOf[TableFactory])
     
    -  def find(clz: Class[_], descriptor: TableDescriptor): TableFactory = {
    -    find(clz, descriptor, null)
    +  /**
    +    * Finds a table factory of the given class and descriptor.
    +    *
    +    * @param factoryClass desired factory class
    +    * @param descriptor descriptor describing the factory configuration
    +    * @tparam T factory class type
    +    * @return the matching factory
    +    */
    +  def find[T](factoryClass: Class[T], descriptor: Descriptor): T = {
    +    Preconditions.checkNotNull(factoryClass)
    +    Preconditions.checkNotNull(descriptor)
    +
    +    val descriptorProperties = new DescriptorProperties()
    +    descriptor.addProperties(descriptorProperties)
    +    findInternal(factoryClass, descriptorProperties.asMap, None)
       }
     
    -  def find(clz: Class[_], descriptor: TableDescriptor, classLoader: ClassLoader)
    -  : TableFactory = {
    +  /**
    +    * Finds a table factory of the given class, descriptor, and classloader.
    +    *
    +    * @param factoryClass desired factory class
    +    * @param descriptor descriptor describing the factory configuration
    +    * @param classLoader classloader for service loading
    +    * @tparam T factory class type
    +    * @return the matching factory
    +    */
    +  def find[T](factoryClass: Class[T], descriptor: Descriptor, classLoader: ClassLoader): T = {
    +    Preconditions.checkNotNull(factoryClass)
    +    Preconditions.checkNotNull(descriptor)
    +    Preconditions.checkNotNull(classLoader)
     
    -    val properties = new DescriptorProperties()
    -    descriptor.addProperties(properties)
    -    find(clz, properties.asMap.asScala.toMap, classLoader)
    +    val descriptorProperties = new DescriptorProperties()
    +    descriptor.addProperties(descriptorProperties)
    +    findInternal(factoryClass, descriptorProperties.asMap, None)
       }
     
    -  def find(clz: Class[_], properties: Map[String, String]): TableFactory = {
    -    find(clz: Class[_], properties, null)
    +  /**
    +    * Finds a table factory of the given class and property map.
    +    *
    +    * @param factoryClass desired factory class
    +    * @param propertyMap properties that describe the factory configuration
    +    * @tparam T factory class type
    +    * @return the matching factory
    +    */
    +  def find[T](factoryClass: Class[T], propertyMap: JMap[String, String]): T = {
    +    Preconditions.checkNotNull(factoryClass)
    +    Preconditions.checkNotNull(propertyMap)
    +
    +    findInternal(factoryClass, propertyMap, None)
       }
     
    -  def find(clz: Class[_], properties: Map[String, String],
    -           classLoader: ClassLoader): TableFactory = {
    +  /**
    +    * Finds a table factory of the given class, property map, and classloader.
    +    *
    +    * @param factoryClass desired factory class
    +    * @param propertyMap properties that describe the factory configuration
    +    * @param classLoader classloader for service loading
    +    * @tparam T factory class type
    +    * @return the matching factory
    +    */
    +  def find[T](
    +      factoryClass: Class[T],
    +      propertyMap: JMap[String, String],
    +      classLoader: ClassLoader)
    +    : T = {
    +    Preconditions.checkNotNull(factoryClass)
    +    Preconditions.checkNotNull(propertyMap)
    +    Preconditions.checkNotNull(classLoader)
    +
    +    findInternal(factoryClass, propertyMap, Some(classLoader))
    +  }
    +
    +  /**
    +    * Finds a table factory of the given class, property map, and classloader.
    +    *
    +    * @param factoryClass desired factory class
    +    * @param propertyMap properties that describe the factory configuration
    +    * @param classLoader classloader for service loading
    +    * @tparam T factory class type
    +    * @return the matching factory
    +    */
    +  private def findInternal[T](
    +      factoryClass: Class[T],
    +      propertyMap: JMap[String, String],
    +      classLoader: Option[ClassLoader])
    +    : T = {
    +
    +    val properties = propertyMap.asScala.toMap
    +
    +    // discover table factories
    --- End diff --
    
    drop the comments?


---

[GitHub] flink pull request #6323: [FLINK-8558] [FLINK-8866] [table] Finalize unified...

Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6323#discussion_r202282625
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/TableSinkDescriptor.scala ---
    @@ -22,9 +22,11 @@ package org.apache.flink.table.descriptors
       * Common class for all descriptors describing a table sink.
       */
     abstract class TableSinkDescriptor extends TableDescriptor {
    +
    +  /**
    +    * Internal method for properties conversion.
    +    */
       override private[flink] def addProperties(properties: DescriptorProperties): Unit = {
         super.addProperties(properties)
    -    properties.putString(TableDescriptorValidator.TABLE_TYPE,
    -      TableDescriptorValidator.TABLE_TYPE_VALUE_SOURCE)
    --- End diff --
    
    why did those lines disappear?


---

[GitHub] flink pull request #6323: [FLINK-8558] [FLINK-8866] [table] Finalize unified...

Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6323#discussion_r202268019
  
    --- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java ---
    @@ -54,51 +56,105 @@
      * override {@link #createKafkaConsumer(String, Properties, DeserializationSchema)}}.
      */
     @Internal
    -public abstract class KafkaTableSource
    -	implements StreamTableSource<Row>, DefinedProctimeAttribute, DefinedRowtimeAttributes {
    +public abstract class KafkaTableSource implements
    +		StreamTableSource<Row>,
    +		DefinedProctimeAttribute,
    +		DefinedRowtimeAttributes,
    +		DefinedFieldMapping {
    +
    +	// common table source attributes
    +	// TODO make all attributes final once we drop support for format-specific table sources
     
     	/** The schema of the table. */
     	private final TableSchema schema;
     
    +	/** Field name of the processing time attribute, null if no processing time field is defined. */
    +	private String proctimeAttribute;
    +
    +	/** Descriptor for a rowtime attribute. */
    +	private List<RowtimeAttributeDescriptor> rowtimeAttributeDescriptors;
    +
    +	/** Mapping for the fields of the table schema to fields of the physical returned type or null. */
    +	private Map<String, String> fieldMapping;
    +
    +	// Kafka-specific attributes
    +
     	/** The Kafka topic to consume. */
     	private final String topic;
     
     	/** Properties for the Kafka consumer. */
     	private final Properties properties;
     
    -	/** Type information describing the result type. */
    -	private TypeInformation<Row> returnType;
    -
    -	/** Field name of the processing time attribute, null if no processing time field is defined. */
    -	private String proctimeAttribute;
    -
    -	/** Descriptor for a rowtime attribute. */
    -	private List<RowtimeAttributeDescriptor> rowtimeAttributeDescriptors;
    +	/** Deserialization schema for decoding records from Kafka. */
    +	private final DeserializationSchema<Row> deserializationSchema;
     
     	/** The startup mode for the contained consumer (default is {@link StartupMode#GROUP_OFFSETS}). */
    -	private StartupMode startupMode = StartupMode.GROUP_OFFSETS;
    +	private StartupMode startupMode;
     
     	/** Specific startup offsets; only relevant when startup mode is {@link StartupMode#SPECIFIC_OFFSETS}. */
     	private Map<KafkaTopicPartition, Long> specificStartupOffsets;
     
     	/**
     	 * Creates a generic Kafka {@link StreamTableSource}.
     	 *
    -	 * @param topic                 Kafka topic to consume.
    -	 * @param properties            Properties for the Kafka consumer.
    -	 * @param schema                Schema of the produced table.
    -	 * @param returnType            Type information of the produced physical DataStream.
    +	 * @param schema                      Schema of the produced table.
    +	 * @param proctimeAttribute           Field name of the processing time attribute, null if no
    --- End diff --
    
    > null
    
    Use `Optional`, an overload, a builder, or disallow null. The same applies to the classes that extend this one.
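
    For illustration, a minimal Scala sketch of the "no null" variant (the class and parameter names are hypothetical, not the actual `KafkaTableSource` signature):

        import org.apache.flink.table.api.TableSchema

        // Hypothetical source: the absence of a proctime field is modeled
        // with Option instead of a nullable String.
        class ExampleTableSource(
            val schema: TableSchema,
            val proctimeAttribute: Option[String])

        // Call sites now state the absence explicitly:
        // new ExampleTableSource(schema, None)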


---

[GitHub] flink pull request #6323: [FLINK-8558] [FLINK-8866] [table] Finalize unified...

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6323#discussion_r202507057
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalTableSourceScan.scala ---
    @@ -52,12 +52,9 @@ class FlinkLogicalTableSourceScan(
       override def deriveRowType(): RelDataType = {
         val flinkTypeFactory = cluster.getTypeFactory.asInstanceOf[FlinkTypeFactory]
         val streamingTable = table.unwrap(classOf[TableSourceSinkTable[_, _]]) match {
    -      case t: TableSourceSinkTable[_, _] => t.tableSourceTableOpt match {
    -        case Some(_: StreamTableSourceTable[_]) => true
    -        case Some(_: BatchTableSourceTable[_]) => false
    -        case _ => throw TableException(s"Unknown Table type ${t.getClass}.")
    -      }
    -      case t => throw TableException(s"Unknown Table type ${t.getClass}.")
    +      case t: TableSourceSinkTable[_, _] if t.isStreamSourceTable => true
    +      // null
    --- End diff --
    
    This information is useful.


---

[GitHub] flink pull request #6323: [FLINK-8558] [FLINK-8866] [table] Finalize unified...

Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6323#discussion_r202337455
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/factories/TableFactoryService.scala ---
    @@ -18,143 +18,358 @@
     
     package org.apache.flink.table.factories
     
    -import java.util.{ServiceConfigurationError, ServiceLoader}
    +import java.util.{ServiceConfigurationError, ServiceLoader, Map => JMap}
     
     import org.apache.flink.table.api._
     import org.apache.flink.table.descriptors.ConnectorDescriptorValidator._
     import org.apache.flink.table.descriptors.FormatDescriptorValidator._
     import org.apache.flink.table.descriptors.MetadataValidator._
     import org.apache.flink.table.descriptors.StatisticsValidator._
    -import org.apache.flink.table.descriptors.{DescriptorProperties, TableDescriptor, TableDescriptorValidator}
    +import org.apache.flink.table.descriptors._
     import org.apache.flink.table.util.Logging
    +import org.apache.flink.util.Preconditions
     
     import _root_.scala.collection.JavaConverters._
     import _root_.scala.collection.mutable
     
     /**
    -  * Unified interface to search for TableFactoryDiscoverable of provided type and properties.
    +  * Unified interface to search for a [[TableFactory]] of provided type and properties.
       */
     object TableFactoryService extends Logging {
     
       private lazy val defaultLoader = ServiceLoader.load(classOf[TableFactory])
     
    -  def find(clz: Class[_], descriptor: TableDescriptor): TableFactory = {
    -    find(clz, descriptor, null)
    +  /**
    +    * Finds a table factory of the given class and descriptor.
    +    *
    +    * @param factoryClass desired factory class
    +    * @param descriptor descriptor describing the factory configuration
    +    * @tparam T factory class type
    +    * @return the matching factory
    +    */
    +  def find[T](factoryClass: Class[T], descriptor: Descriptor): T = {
    +    Preconditions.checkNotNull(factoryClass)
    +    Preconditions.checkNotNull(descriptor)
    +
    +    val descriptorProperties = new DescriptorProperties()
    +    descriptor.addProperties(descriptorProperties)
    +    findInternal(factoryClass, descriptorProperties.asMap, None)
       }
     
    -  def find(clz: Class[_], descriptor: TableDescriptor, classLoader: ClassLoader)
    -  : TableFactory = {
    +  /**
    +    * Finds a table factory of the given class, descriptor, and classloader.
    +    *
    +    * @param factoryClass desired factory class
    +    * @param descriptor descriptor describing the factory configuration
    +    * @param classLoader classloader for service loading
    +    * @tparam T factory class type
    +    * @return the matching factory
    +    */
    +  def find[T](factoryClass: Class[T], descriptor: Descriptor, classLoader: ClassLoader): T = {
    +    Preconditions.checkNotNull(factoryClass)
    +    Preconditions.checkNotNull(descriptor)
    +    Preconditions.checkNotNull(classLoader)
     
    -    val properties = new DescriptorProperties()
    -    descriptor.addProperties(properties)
    -    find(clz, properties.asMap.asScala.toMap, classLoader)
    +    val descriptorProperties = new DescriptorProperties()
    +    descriptor.addProperties(descriptorProperties)
    +    findInternal(factoryClass, descriptorProperties.asMap, None)
       }
     
    -  def find(clz: Class[_], properties: Map[String, String]): TableFactory = {
    -    find(clz: Class[_], properties, null)
    +  /**
    +    * Finds a table factory of the given class and property map.
    +    *
    +    * @param factoryClass desired factory class
    +    * @param propertyMap properties that describe the factory configuration
    +    * @tparam T factory class type
    +    * @return the matching factory
    +    */
    +  def find[T](factoryClass: Class[T], propertyMap: JMap[String, String]): T = {
    +    Preconditions.checkNotNull(factoryClass)
    +    Preconditions.checkNotNull(propertyMap)
    +
    +    findInternal(factoryClass, propertyMap, None)
       }
     
    -  def find(clz: Class[_], properties: Map[String, String],
    -           classLoader: ClassLoader): TableFactory = {
    +  /**
    +    * Finds a table factory of the given class, property map, and classloader.
    +    *
    +    * @param factoryClass desired factory class
    +    * @param propertyMap properties that describe the factory configuration
    +    * @param classLoader classloader for service loading
    +    * @tparam T factory class type
    +    * @return the matching factory
    +    */
    +  def find[T](
    +      factoryClass: Class[T],
    +      propertyMap: JMap[String, String],
    +      classLoader: ClassLoader)
    +    : T = {
    +    Preconditions.checkNotNull(factoryClass)
    +    Preconditions.checkNotNull(propertyMap)
    +    Preconditions.checkNotNull(classLoader)
    +
    +    findInternal(factoryClass, propertyMap, Some(classLoader))
    +  }
    +
    +  /**
    +    * Finds a table factory of the given class, property map, and classloader.
    +    *
    +    * @param factoryClass desired factory class
    +    * @param propertyMap properties that describe the factory configuration
    +    * @param classLoader classloader for service loading
    +    * @tparam T factory class type
    +    * @return the matching factory
    +    */
    +  private def findInternal[T](
    +      factoryClass: Class[T],
    +      propertyMap: JMap[String, String],
    +      classLoader: Option[ClassLoader])
    +    : T = {
    +
    +    val properties = propertyMap.asScala.toMap
    +
    +    // discover table factories
    +    val foundFactories = discoverFactories(classLoader)
     
    -    var matchingFactory: Option[(TableFactory, Seq[String])] = None
    +    // filter by factory class
    +    val classFactories = filterByFactoryClass(
    +      factoryClass,
    +      properties,
    +      foundFactories)
    +
    +    // find matching context
    +    val contextFactories = filterByContext(
    +      factoryClass,
    +      properties,
    +      foundFactories,
    +      classFactories)
    +
    +    // filter by supported keys
    +    filterBySupportedProperties(
    +      factoryClass,
    +      properties,
    +      foundFactories,
    +      contextFactories)
    +  }
    +
    +  /**
    +    * Searches for factories using Java service providers.
    +    *
    +    * @return all factories in the classpath
    +    */
    +  private def discoverFactories[T](classLoader: Option[ClassLoader]): Seq[TableFactory] = {
    +    val foundFactories = mutable.ArrayBuffer[TableFactory]()
         try {
    -      val iter = if (classLoader == null) {
    -        defaultLoader.iterator()
    -      } else {
    -        val customLoader = ServiceLoader.load(classOf[TableFactory], classLoader)
    -        customLoader.iterator()
    +      val iterator = classLoader match {
    +        case Some(customClassLoader) =>
    +          val customLoader = ServiceLoader.load(classOf[TableFactory], customClassLoader)
    +          customLoader.iterator()
    +        case None =>
    +          defaultLoader.iterator()
           }
    -      while (iter.hasNext) {
    -        val factory = iter.next()
    -
    -        if (clz.isAssignableFrom(factory.getClass)) {
    -          val requiredContextJava = try {
    -            factory.requiredContext()
    -          } catch {
    -            case t: Throwable =>
    -              throw new TableException(
    -                s"Table source factory '${factory.getClass.getCanonicalName}' caused an exception.",
    -                t)
    -          }
    -
    -          val requiredContext = if (requiredContextJava != null) {
    -            // normalize properties
    -            requiredContextJava.asScala.map(e => (e._1.toLowerCase, e._2))
    -          } else {
    -            Map[String, String]()
    -          }
    -
    -          val plainContext = mutable.Map[String, String]()
    -          plainContext ++= requiredContext
    -          // we remove the versions for now until we have the first backwards compatibility case
    -          // with the version we can provide mappings in case the format changes
    -          plainContext.remove(CONNECTOR_PROPERTY_VERSION)
    -          plainContext.remove(FORMAT_PROPERTY_VERSION)
    -          plainContext.remove(METADATA_PROPERTY_VERSION)
    -          plainContext.remove(STATISTICS_PROPERTY_VERSION)
    -
    -          if (plainContext.forall(e => properties.contains(e._1) && properties(e._1) == e._2)) {
    -            matchingFactory match {
    -              case Some(_) => throw new AmbiguousTableFactoryException(properties)
    -              case None => matchingFactory =
    -                Some((factory.asInstanceOf[TableFactory], requiredContext.keys.toSeq))
    -            }
    -          }
    -        }
    +
    +      while (iterator.hasNext) {
    --- End diff --
    
    is there no one-liner to convert an iterator to a seq? Does this method need to work on `Seq`?


---

[GitHub] flink pull request #6323: [FLINK-8558] [FLINK-8866] [table] Finalize unified...

Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6323#discussion_r202268054
  
    --- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java ---
    @@ -54,51 +56,105 @@
      * override {@link #createKafkaConsumer(String, Properties, DeserializationSchema)}}.
      */
     @Internal
    -public abstract class KafkaTableSource
    -	implements StreamTableSource<Row>, DefinedProctimeAttribute, DefinedRowtimeAttributes {
    +public abstract class KafkaTableSource implements
    +		StreamTableSource<Row>,
    +		DefinedProctimeAttribute,
    +		DefinedRowtimeAttributes,
    +		DefinedFieldMapping {
    +
    +	// common table source attributes
    +	// TODO make all attributes final once we drop support for format-specific table sources
     
     	/** The schema of the table. */
     	private final TableSchema schema;
     
    +	/** Field name of the processing time attribute, null if no processing time field is defined. */
    +	private String proctimeAttribute;
    +
    +	/** Descriptor for a rowtime attribute. */
    +	private List<RowtimeAttributeDescriptor> rowtimeAttributeDescriptors;
    +
    +	/** Mapping for the fields of the table schema to fields of the physical returned type or null. */
    +	private Map<String, String> fieldMapping;
    +
    +	// Kafka-specific attributes
    +
     	/** The Kafka topic to consume. */
     	private final String topic;
     
     	/** Properties for the Kafka consumer. */
     	private final Properties properties;
     
    -	/** Type information describing the result type. */
    -	private TypeInformation<Row> returnType;
    -
    -	/** Field name of the processing time attribute, null if no processing time field is defined. */
    -	private String proctimeAttribute;
    -
    -	/** Descriptor for a rowtime attribute. */
    -	private List<RowtimeAttributeDescriptor> rowtimeAttributeDescriptors;
    +	/** Deserialization schema for decoding records from Kafka. */
    +	private final DeserializationSchema<Row> deserializationSchema;
     
     	/** The startup mode for the contained consumer (default is {@link StartupMode#GROUP_OFFSETS}). */
    -	private StartupMode startupMode = StartupMode.GROUP_OFFSETS;
    +	private StartupMode startupMode;
     
     	/** Specific startup offsets; only relevant when startup mode is {@link StartupMode#SPECIFIC_OFFSETS}. */
     	private Map<KafkaTopicPartition, Long> specificStartupOffsets;
     
     	/**
     	 * Creates a generic Kafka {@link StreamTableSource}.
     	 *
    -	 * @param topic                 Kafka topic to consume.
    -	 * @param properties            Properties for the Kafka consumer.
    -	 * @param schema                Schema of the produced table.
    -	 * @param returnType            Type information of the produced physical DataStream.
    +	 * @param schema                      Schema of the produced table.
    +	 * @param proctimeAttribute           Field name of the processing time attribute, null if no
    +	 *                                    processing time field is defined.
    +	 * @param rowtimeAttributeDescriptors Descriptor for a rowtime attribute
    +	 * @param fieldMapping                Mapping for the fields of the table schema to
    --- End diff --
    
    > or null
    
    ditto


---

[GitHub] flink pull request #6323: [FLINK-8558] [FLINK-8866] [table] Finalize unified...

Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6323#discussion_r202333467
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/exceptions.scala ---
    @@ -143,118 +143,82 @@ case class CatalogAlreadyExistException(
     }
     
     /**
    -  * Exception for not finding a [[TableFormatFactory]] for the
    -  * given properties.
    +  * Exception for not finding a [[TableFactory]] for the given properties.
       *
       * @param message message that indicates the current matching step
       * @param factoryClass required factory class
    -  * @param formatFactories all found factories
    -  * @param properties properties that describe the table format
    +  * @param factories all found factories
    +  * @param properties properties that describe the configuration
       * @param cause the cause
       */
    -case class NoMatchingTableFormatException(
    +case class NoMatchingTableFactoryException(
           message: String,
           factoryClass: Class[_],
    -      formatFactories: Seq[TableFormatFactory[_]],
    +      factories: Seq[TableFactory],
           properties: Map[String, String],
           cause: Throwable)
         extends RuntimeException(
    --- End diff --
    
    side note: shouldn't our exception inherit from `FlinkTableException` or something like that?


---

[GitHub] flink pull request #6323: [FLINK-8558] [FLINK-8866] [table] Finalize unified...

Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6323#discussion_r202354777
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/TableSourceSinkTable.scala ---
    @@ -21,23 +21,47 @@ package org.apache.flink.table.plan.schema
     import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeFactory}
     import org.apache.calcite.schema.Statistic
     import org.apache.calcite.schema.impl.AbstractTable
    +import org.apache.flink.table.api.TableException
     
    -class TableSourceSinkTable[T1, T2](val tableSourceTableOpt: Option[TableSourceTable[T1]],
    -                                   val tableSinkTableOpt: Option[TableSinkTable[T2]])
    +/**
    --- End diff --
    
    ditto: fixup Shuyi's commit 


---

[GitHub] flink pull request #6323: [FLINK-8558] [FLINK-8866] [table] Finalize unified...

Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6323#discussion_r202521212
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/factories/TableFactoryService.scala ---
    @@ -18,143 +18,358 @@
     
     package org.apache.flink.table.factories
     
    -import java.util.{ServiceConfigurationError, ServiceLoader}
    +import java.util.{ServiceConfigurationError, ServiceLoader, Map => JMap}
     
     import org.apache.flink.table.api._
     import org.apache.flink.table.descriptors.ConnectorDescriptorValidator._
     import org.apache.flink.table.descriptors.FormatDescriptorValidator._
     import org.apache.flink.table.descriptors.MetadataValidator._
     import org.apache.flink.table.descriptors.StatisticsValidator._
    -import org.apache.flink.table.descriptors.{DescriptorProperties, TableDescriptor, TableDescriptorValidator}
    +import org.apache.flink.table.descriptors._
     import org.apache.flink.table.util.Logging
    +import org.apache.flink.util.Preconditions
     
     import _root_.scala.collection.JavaConverters._
     import _root_.scala.collection.mutable
     
     /**
    -  * Unified interface to search for TableFactoryDiscoverable of provided type and properties.
    +  * Unified interface to search for a [[TableFactory]] of provided type and properties.
       */
     object TableFactoryService extends Logging {
     
       private lazy val defaultLoader = ServiceLoader.load(classOf[TableFactory])
     
    -  def find(clz: Class[_], descriptor: TableDescriptor): TableFactory = {
    -    find(clz, descriptor, null)
    +  /**
    +    * Finds a table factory of the given class and descriptor.
    +    *
    +    * @param factoryClass desired factory class
    +    * @param descriptor descriptor describing the factory configuration
    +    * @tparam T factory class type
    +    * @return the matching factory
    +    */
    +  def find[T](factoryClass: Class[T], descriptor: Descriptor): T = {
    +    Preconditions.checkNotNull(factoryClass)
    +    Preconditions.checkNotNull(descriptor)
    +
    +    val descriptorProperties = new DescriptorProperties()
    +    descriptor.addProperties(descriptorProperties)
    +    findInternal(factoryClass, descriptorProperties.asMap, None)
       }
     
    -  def find(clz: Class[_], descriptor: TableDescriptor, classLoader: ClassLoader)
    -  : TableFactory = {
    +  /**
    +    * Finds a table factory of the given class, descriptor, and classloader.
    +    *
    +    * @param factoryClass desired factory class
    +    * @param descriptor descriptor describing the factory configuration
    +    * @param classLoader classloader for service loading
    +    * @tparam T factory class type
    +    * @return the matching factory
    +    */
    +  def find[T](factoryClass: Class[T], descriptor: Descriptor, classLoader: ClassLoader): T = {
    +    Preconditions.checkNotNull(factoryClass)
    +    Preconditions.checkNotNull(descriptor)
    +    Preconditions.checkNotNull(classLoader)
     
    -    val properties = new DescriptorProperties()
    -    descriptor.addProperties(properties)
    -    find(clz, properties.asMap.asScala.toMap, classLoader)
    +    val descriptorProperties = new DescriptorProperties()
    +    descriptor.addProperties(descriptorProperties)
    +    findInternal(factoryClass, descriptorProperties.asMap, None)
       }
     
    -  def find(clz: Class[_], properties: Map[String, String]): TableFactory = {
    -    find(clz: Class[_], properties, null)
    +  /**
    +    * Finds a table factory of the given class and property map.
    +    *
    +    * @param factoryClass desired factory class
    +    * @param propertyMap properties that describe the factory configuration
    +    * @tparam T factory class type
    +    * @return the matching factory
    +    */
    +  def find[T](factoryClass: Class[T], propertyMap: JMap[String, String]): T = {
    +    Preconditions.checkNotNull(factoryClass)
    +    Preconditions.checkNotNull(propertyMap)
    +
    +    findInternal(factoryClass, propertyMap, None)
       }
     
    -  def find(clz: Class[_], properties: Map[String, String],
    -           classLoader: ClassLoader): TableFactory = {
    +  /**
    +    * Finds a table factory of the given class, property map, and classloader.
    +    *
    +    * @param factoryClass desired factory class
    +    * @param propertyMap properties that describe the factory configuration
    +    * @param classLoader classloader for service loading
    +    * @tparam T factory class type
    +    * @return the matching factory
    +    */
    +  def find[T](
    +      factoryClass: Class[T],
    +      propertyMap: JMap[String, String],
    +      classLoader: ClassLoader)
    +    : T = {
    +    Preconditions.checkNotNull(factoryClass)
    +    Preconditions.checkNotNull(propertyMap)
    +    Preconditions.checkNotNull(classLoader)
    +
    +    findInternal(factoryClass, propertyMap, Some(classLoader))
    +  }
    +
    +  /**
    +    * Finds a table factory of the given class, property map, and classloader.
    +    *
    +    * @param factoryClass desired factory class
    +    * @param propertyMap properties that describe the factory configuration
    +    * @param classLoader classloader for service loading
    +    * @tparam T factory class type
    +    * @return the matching factory
    +    */
    +  private def findInternal[T](
    +      factoryClass: Class[T],
    +      propertyMap: JMap[String, String],
    +      classLoader: Option[ClassLoader])
    +    : T = {
    +
    +    val properties = propertyMap.asScala.toMap
    +
    +    // discover table factories
    +    val foundFactories = discoverFactories(classLoader)
     
    -    var matchingFactory: Option[(TableFactory, Seq[String])] = None
    +    // filter by factory class
    +    val classFactories = filterByFactoryClass(
    +      factoryClass,
    +      properties,
    +      foundFactories)
    +
    +    // find matching context
    +    val contextFactories = filterByContext(
    +      factoryClass,
    +      properties,
    +      foundFactories,
    +      classFactories)
    +
    +    // filter by supported keys
    +    filterBySupportedProperties(
    +      factoryClass,
    +      properties,
    +      foundFactories,
    +      contextFactories)
    +  }
    +
    +  /**
    +    * Searches for factories using Java service providers.
    +    *
    +    * @return all factories in the classpath
    +    */
    +  private def discoverFactories[T](classLoader: Option[ClassLoader]): Seq[TableFactory] = {
    +    val foundFactories = mutable.ArrayBuffer[TableFactory]()
         try {
    -      val iter = if (classLoader == null) {
    -        defaultLoader.iterator()
    -      } else {
    -        val customLoader = ServiceLoader.load(classOf[TableFactory], classLoader)
    -        customLoader.iterator()
    +      val iterator = classLoader match {
    +        case Some(customClassLoader) =>
    +          val customLoader = ServiceLoader.load(classOf[TableFactory], customClassLoader)
    +          customLoader.iterator()
    +        case None =>
    +          defaultLoader.iterator()
           }
    -      while (iter.hasNext) {
    -        val factory = iter.next()
    -
    -        if (clz.isAssignableFrom(factory.getClass)) {
    -          val requiredContextJava = try {
    -            factory.requiredContext()
    -          } catch {
    -            case t: Throwable =>
    -              throw new TableException(
    -                s"Table source factory '${factory.getClass.getCanonicalName}' caused an exception.",
    -                t)
    -          }
    -
    -          val requiredContext = if (requiredContextJava != null) {
    -            // normalize properties
    -            requiredContextJava.asScala.map(e => (e._1.toLowerCase, e._2))
    -          } else {
    -            Map[String, String]()
    -          }
    -
    -          val plainContext = mutable.Map[String, String]()
    -          plainContext ++= requiredContext
    -          // we remove the versions for now until we have the first backwards compatibility case
    -          // with the version we can provide mappings in case the format changes
    -          plainContext.remove(CONNECTOR_PROPERTY_VERSION)
    -          plainContext.remove(FORMAT_PROPERTY_VERSION)
    -          plainContext.remove(METADATA_PROPERTY_VERSION)
    -          plainContext.remove(STATISTICS_PROPERTY_VERSION)
    -
    -          if (plainContext.forall(e => properties.contains(e._1) && properties(e._1) == e._2)) {
    -            matchingFactory match {
    -              case Some(_) => throw new AmbiguousTableFactoryException(properties)
    -              case None => matchingFactory =
    -                Some((factory.asInstanceOf[TableFactory], requiredContext.keys.toSeq))
    -            }
    -          }
    -        }
    +
    +      while (iterator.hasNext) {
    +        val factory = iterator.next()
    +        foundFactories += factory
           }
    +
    +      foundFactories
         } catch {
           case e: ServiceConfigurationError =>
             LOG.error("Could not load service provider for table factories.", e)
             throw new TableException("Could not load service provider for table factories.", e)
         }
    +  }
    +
    +  /**
    +    * Filters for factories with matching context.
    +    *
    +    * @return all matching factories
    +    */
    +  private def filterByContext[T](
    +      factoryClass: Class[T],
    +      properties: Map[String, String],
    +      foundFactories: Seq[TableFactory],
    +      classFactories: Seq[TableFactory])
    +    : Seq[TableFactory] = {
    +
    +    val matchingFactories = mutable.ArrayBuffer[TableFactory]()
    +
    +    classFactories.foreach { factory =>
    +      val requestedContext = normalizeContext(factory)
    +
    +      val plainContext = mutable.Map[String, String]()
    +      plainContext ++= requestedContext
    +      // we remove the version for now until we have the first backwards compatibility case
    +      // with the version we can provide mappings in case the format changes
    --- End diff --
    
    Can you create a follow-up issue with blocker status for `1.6.0` so that it doesn't slip?


---

[GitHub] flink pull request #6323: [FLINK-8558] [FLINK-8866] [table] Finalize unified...

Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6323#discussion_r202520938
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/factories/TableFactory.scala ---
    @@ -16,14 +16,14 @@
      * limitations under the License.
      */
     
    -package org.apache.flink.table.connectors
    +package org.apache.flink.table.factories
     
     import java.util
     
     /**
       * Common trait for all properties-based discoverable table factories.
       */
    -trait DiscoverableTableFactory {
    +trait TableFactory {
    --- End diff --
    
    I am not happy, but I cannot think of anything better :( `Factory` is way too generic. It implies that this is THE Factory, THE One Factory To Rule Them All, and that's not true. There are plenty of other factories in `flink-table`. The `Table` prefix, as you confirmed, is just a package prefix, so it doesn't improve anything :(


---

[GitHub] flink pull request #6323: [FLINK-8558] [FLINK-8866] [table] Finalize unified...

Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6323#discussion_r202521065
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/factories/TableFormatFactory.scala ---
    @@ -16,42 +16,17 @@
      * limitations under the License.
      */
     
    -package org.apache.flink.table.formats
    +package org.apache.flink.table.factories
     
     import java.util
     
    -import org.apache.flink.api.common.serialization.{DeserializationSchema, SerializationSchema}
    -
     /**
    -  * A factory to create different table format instances. This factory is used with Java's Service
    -  * Provider Interfaces (SPI) for discovering. A factory is called with a set of normalized
    -  * properties that describe the desired format. The factory allows for matching to the given set of
    -  * properties. See also [[SerializationSchemaFactory]] and [[DeserializationSchemaFactory]] for
    -  * creating configured instances of format classes accordingly.
    -  *
    -  * Classes that implement this interface need to be added to the
    -  * "META_INF/services/org.apache.flink.table.formats.TableFormatFactory' file of a JAR file in
    -  * the current classpath to be found.
    +  * A factory to create configured table format instances based on string-based properties. See
    +  * also [[TableFactory]] for more information.
       *
       * @tparam T record type that the format produces or consumes
       */
    -trait TableFormatFactory[T] {
    -
    -  /**
    -    * Specifies the context that this factory has been implemented for. The framework guarantees
    -    * to only use the factory if the specified set of properties and values are met.
    -    *
    -    * Typical properties might be:
    -    *   - format.type
    -    *   - format.version
    -    *
    -    * Specified property versions allow the framework to provide backwards compatible properties
    -    * in case of string format changes:
    -    *   - format.property-version
    -    *
    -    * An empty context means that the factory matches for all requests.
    -    */
    -  def requiredContext(): util.Map[String, String]
    +trait TableFormatFactory[T] extends TableFactory {
    --- End diff --
    
    Maybe in that case deduplicate the comment with a `@see` Javadoc pointer? Otherwise there is a huge risk of the comments drifting out of sync.
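
    For example, a sketch of the deduplicated Scaladoc (assuming the `TableFactory` trait from this PR is in scope):

        /**
          * A factory to create configured table format instances based on
          * string-based properties.
          *
          * @see [[TableFactory]] for the discovery contract and the matching rules
          * @tparam T record type that the format produces or consumes
          */
        trait TableFormatFactory[T] extends TableFactory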


---

[GitHub] flink pull request #6323: [FLINK-8558] [FLINK-8866] [table] Finalize unified...

Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6323#discussion_r202281623
  
    --- Diff: flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Environment.java ---
    @@ -214,15 +216,18 @@ private static TableDescriptor createTableDescriptor(String name, Map<String, Ob
     		if (typeObject == null || !(typeObject instanceof String)) {
     			throw new SqlClientException("Invalid 'type' attribute for table '" + name + "'.");
     		}
    -		final String type = (String) config.get(TABLE_TYPE);
    +		final String type = (String) typeObject;
    +		final Map<String, Object> properties = new HashMap<>(config);
    --- End diff --
    
    A copy of `config` named `properties`? This is confusing. Rename `config` to `properties` and inline this copy? Or inline `properties` and rename `normalizedProperties` back to `normalizedConfig`?


---

[GitHub] flink pull request #6323: [FLINK-8558] [FLINK-8866] [table] Finalize unified...

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6323#discussion_r202506906
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/factories/TableFactoryService.scala ---
    @@ -18,143 +18,358 @@
     
     package org.apache.flink.table.factories
     
    -import java.util.{ServiceConfigurationError, ServiceLoader}
    +import java.util.{ServiceConfigurationError, ServiceLoader, Map => JMap}
     
     import org.apache.flink.table.api._
     import org.apache.flink.table.descriptors.ConnectorDescriptorValidator._
     import org.apache.flink.table.descriptors.FormatDescriptorValidator._
     import org.apache.flink.table.descriptors.MetadataValidator._
     import org.apache.flink.table.descriptors.StatisticsValidator._
    -import org.apache.flink.table.descriptors.{DescriptorProperties, TableDescriptor, TableDescriptorValidator}
    +import org.apache.flink.table.descriptors._
     import org.apache.flink.table.util.Logging
    +import org.apache.flink.util.Preconditions
     
     import _root_.scala.collection.JavaConverters._
     import _root_.scala.collection.mutable
     
     /**
    -  * Unified interface to search for TableFactoryDiscoverable of provided type and properties.
    +  * Unified interface to search for a [[TableFactory]] of provided type and properties.
       */
     object TableFactoryService extends Logging {
     
       private lazy val defaultLoader = ServiceLoader.load(classOf[TableFactory])
     
    -  def find(clz: Class[_], descriptor: TableDescriptor): TableFactory = {
    -    find(clz, descriptor, null)
    +  /**
    +    * Finds a table factory of the given class and descriptor.
    +    *
    +    * @param factoryClass desired factory class
    +    * @param descriptor descriptor describing the factory configuration
    +    * @tparam T factory class type
    +    * @return the matching factory
    +    */
    +  def find[T](factoryClass: Class[T], descriptor: Descriptor): T = {
    +    Preconditions.checkNotNull(factoryClass)
    +    Preconditions.checkNotNull(descriptor)
    +
    +    val descriptorProperties = new DescriptorProperties()
    +    descriptor.addProperties(descriptorProperties)
    +    findInternal(factoryClass, descriptorProperties.asMap, None)
       }
     
    -  def find(clz: Class[_], descriptor: TableDescriptor, classLoader: ClassLoader)
    -  : TableFactory = {
    +  /**
    +    * Finds a table factory of the given class, descriptor, and classloader.
    +    *
    +    * @param factoryClass desired factory class
    +    * @param descriptor descriptor describing the factory configuration
    +    * @param classLoader classloader for service loading
    +    * @tparam T factory class type
    +    * @return the matching factory
    +    */
    +  def find[T](factoryClass: Class[T], descriptor: Descriptor, classLoader: ClassLoader): T = {
    +    Preconditions.checkNotNull(factoryClass)
    +    Preconditions.checkNotNull(descriptor)
    +    Preconditions.checkNotNull(classLoader)
     
    -    val properties = new DescriptorProperties()
    -    descriptor.addProperties(properties)
    -    find(clz, properties.asMap.asScala.toMap, classLoader)
    +    val descriptorProperties = new DescriptorProperties()
    +    descriptor.addProperties(descriptorProperties)
    +    findInternal(factoryClass, descriptorProperties.asMap, None)
       }
     
    -  def find(clz: Class[_], properties: Map[String, String]): TableFactory = {
    -    find(clz: Class[_], properties, null)
    +  /**
    +    * Finds a table factory of the given class and property map.
    +    *
    +    * @param factoryClass desired factory class
    +    * @param propertyMap properties that describe the factory configuration
    +    * @tparam T factory class type
    +    * @return the matching factory
    +    */
    +  def find[T](factoryClass: Class[T], propertyMap: JMap[String, String]): T = {
    +    Preconditions.checkNotNull(factoryClass)
    +    Preconditions.checkNotNull(propertyMap)
    +
    +    findInternal(factoryClass, propertyMap, None)
       }
     
    -  def find(clz: Class[_], properties: Map[String, String],
    -           classLoader: ClassLoader): TableFactory = {
    +  /**
    +    * Finds a table factory of the given class, property map, and classloader.
    +    *
    +    * @param factoryClass desired factory class
    +    * @param propertyMap properties that describe the factory configuration
    +    * @param classLoader classloader for service loading
    +    * @tparam T factory class type
    +    * @return the matching factory
    +    */
    +  def find[T](
    +      factoryClass: Class[T],
    +      propertyMap: JMap[String, String],
    +      classLoader: ClassLoader)
    +    : T = {
    +    Preconditions.checkNotNull(factoryClass)
    +    Preconditions.checkNotNull(propertyMap)
    +    Preconditions.checkNotNull(classLoader)
    +
    +    findInternal(factoryClass, propertyMap, Some(classLoader))
    +  }
    +
    +  /**
    +    * Finds a table factory of the given class, property map, and classloader.
    +    *
    +    * @param factoryClass desired factory class
    +    * @param propertyMap properties that describe the factory configuration
    +    * @param classLoader classloader for service loading
    +    * @tparam T factory class type
    +    * @return the matching factory
    +    */
    +  private def findInternal[T](
    +      factoryClass: Class[T],
    +      propertyMap: JMap[String, String],
    +      classLoader: Option[ClassLoader])
    +    : T = {
    +
    +    val properties = propertyMap.asScala.toMap
    +
    +    // discover table factories
    +    val foundFactories = discoverFactories(classLoader)
     
    -    var matchingFactory: Option[(TableFactory, Seq[String])] = None
    +    // filter by factory class
    +    val classFactories = filterByFactoryClass(
    +      factoryClass,
    +      properties,
    +      foundFactories)
    +
    +    // find matching context
    +    val contextFactories = filterByContext(
    +      factoryClass,
    +      properties,
    +      foundFactories,
    +      classFactories)
    +
    +    // filter by supported keys
    +    filterBySupportedProperties(
    +      factoryClass,
    +      properties,
    +      foundFactories,
    +      contextFactories)
    +  }
    +
    +  /**
    +    * Searches for factories using Java service providers.
    +    *
    +    * @return all factories in the classpath
    +    */
    +  private def discoverFactories[T](classLoader: Option[ClassLoader]): Seq[TableFactory] = {
    +    val foundFactories = mutable.ArrayBuffer[TableFactory]()
         try {
    -      val iter = if (classLoader == null) {
    -        defaultLoader.iterator()
    -      } else {
    -        val customLoader = ServiceLoader.load(classOf[TableFactory], classLoader)
    -        customLoader.iterator()
    +      val iterator = classLoader match {
    +        case Some(customClassLoader) =>
    +          val customLoader = ServiceLoader.load(classOf[TableFactory], customClassLoader)
    +          customLoader.iterator()
    +        case None =>
    +          defaultLoader.iterator()
           }
    -      while (iter.hasNext) {
    -        val factory = iter.next()
    -
    -        if (clz.isAssignableFrom(factory.getClass)) {
    -          val requiredContextJava = try {
    -            factory.requiredContext()
    -          } catch {
    -            case t: Throwable =>
    -              throw new TableException(
    -                s"Table source factory '${factory.getClass.getCanonicalName}' caused an exception.",
    -                t)
    -          }
    -
    -          val requiredContext = if (requiredContextJava != null) {
    -            // normalize properties
    -            requiredContextJava.asScala.map(e => (e._1.toLowerCase, e._2))
    -          } else {
    -            Map[String, String]()
    -          }
    -
    -          val plainContext = mutable.Map[String, String]()
    -          plainContext ++= requiredContext
    -          // we remove the versions for now until we have the first backwards compatibility case
    -          // with the version we can provide mappings in case the format changes
    -          plainContext.remove(CONNECTOR_PROPERTY_VERSION)
    -          plainContext.remove(FORMAT_PROPERTY_VERSION)
    -          plainContext.remove(METADATA_PROPERTY_VERSION)
    -          plainContext.remove(STATISTICS_PROPERTY_VERSION)
    -
    -          if (plainContext.forall(e => properties.contains(e._1) && properties(e._1) == e._2)) {
    -            matchingFactory match {
    -              case Some(_) => throw new AmbiguousTableFactoryException(properties)
    -              case None => matchingFactory =
    -                Some((factory.asInstanceOf[TableFactory], requiredContext.keys.toSeq))
    -            }
    -          }
    -        }
    +
    +      while (iterator.hasNext) {
    --- End diff --
    
    +1, `iterator.asScala.toSeq` does the job
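
    For reference, a self-contained method sketch of that one-liner in context (assuming the `TableFactory` trait is in scope):

        import java.util.ServiceLoader
        import scala.collection.JavaConverters._

        // Collect the service-loaded factories in one expression instead of
        // the manual while-loop over the iterator.
        def discoverFactories(classLoader: Option[ClassLoader]): Seq[TableFactory] = {
          val loader = classLoader
            .map(cl => ServiceLoader.load(classOf[TableFactory], cl))
            .getOrElse(ServiceLoader.load(classOf[TableFactory]))
          loader.iterator().asScala.toSeq
        }

    Note that `toSeq` on an iterator is lazy in Scala 2.12 and earlier; use `toList` instead if the surrounding try/catch should still observe a `ServiceConfigurationError` eagerly.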


---

[GitHub] flink pull request #6323: [FLINK-8558] [FLINK-8866] [table] Finalize unified...

Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6323#discussion_r202350406
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala ---
    @@ -114,6 +114,8 @@ abstract class StreamTableEnvironment(
         : Unit = {
     
         tableSource match {
    +
    +      // check for proper stream table source
    --- End diff --
    
    ditto?


---

[GitHub] flink pull request #6323: [FLINK-8558] [FLINK-8866] [table] Finalize unified...

Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6323#discussion_r202356170
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/CsvTableSinkFactory.scala ---
    @@ -104,17 +100,12 @@ class CsvTableSinkFactory
             "Encodings that differ from the schema are not supported yet for CsvTableSink.")
         }
     
    -    toScala(params.getOptionalString(CONNECTOR_PATH))
    -      .foreach(csvTableSinkBuilder.path)
    -    toScala(params.getOptionalInt(NUM_FILES))
    -      .foreach(n => csvTableSinkBuilder.numFiles(n))
    -    toScala(params.getOptionalString(WRITE_MODE))
    -      .foreach(csvTableSinkBuilder.writeMode)
    -    toScala(params.getOptionalString(FORMAT_FIELD_DELIMITER))
    -      .foreach(csvTableSinkBuilder.fieldDelimiter)
    -
    -    csvTableSinkBuilder
    -      .build()
    +    val path = params.getString(CONNECTOR_PATH)
    --- End diff --
    
    ditto fixup


---

[GitHub] flink pull request #6323: [FLINK-8558] [FLINK-8866] [table] Finalize unified...

Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6323#discussion_r202271871
  
    --- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceFactory.java ---
    @@ -124,110 +123,127 @@
     		params.putProperties(properties);
     
     		// validate
    -		new SchemaValidator(true).validate(params);
    +		// allow Kafka timestamps to be used, watermarks can not be received from source
    +		new SchemaValidator(true, supportsKafkaTimestamps(), false).validate(params);
     		new KafkaValidator().validate(params);
    -		formatValidator().validate(params);
     
    -		// build
    -		final KafkaTableSource.Builder builder = createBuilderWithFormat(params);
    +		// deserialization schema using format discovery
    +		final DeserializationSchemaFactory<?> formatFactory = TableFormatFactoryService.find(
    +			DeserializationSchemaFactory.class,
    +			properties,
    +			this.getClass().getClassLoader());
    +		@SuppressWarnings("unchecked")
    +		final DeserializationSchema<Row> deserializationSchema = (DeserializationSchema<Row>) formatFactory
    +			.createDeserializationSchema(properties);
     
    -		// topic
    -		final String topic = params.getString(CONNECTOR_TOPIC);
    -		builder.forTopic(topic);
    +		// schema
    +		final TableSchema schema = params.getTableSchema(SCHEMA());
    +
    +		// proctime
    +		final String proctimeAttribute = SchemaValidator.deriveProctimeAttribute(params).orElse(null);
    --- End diff --
    
    this is kind of ridiculous :/ Deep inside we work with `Optional` and then we switch to null...
    
    Please carry this `Optional` through to the very end and do the `orElse(null)` conversion in the `org.apache.flink.table.sources.DefinedProctimeAttribute#getProctimeAttribute` implementation.
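
    A minimal Scala sketch of that pattern (the class name is hypothetical; only the `getProctimeAttribute(): String` signature of the real interface is assumed):

        import org.apache.flink.table.sources.DefinedProctimeAttribute

        // Keep the optional value internally; convert to null exactly once,
        // at the boundary of the nullable-String interface.
        class ExampleSource(proctime: Option[String]) extends DefinedProctimeAttribute {

          override def getProctimeAttribute: String = proctime.orNull
        }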


---

[GitHub] flink pull request #6323: [FLINK-8558] [FLINK-8866] [table] Finalize unified...

Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6323#discussion_r202347281
  
    --- Diff: flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java ---
    @@ -329,18 +329,6 @@ public void stop(SessionContext session) {
     		}
     	}
     
    -	private <T> void executeUpdateInternal(ExecutionContext<T> context, String query) {
    --- End diff --
    
    Please squash this change with the commit that introduced this unused method


---

[GitHub] flink pull request #6323: [FLINK-8558] [FLINK-8866] [table] Finalize unified...

Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6323#discussion_r202282838
  
    --- Diff: flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSourceFactoryTest.java ---
    @@ -21,8 +21,12 @@
     import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_VERSION_VALUE_08;
     
     /**
    - * Tests for {@link Kafka08JsonTableSourceFactory}.
    + * Tests for legacy Kafka08JsonTableSourceFactory.
    + *
    + * @deprecated Ensures backwards compatibility with Flink 1.5. Can be removed once we
    + *             drop support for format-specific table sources.
      */
    +@Deprecated
    --- End diff --
    
    Does this modification belong in this commit?


---

[GitHub] flink pull request #6323: [FLINK-8558] [FLINK-8866] [table] Finalize unified...

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6323#discussion_r202506494
  
    --- Diff: flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/SourceSink.java ---
    @@ -51,16 +51,10 @@ public void addProperties(DescriptorProperties properties) {
     	}
     
     	public Source toSource() {
    -		final Map<String, String> newProperties = new HashMap<>(properties);
    -		newProperties.replace(TableDescriptorValidator.TABLE_TYPE(),
    -				TableDescriptorValidator.TABLE_TYPE_VALUE_SOURCE());
    --- End diff --
    
    The table type is a concept of the SQL Client and should not be part of the table descriptor.


---

[GitHub] flink pull request #6323: [FLINK-8558] [FLINK-8866] [table] Finalize unified...

Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6323#discussion_r202334214
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/factories/TableFactoryService.scala ---
    @@ -18,143 +18,358 @@
     
     package org.apache.flink.table.factories
     
    -import java.util.{ServiceConfigurationError, ServiceLoader}
    +import java.util.{ServiceConfigurationError, ServiceLoader, Map => JMap}
     
     import org.apache.flink.table.api._
     import org.apache.flink.table.descriptors.ConnectorDescriptorValidator._
     import org.apache.flink.table.descriptors.FormatDescriptorValidator._
     import org.apache.flink.table.descriptors.MetadataValidator._
     import org.apache.flink.table.descriptors.StatisticsValidator._
    -import org.apache.flink.table.descriptors.{DescriptorProperties, TableDescriptor, TableDescriptorValidator}
    +import org.apache.flink.table.descriptors._
     import org.apache.flink.table.util.Logging
    +import org.apache.flink.util.Preconditions
     
     import _root_.scala.collection.JavaConverters._
     import _root_.scala.collection.mutable
     
     /**
    -  * Unified interface to search for TableFactoryDiscoverable of provided type and properties.
    +  * Unified interface to search for a [[TableFactory]] of provided type and properties.
       */
     object TableFactoryService extends Logging {
     
       private lazy val defaultLoader = ServiceLoader.load(classOf[TableFactory])
     
    -  def find(clz: Class[_], descriptor: TableDescriptor): TableFactory = {
    -    find(clz, descriptor, null)
    +  /**
    +    * Finds a table factory of the given class and descriptor.
    +    *
    +    * @param factoryClass desired factory class
    +    * @param descriptor descriptor describing the factory configuration
    +    * @tparam T factory class type
    +    * @return the matching factory
    +    */
    +  def find[T](factoryClass: Class[T], descriptor: Descriptor): T = {
    +    Preconditions.checkNotNull(factoryClass)
    --- End diff --
    
    if you are not touching the variables but only passing them along, you can safely skip `checkNotNull`. Handy if some variable is passed through multiple times (like here).
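
    A sketch of that convention (hypothetical stub, not the PR code):

        import java.util.{Map => JMap}

        import org.apache.flink.util.Preconditions

        object FindSketch {

          // pure pass-through: no checkNotNull needed here
          def find[T](factoryClass: Class[T], propertyMap: JMap[String, String]): T =
            findInternal(factoryClass, propertyMap, None)

          private def findInternal[T](
              factoryClass: Class[T],
              propertyMap: JMap[String, String],
              classLoader: Option[ClassLoader]): T = {
            // validate once, where the arguments are actually used
            Preconditions.checkNotNull(factoryClass)
            Preconditions.checkNotNull(propertyMap)
            ???  // actual discovery and filtering would go here
          }
        }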


---

[GitHub] flink pull request #6323: [FLINK-8558] [FLINK-8866] [table] Finalize unified...

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6323#discussion_r202506766
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/factories/TableFactoryService.scala ---
    @@ -18,143 +18,358 @@
     
     package org.apache.flink.table.factories
     
    -import java.util.{ServiceConfigurationError, ServiceLoader}
    +import java.util.{ServiceConfigurationError, ServiceLoader, Map => JMap}
     
     import org.apache.flink.table.api._
     import org.apache.flink.table.descriptors.ConnectorDescriptorValidator._
     import org.apache.flink.table.descriptors.FormatDescriptorValidator._
     import org.apache.flink.table.descriptors.MetadataValidator._
     import org.apache.flink.table.descriptors.StatisticsValidator._
    -import org.apache.flink.table.descriptors.{DescriptorProperties, TableDescriptor, TableDescriptorValidator}
    +import org.apache.flink.table.descriptors._
     import org.apache.flink.table.util.Logging
    +import org.apache.flink.util.Preconditions
     
     import _root_.scala.collection.JavaConverters._
     import _root_.scala.collection.mutable
     
     /**
    -  * Unified interface to search for TableFactoryDiscoverable of provided type and properties.
    +  * Unified interface to search for a [[TableFactory]] of provided type and properties.
       */
     object TableFactoryService extends Logging {
     
       private lazy val defaultLoader = ServiceLoader.load(classOf[TableFactory])
     
    -  def find(clz: Class[_], descriptor: TableDescriptor): TableFactory = {
    -    find(clz, descriptor, null)
    +  /**
    +    * Finds a table factory of the given class and descriptor.
    +    *
    +    * @param factoryClass desired factory class
    +    * @param descriptor descriptor describing the factory configuration
    +    * @tparam T factory class type
    +    * @return the matching factory
    +    */
    +  def find[T](factoryClass: Class[T], descriptor: Descriptor): T = {
    +    Preconditions.checkNotNull(factoryClass)
    +    Preconditions.checkNotNull(descriptor)
    +
    +    val descriptorProperties = new DescriptorProperties()
    +    descriptor.addProperties(descriptorProperties)
    +    findInternal(factoryClass, descriptorProperties.asMap, None)
       }
     
    -  def find(clz: Class[_], descriptor: TableDescriptor, classLoader: ClassLoader)
    -  : TableFactory = {
    +  /**
    +    * Finds a table factory of the given class, descriptor, and classloader.
    +    *
    +    * @param factoryClass desired factory class
    +    * @param descriptor descriptor describing the factory configuration
    +    * @param classLoader classloader for service loading
    +    * @tparam T factory class type
    +    * @return the matching factory
    +    */
    +  def find[T](factoryClass: Class[T], descriptor: Descriptor, classLoader: ClassLoader): T = {
    +    Preconditions.checkNotNull(factoryClass)
    +    Preconditions.checkNotNull(descriptor)
    +    Preconditions.checkNotNull(classLoader)
     
    -    val properties = new DescriptorProperties()
    -    descriptor.addProperties(properties)
    -    find(clz, properties.asMap.asScala.toMap, classLoader)
    +    val descriptorProperties = new DescriptorProperties()
    +    descriptor.addProperties(descriptorProperties)
    +    findInternal(factoryClass, descriptorProperties.asMap, None)
       }
     
    -  def find(clz: Class[_], properties: Map[String, String]): TableFactory = {
    -    find(clz: Class[_], properties, null)
    +  /**
    +    * Finds a table factory of the given class and property map.
    +    *
    +    * @param factoryClass desired factory class
    +    * @param propertyMap properties that describe the factory configuration
    +    * @tparam T factory class type
    +    * @return the matching factory
    +    */
    +  def find[T](factoryClass: Class[T], propertyMap: JMap[String, String]): T = {
    +    Preconditions.checkNotNull(factoryClass)
    +    Preconditions.checkNotNull(propertyMap)
    +
    +    findInternal(factoryClass, propertyMap, None)
       }
     
    -  def find(clz: Class[_], properties: Map[String, String],
    -           classLoader: ClassLoader): TableFactory = {
    +  /**
    +    * Finds a table factory of the given class, property map, and classloader.
    +    *
    +    * @param factoryClass desired factory class
    +    * @param propertyMap properties that describe the factory configuration
    +    * @param classLoader classloader for service loading
    +    * @tparam T factory class type
    +    * @return the matching factory
    +    */
    +  def find[T](
    +      factoryClass: Class[T],
    +      propertyMap: JMap[String, String],
    +      classLoader: ClassLoader)
    +    : T = {
    +    Preconditions.checkNotNull(factoryClass)
    +    Preconditions.checkNotNull(propertyMap)
    +    Preconditions.checkNotNull(classLoader)
    +
    +    findInternal(factoryClass, propertyMap, Some(classLoader))
    --- End diff --
    
    I like to have it more explicit at the beginning of a function.


---

[GitHub] flink pull request #6323: [FLINK-8558] [FLINK-8866] [table] Finalize unified...

Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6323#discussion_r202336263
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/factories/TableFactoryService.scala ---
    @@ -18,143 +18,358 @@
     
     package org.apache.flink.table.factories
     
    -import java.util.{ServiceConfigurationError, ServiceLoader}
    +import java.util.{ServiceConfigurationError, ServiceLoader, Map => JMap}
     
     import org.apache.flink.table.api._
     import org.apache.flink.table.descriptors.ConnectorDescriptorValidator._
     import org.apache.flink.table.descriptors.FormatDescriptorValidator._
     import org.apache.flink.table.descriptors.MetadataValidator._
     import org.apache.flink.table.descriptors.StatisticsValidator._
    -import org.apache.flink.table.descriptors.{DescriptorProperties, TableDescriptor, TableDescriptorValidator}
    +import org.apache.flink.table.descriptors._
     import org.apache.flink.table.util.Logging
    +import org.apache.flink.util.Preconditions
     
     import _root_.scala.collection.JavaConverters._
     import _root_.scala.collection.mutable
     
     /**
    -  * Unified interface to search for TableFactoryDiscoverable of provided type and properties.
    +  * Unified interface to search for a [[TableFactory]] of provided type and properties.
       */
     object TableFactoryService extends Logging {
     
       private lazy val defaultLoader = ServiceLoader.load(classOf[TableFactory])
     
    -  def find(clz: Class[_], descriptor: TableDescriptor): TableFactory = {
    -    find(clz, descriptor, null)
    +  /**
    +    * Finds a table factory of the given class and descriptor.
    +    *
    +    * @param factoryClass desired factory class
    +    * @param descriptor descriptor describing the factory configuration
    +    * @tparam T factory class type
    +    * @return the matching factory
    +    */
    +  def find[T](factoryClass: Class[T], descriptor: Descriptor): T = {
    +    Preconditions.checkNotNull(factoryClass)
    +    Preconditions.checkNotNull(descriptor)
    +
    +    val descriptorProperties = new DescriptorProperties()
    +    descriptor.addProperties(descriptorProperties)
    +    findInternal(factoryClass, descriptorProperties.asMap, None)
       }
     
    -  def find(clz: Class[_], descriptor: TableDescriptor, classLoader: ClassLoader)
    -  : TableFactory = {
    +  /**
    +    * Finds a table factory of the given class, descriptor, and classloader.
    +    *
    +    * @param factoryClass desired factory class
    +    * @param descriptor descriptor describing the factory configuration
    +    * @param classLoader classloader for service loading
    +    * @tparam T factory class type
    +    * @return the matching factory
    +    */
    +  def find[T](factoryClass: Class[T], descriptor: Descriptor, classLoader: ClassLoader): T = {
    +    Preconditions.checkNotNull(factoryClass)
    +    Preconditions.checkNotNull(descriptor)
    +    Preconditions.checkNotNull(classLoader)
     
    -    val properties = new DescriptorProperties()
    -    descriptor.addProperties(properties)
    -    find(clz, properties.asMap.asScala.toMap, classLoader)
    +    val descriptorProperties = new DescriptorProperties()
    +    descriptor.addProperties(descriptorProperties)
    +    findInternal(factoryClass, descriptorProperties.asMap, None)
       }
     
    -  def find(clz: Class[_], properties: Map[String, String]): TableFactory = {
    -    find(clz: Class[_], properties, null)
    +  /**
    +    * Finds a table factory of the given class and property map.
    +    *
    +    * @param factoryClass desired factory class
    +    * @param propertyMap properties that describe the factory configuration
    +    * @tparam T factory class type
    +    * @return the matching factory
    +    */
    +  def find[T](factoryClass: Class[T], propertyMap: JMap[String, String]): T = {
    +    Preconditions.checkNotNull(factoryClass)
    +    Preconditions.checkNotNull(propertyMap)
    +
    +    findInternal(factoryClass, propertyMap, None)
       }
     
    -  def find(clz: Class[_], properties: Map[String, String],
    -           classLoader: ClassLoader): TableFactory = {
    +  /**
    +    * Finds a table factory of the given class, property map, and classloader.
    +    *
    +    * @param factoryClass desired factory class
    +    * @param propertyMap properties that describe the factory configuration
    +    * @param classLoader classloader for service loading
    +    * @tparam T factory class type
    +    * @return the matching factory
    +    */
    +  def find[T](
    +      factoryClass: Class[T],
    +      propertyMap: JMap[String, String],
    +      classLoader: ClassLoader)
    +    : T = {
    +    Preconditions.checkNotNull(factoryClass)
    +    Preconditions.checkNotNull(propertyMap)
    +    Preconditions.checkNotNull(classLoader)
    +
    +    findInternal(factoryClass, propertyMap, Some(classLoader))
    +  }
    +
    +  /**
    +    * Finds a table factory of the given class, property map, and classloader.
    +    *
    +    * @param factoryClass desired factory class
    +    * @param propertyMap properties that describe the factory configuration
    +    * @param classLoader classloader for service loading
    +    * @tparam T factory class type
    +    * @return the matching factory
    +    */
    +  private def findInternal[T](
    +      factoryClass: Class[T],
    +      propertyMap: JMap[String, String],
    +      classLoader: Option[ClassLoader])
    +    : T = {
    +
    +    val properties = propertyMap.asScala.toMap
    +
    +    // discover table factories
    +    val foundFactories = discoverFactories(classLoader)
     
    -    var matchingFactory: Option[(TableFactory, Seq[String])] = None
    +    // filter by factory class
    +    val classFactories = filterByFactoryClass(
    +      factoryClass,
    +      properties,
    +      foundFactories)
    +
    +    // find matching context
    +    val contextFactories = filterByContext(
    +      factoryClass,
    +      properties,
    +      foundFactories,
    +      classFactories)
    +
    +    // filter by supported keys
    +    filterBySupportedProperties(
    +      factoryClass,
    +      properties,
    +      foundFactories,
    +      contextFactories)
    +  }
    +
    +  /**
    +    * Searches for factories using Java service providers.
    +    *
    +    * @return all factories in the classpath
    +    */
    +  private def discoverFactories[T](classLoader: Option[ClassLoader]): Seq[TableFactory] = {
    +    val foundFactories = mutable.ArrayBuffer[TableFactory]()
         try {
    -      val iter = if (classLoader == null) {
    -        defaultLoader.iterator()
    -      } else {
    -        val customLoader = ServiceLoader.load(classOf[TableFactory], classLoader)
    -        customLoader.iterator()
    +      val iterator = classLoader match {
    +        case Some(customClassLoader) =>
    +          val customLoader = ServiceLoader.load(classOf[TableFactory], customClassLoader)
    +          customLoader.iterator()
    +        case None =>
    +          defaultLoader.iterator()
           }
    -      while (iter.hasNext) {
    -        val factory = iter.next()
    -
    -        if (clz.isAssignableFrom(factory.getClass)) {
    -          val requiredContextJava = try {
    -            factory.requiredContext()
    -          } catch {
    -            case t: Throwable =>
    -              throw new TableException(
    -                s"Table source factory '${factory.getClass.getCanonicalName}' caused an exception.",
    -                t)
    -          }
    -
    -          val requiredContext = if (requiredContextJava != null) {
    -            // normalize properties
    -            requiredContextJava.asScala.map(e => (e._1.toLowerCase, e._2))
    -          } else {
    -            Map[String, String]()
    -          }
    -
    -          val plainContext = mutable.Map[String, String]()
    -          plainContext ++= requiredContext
    -          // we remove the versions for now until we have the first backwards compatibility case
    -          // with the version we can provide mappings in case the format changes
    -          plainContext.remove(CONNECTOR_PROPERTY_VERSION)
    -          plainContext.remove(FORMAT_PROPERTY_VERSION)
    -          plainContext.remove(METADATA_PROPERTY_VERSION)
    -          plainContext.remove(STATISTICS_PROPERTY_VERSION)
    -
    -          if (plainContext.forall(e => properties.contains(e._1) && properties(e._1) == e._2)) {
    -            matchingFactory match {
    -              case Some(_) => throw new AmbiguousTableFactoryException(properties)
    -              case None => matchingFactory =
    -                Some((factory.asInstanceOf[TableFactory], requiredContext.keys.toSeq))
    -            }
    -          }
    -        }
    +
    +      while (iterator.hasNext) {
    +        val factory = iterator.next()
    +        foundFactories += factory
           }
    +
    +      foundFactories
         } catch {
           case e: ServiceConfigurationError =>
             LOG.error("Could not load service provider for table factories.", e)
             throw new TableException("Could not load service provider for table factories.", e)
         }
    +  }
    +
    +  /**
    +    * Filters for factories with matching context.
    +    *
    +    * @return all matching factories
    +    */
    +  private def filterByContext[T](
    +      factoryClass: Class[T],
    +      properties: Map[String, String],
    +      foundFactories: Seq[TableFactory],
    +      classFactories: Seq[TableFactory])
    +    : Seq[TableFactory] = {
    +
    +    val matchingFactories = mutable.ArrayBuffer[TableFactory]()
    +
    +    classFactories.foreach { factory =>
    +      val requestedContext = normalizeContext(factory)
    +
    +      val plainContext = mutable.Map[String, String]()
    +      plainContext ++= requestedContext
    +      // we remove the version for now until we have the first backwards compatibility case
    +      // with the version we can provide mappings in case the format changes
    --- End diff --
    
    What if a user provides a custom connector or a custom format? Is it documented that we do not support versioning yet?
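
    For context, a sketch of the version keys in question (key names come from the validator constants imported in this file; the values are illustrative):
    ```
    val properties = Map(
      "connector.type" -> "kafka",
      "connector.property-version" -> "1", // CONNECTOR_PROPERTY_VERSION, removed before matching
      "format.type" -> "json",
      "format.property-version" -> "1"     // FORMAT_PROPERTY_VERSION, removed before matching
    )
    ```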


---

[GitHub] flink pull request #6323: [FLINK-8558] [FLINK-8866] [table] Finalize unified...

Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6323#discussion_r202269785
  
    --- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java ---
    @@ -54,51 +56,105 @@
      * override {@link #createKafkaConsumer(String, Properties, DeserializationSchema)}}.
      */
     @Internal
    -public abstract class KafkaTableSource
    -	implements StreamTableSource<Row>, DefinedProctimeAttribute, DefinedRowtimeAttributes {
    +public abstract class KafkaTableSource implements
    +		StreamTableSource<Row>,
    +		DefinedProctimeAttribute,
    +		DefinedRowtimeAttributes,
    +		DefinedFieldMapping {
    +
    +	// common table source attributes
    +	// TODO make all attributes final once we drop support for format-specific table sources
     
     	/** The schema of the table. */
     	private final TableSchema schema;
     
    +	/** Field name of the processing time attribute, null if no processing time field is defined. */
    +	private String proctimeAttribute;
    +
    +	/** Descriptor for a rowtime attribute. */
    +	private List<RowtimeAttributeDescriptor> rowtimeAttributeDescriptors;
    +
    +	/** Mapping for the fields of the table schema to fields of the physical returned type or null. */
    +	private Map<String, String> fieldMapping;
    +
    +	// Kafka-specific attributes
    +
     	/** The Kafka topic to consume. */
     	private final String topic;
     
     	/** Properties for the Kafka consumer. */
     	private final Properties properties;
     
    -	/** Type information describing the result type. */
    -	private TypeInformation<Row> returnType;
    -
    -	/** Field name of the processing time attribute, null if no processing time field is defined. */
    -	private String proctimeAttribute;
    -
    -	/** Descriptor for a rowtime attribute. */
    -	private List<RowtimeAttributeDescriptor> rowtimeAttributeDescriptors;
    +	/** Deserialization schema for decoding records from Kafka. */
    +	private final DeserializationSchema<Row> deserializationSchema;
     
     	/** The startup mode for the contained consumer (default is {@link StartupMode#GROUP_OFFSETS}). */
    -	private StartupMode startupMode = StartupMode.GROUP_OFFSETS;
    +	private StartupMode startupMode;
     
     	/** Specific startup offsets; only relevant when startup mode is {@link StartupMode#SPECIFIC_OFFSETS}. */
     	private Map<KafkaTopicPartition, Long> specificStartupOffsets;
     
     	/**
     	 * Creates a generic Kafka {@link StreamTableSource}.
     	 *
    -	 * @param topic                 Kafka topic to consume.
    -	 * @param properties            Properties for the Kafka consumer.
    -	 * @param schema                Schema of the produced table.
    -	 * @param returnType            Type information of the produced physical DataStream.
    +	 * @param schema                      Schema of the produced table.
    +	 * @param proctimeAttribute           Field name of the processing time attribute, null if no
    +	 *                                    processing time field is defined.
    +	 * @param rowtimeAttributeDescriptors Descriptor for a rowtime attribute
    +	 * @param fieldMapping                Mapping for the fields of the table schema to
    --- End diff --
    
    How can this field ever be null? `SchemaValidator.deriveFieldMapping` doesn't allow for that.


---

[GitHub] flink pull request #6323: [FLINK-8558] [FLINK-8866] [table] Finalize unified...

Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6323#discussion_r202289678
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalTableSourceUtil.scala ---
    @@ -44,32 +43,27 @@ object ExternalTableSourceUtil extends Logging {
         val properties = new DescriptorProperties()
         externalCatalogTable.addProperties(properties)
         val javaMap = properties.asMap
    -    val source = TableFactoryService.find(classOf[TableSourceFactory[_]], javaMap)
    -      .asInstanceOf[TableSourceFactory[_]]
    -      .createTableSource(javaMap)
         tableEnv match {
           // check for a batch table source in this batch environment
           case _: BatchTableEnvironment =>
    -        source match {
    -          case bts: BatchTableSource[_] =>
    -            new TableSourceSinkTable(Some(new BatchTableSourceTable(
    -              bts,
    -              new FlinkStatistic(externalCatalogTable.getTableStats))), None)
    -          case _ => throw new TableException(
    -            s"Found table source '${source.getClass.getCanonicalName}' is not applicable " +
    -              s"in a batch environment.")
    -        }
    +        val source = TableFactoryService
    --- End diff --
    
    1.
    This case looks kind of ugly. Shouldn't it be unified to something like:
    ```
    TableFactoryService.find(classOf[TableSourceFactory], javaMap)
      .createTableSource(javaMap, environment.isInstanceOf[StreamTableEnvironment])
    ```
    ?
    If you really want, you can define methods on top of that:
    ```
    def createBatchTableSource(javaMap) = createTableSource(javaMap, isStream = false)
    
    def createStreamTableSource(javaMap) = createTableSource(javaMap, isStream = true)
    ```
    but I would skip them.
    
    Factories could then choose whether or not to support streaming/batch tables. The same applies to the similar code in `ExecutionContext.java`.
    
    2. Ditto: is there any significant value in keeping both `BatchTableSink(Source)Factory` and `StreamTableSink(Source)Factory`?
    
    
    Both 1. and 2. add quite a lot of boilerplate code for the definition and for the caller.


---

[GitHub] flink pull request #6323: [FLINK-8558] [FLINK-8866] [table] Finalize unified...

Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6323#discussion_r202334605
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/factories/TableFactoryService.scala ---
    @@ -18,143 +18,358 @@
     
     package org.apache.flink.table.factories
     
    -import java.util.{ServiceConfigurationError, ServiceLoader}
    +import java.util.{ServiceConfigurationError, ServiceLoader, Map => JMap}
     
     import org.apache.flink.table.api._
     import org.apache.flink.table.descriptors.ConnectorDescriptorValidator._
     import org.apache.flink.table.descriptors.FormatDescriptorValidator._
     import org.apache.flink.table.descriptors.MetadataValidator._
     import org.apache.flink.table.descriptors.StatisticsValidator._
    -import org.apache.flink.table.descriptors.{DescriptorProperties, TableDescriptor, TableDescriptorValidator}
    +import org.apache.flink.table.descriptors._
     import org.apache.flink.table.util.Logging
    +import org.apache.flink.util.Preconditions
     
     import _root_.scala.collection.JavaConverters._
     import _root_.scala.collection.mutable
     
     /**
    -  * Unified interface to search for TableFactoryDiscoverable of provided type and properties.
    +  * Unified interface to search for a [[TableFactory]] of provided type and properties.
       */
     object TableFactoryService extends Logging {
     
       private lazy val defaultLoader = ServiceLoader.load(classOf[TableFactory])
     
    -  def find(clz: Class[_], descriptor: TableDescriptor): TableFactory = {
    -    find(clz, descriptor, null)
    +  /**
    +    * Finds a table factory of the given class and descriptor.
    +    *
    +    * @param factoryClass desired factory class
    +    * @param descriptor descriptor describing the factory configuration
    +    * @tparam T factory class type
    +    * @return the matching factory
    +    */
    +  def find[T](factoryClass: Class[T], descriptor: Descriptor): T = {
    +    Preconditions.checkNotNull(factoryClass)
    +    Preconditions.checkNotNull(descriptor)
    +
    +    val descriptorProperties = new DescriptorProperties()
    +    descriptor.addProperties(descriptorProperties)
    +    findInternal(factoryClass, descriptorProperties.asMap, None)
       }
     
    -  def find(clz: Class[_], descriptor: TableDescriptor, classLoader: ClassLoader)
    -  : TableFactory = {
    +  /**
    +    * Finds a table factory of the given class, descriptor, and classloader.
    +    *
    +    * @param factoryClass desired factory class
    +    * @param descriptor descriptor describing the factory configuration
    +    * @param classLoader classloader for service loading
    +    * @tparam T factory class type
    +    * @return the matching factory
    +    */
    +  def find[T](factoryClass: Class[T], descriptor: Descriptor, classLoader: ClassLoader): T = {
    +    Preconditions.checkNotNull(factoryClass)
    +    Preconditions.checkNotNull(descriptor)
    +    Preconditions.checkNotNull(classLoader)
     
    -    val properties = new DescriptorProperties()
    -    descriptor.addProperties(properties)
    -    find(clz, properties.asMap.asScala.toMap, classLoader)
    +    val descriptorProperties = new DescriptorProperties()
    +    descriptor.addProperties(descriptorProperties)
    +    findInternal(factoryClass, descriptorProperties.asMap, None)
       }
     
    -  def find(clz: Class[_], properties: Map[String, String]): TableFactory = {
    -    find(clz: Class[_], properties, null)
    +  /**
    +    * Finds a table factory of the given class and property map.
    +    *
    +    * @param factoryClass desired factory class
    +    * @param propertyMap properties that describe the factory configuration
    +    * @tparam T factory class type
    +    * @return the matching factory
    +    */
    +  def find[T](factoryClass: Class[T], propertyMap: JMap[String, String]): T = {
    +    Preconditions.checkNotNull(factoryClass)
    +    Preconditions.checkNotNull(propertyMap)
    +
    +    findInternal(factoryClass, propertyMap, None)
       }
     
    -  def find(clz: Class[_], properties: Map[String, String],
    -           classLoader: ClassLoader): TableFactory = {
    +  /**
    +    * Finds a table factory of the given class, property map, and classloader.
    +    *
    +    * @param factoryClass desired factory class
    +    * @param propertyMap properties that describe the factory configuration
    +    * @param classLoader classloader for service loading
    +    * @tparam T factory class type
    +    * @return the matching factory
    +    */
    +  def find[T](
    +      factoryClass: Class[T],
    +      propertyMap: JMap[String, String],
    +      classLoader: ClassLoader)
    +    : T = {
    +    Preconditions.checkNotNull(factoryClass)
    +    Preconditions.checkNotNull(propertyMap)
    +    Preconditions.checkNotNull(classLoader)
    +
    +    findInternal(factoryClass, propertyMap, Some(classLoader))
    --- End diff --
    
    Another hint: sometimes it's handy to inline the `checkNotNull` call (assuming `Preconditions.checkNotNull` is imported directly):
    ```
    findInternal(factoryClass, propertyMap, Some(checkNotNull(classLoader)))
    ```


---

[GitHub] flink issue #6323: [FLINK-8558] [FLINK-8866] [table] Finalize unified table ...

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on the issue:

    https://github.com/apache/flink/pull/6323
  
    Thanks @pnowojski. Merging this...


---

[GitHub] flink pull request #6323: [FLINK-8558] [FLINK-8866] [table] Finalize unified...

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6323#discussion_r202506193
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/factories/TableFactory.scala ---
    @@ -16,14 +16,14 @@
      * limitations under the License.
      */
     
    -package org.apache.flink.table.connectors
    +package org.apache.flink.table.factories
     
     import java.util
     
     /**
       * Common trait for all properties-based discoverable table factories.
       */
    -trait DiscoverableTableFactory {
    +trait TableFactory {
    --- End diff --
    
    Actually, I would prefer the very generic name `Factory`, but since we have to add some prefix to make it unique in the project, I named it `TableFactory` because we prefix everything in this module with `Table`.


---

[GitHub] flink pull request #6323: [FLINK-8558] [FLINK-8866] [table] Finalize unified...

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6323#discussion_r202506661
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/exceptions.scala ---
    @@ -143,118 +143,82 @@ case class CatalogAlreadyExistException(
     }
     
     /**
    -  * Exception for not finding a [[TableFormatFactory]] for the
    -  * given properties.
    +  * Exception for not finding a [[TableFactory]] for the given properties.
       *
       * @param message message that indicates the current matching step
       * @param factoryClass required factory class
    -  * @param formatFactories all found factories
    -  * @param properties properties that describe the table format
    +  * @param factories all found factories
    +  * @param properties properties that describe the configuration
       * @param cause the cause
       */
    -case class NoMatchingTableFormatException(
    +case class NoMatchingTableFactoryException(
           message: String,
           factoryClass: Class[_],
    -      formatFactories: Seq[TableFormatFactory[_]],
    +      factories: Seq[TableFactory],
           properties: Map[String, String],
           cause: Throwable)
         extends RuntimeException(
    --- End diff --
    
    So far we don't have an inheritance hierarchy for exceptions. Case classes don't support inheritance in Scala, so we would need to convert them to regular classes first.
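
    For reference, a minimal sketch of the Scala restriction (class names are illustrative, not the actual exception classes):
    ```
    class TableFactoryException(msg: String) extends RuntimeException(msg)

    // OK: a case class may extend a regular class.
    case class NoMatchingFactory(msg: String) extends TableFactoryException(msg)

    // Does not compile: case-to-case inheritance is prohibited in Scala.
    // case class NoMatchingFormat(msg: String) extends NoMatchingFactory(msg)
    ```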


---

[GitHub] flink pull request #6323: [FLINK-8558] [FLINK-8866] [table] Finalize unified...

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6323#discussion_r202535180
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/factories/TableFactoryService.scala ---
    @@ -18,143 +18,358 @@
     
     package org.apache.flink.table.factories
     
    -import java.util.{ServiceConfigurationError, ServiceLoader}
    +import java.util.{ServiceConfigurationError, ServiceLoader, Map => JMap}
     
     import org.apache.flink.table.api._
     import org.apache.flink.table.descriptors.ConnectorDescriptorValidator._
     import org.apache.flink.table.descriptors.FormatDescriptorValidator._
     import org.apache.flink.table.descriptors.MetadataValidator._
     import org.apache.flink.table.descriptors.StatisticsValidator._
    -import org.apache.flink.table.descriptors.{DescriptorProperties, TableDescriptor, TableDescriptorValidator}
    +import org.apache.flink.table.descriptors._
     import org.apache.flink.table.util.Logging
    +import org.apache.flink.util.Preconditions
     
     import _root_.scala.collection.JavaConverters._
     import _root_.scala.collection.mutable
     
     /**
    -  * Unified interface to search for TableFactoryDiscoverable of provided type and properties.
    +  * Unified interface to search for a [[TableFactory]] of provided type and properties.
       */
     object TableFactoryService extends Logging {
     
       private lazy val defaultLoader = ServiceLoader.load(classOf[TableFactory])
     
    -  def find(clz: Class[_], descriptor: TableDescriptor): TableFactory = {
    -    find(clz, descriptor, null)
    +  /**
    +    * Finds a table factory of the given class and descriptor.
    +    *
    +    * @param factoryClass desired factory class
    +    * @param descriptor descriptor describing the factory configuration
    +    * @tparam T factory class type
    +    * @return the matching factory
    +    */
    +  def find[T](factoryClass: Class[T], descriptor: Descriptor): T = {
    +    Preconditions.checkNotNull(factoryClass)
    +    Preconditions.checkNotNull(descriptor)
    +
    +    val descriptorProperties = new DescriptorProperties()
    +    descriptor.addProperties(descriptorProperties)
    +    findInternal(factoryClass, descriptorProperties.asMap, None)
       }
     
    -  def find(clz: Class[_], descriptor: TableDescriptor, classLoader: ClassLoader)
    -  : TableFactory = {
    +  /**
    +    * Finds a table factory of the given class, descriptor, and classloader.
    +    *
    +    * @param factoryClass desired factory class
    +    * @param descriptor descriptor describing the factory configuration
    +    * @param classLoader classloader for service loading
    +    * @tparam T factory class type
    +    * @return the matching factory
    +    */
    +  def find[T](factoryClass: Class[T], descriptor: Descriptor, classLoader: ClassLoader): T = {
    +    Preconditions.checkNotNull(factoryClass)
    +    Preconditions.checkNotNull(descriptor)
    +    Preconditions.checkNotNull(classLoader)
     
    -    val properties = new DescriptorProperties()
    -    descriptor.addProperties(properties)
    -    find(clz, properties.asMap.asScala.toMap, classLoader)
    +    val descriptorProperties = new DescriptorProperties()
    +    descriptor.addProperties(descriptorProperties)
    +    findInternal(factoryClass, descriptorProperties.asMap, None)
       }
     
    -  def find(clz: Class[_], properties: Map[String, String]): TableFactory = {
    -    find(clz: Class[_], properties, null)
    +  /**
    +    * Finds a table factory of the given class and property map.
    +    *
    +    * @param factoryClass desired factory class
    +    * @param propertyMap properties that describe the factory configuration
    +    * @tparam T factory class type
    +    * @return the matching factory
    +    */
    +  def find[T](factoryClass: Class[T], propertyMap: JMap[String, String]): T = {
    +    Preconditions.checkNotNull(factoryClass)
    +    Preconditions.checkNotNull(propertyMap)
    +
    +    findInternal(factoryClass, propertyMap, None)
       }
     
    -  def find(clz: Class[_], properties: Map[String, String],
    -           classLoader: ClassLoader): TableFactory = {
    +  /**
    +    * Finds a table factory of the given class, property map, and classloader.
    +    *
    +    * @param factoryClass desired factory class
    +    * @param propertyMap properties that describe the factory configuration
    +    * @param classLoader classloader for service loading
    +    * @tparam T factory class type
    +    * @return the matching factory
    +    */
    +  def find[T](
    +      factoryClass: Class[T],
    +      propertyMap: JMap[String, String],
    +      classLoader: ClassLoader)
    +    : T = {
    +    Preconditions.checkNotNull(factoryClass)
    +    Preconditions.checkNotNull(propertyMap)
    +    Preconditions.checkNotNull(classLoader)
    +
    +    findInternal(factoryClass, propertyMap, Some(classLoader))
    +  }
    +
    +  /**
    +    * Finds a table factory of the given class, property map, and classloader.
    +    *
    +    * @param factoryClass desired factory class
    +    * @param propertyMap properties that describe the factory configuration
    +    * @param classLoader classloader for service loading
    +    * @tparam T factory class type
    +    * @return the matching factory
    +    */
    +  private def findInternal[T](
    +      factoryClass: Class[T],
    +      propertyMap: JMap[String, String],
    +      classLoader: Option[ClassLoader])
    +    : T = {
    +
    +    val properties = propertyMap.asScala.toMap
    +
    +    // discover table factories
    +    val foundFactories = discoverFactories(classLoader)
     
    -    var matchingFactory: Option[(TableFactory, Seq[String])] = None
    +    // filter by factory class
    +    val classFactories = filterByFactoryClass(
    +      factoryClass,
    +      properties,
    +      foundFactories)
    +
    +    // find matching context
    +    val contextFactories = filterByContext(
    +      factoryClass,
    +      properties,
    +      foundFactories,
    +      classFactories)
    +
    +    // filter by supported keys
    +    filterBySupportedProperties(
    +      factoryClass,
    +      properties,
    +      foundFactories,
    +      contextFactories)
    +  }
    +
    +  /**
    +    * Searches for factories using Java service providers.
    +    *
    +    * @return all factories in the classpath
    +    */
    +  private def discoverFactories[T](classLoader: Option[ClassLoader]): Seq[TableFactory] = {
    +    val foundFactories = mutable.ArrayBuffer[TableFactory]()
         try {
    -      val iter = if (classLoader == null) {
    -        defaultLoader.iterator()
    -      } else {
    -        val customLoader = ServiceLoader.load(classOf[TableFactory], classLoader)
    -        customLoader.iterator()
    +      val iterator = classLoader match {
    +        case Some(customClassLoader) =>
    +          val customLoader = ServiceLoader.load(classOf[TableFactory], customClassLoader)
    +          customLoader.iterator()
    +        case None =>
    +          defaultLoader.iterator()
           }
    -      while (iter.hasNext) {
    -        val factory = iter.next()
    -
    -        if (clz.isAssignableFrom(factory.getClass)) {
    -          val requiredContextJava = try {
    -            factory.requiredContext()
    -          } catch {
    -            case t: Throwable =>
    -              throw new TableException(
    -                s"Table source factory '${factory.getClass.getCanonicalName}' caused an exception.",
    -                t)
    -          }
    -
    -          val requiredContext = if (requiredContextJava != null) {
    -            // normalize properties
    -            requiredContextJava.asScala.map(e => (e._1.toLowerCase, e._2))
    -          } else {
    -            Map[String, String]()
    -          }
    -
    -          val plainContext = mutable.Map[String, String]()
    -          plainContext ++= requiredContext
    -          // we remove the versions for now until we have the first backwards compatibility case
    -          // with the version we can provide mappings in case the format changes
    -          plainContext.remove(CONNECTOR_PROPERTY_VERSION)
    -          plainContext.remove(FORMAT_PROPERTY_VERSION)
    -          plainContext.remove(METADATA_PROPERTY_VERSION)
    -          plainContext.remove(STATISTICS_PROPERTY_VERSION)
    -
    -          if (plainContext.forall(e => properties.contains(e._1) && properties(e._1) == e._2)) {
    -            matchingFactory match {
    -              case Some(_) => throw new AmbiguousTableFactoryException(properties)
    -              case None => matchingFactory =
    -                Some((factory.asInstanceOf[TableFactory], requiredContext.keys.toSeq))
    -            }
    -          }
    -        }
    +
    +      while (iterator.hasNext) {
    +        val factory = iterator.next()
    +        foundFactories += factory
           }
    +
    +      foundFactories
         } catch {
           case e: ServiceConfigurationError =>
             LOG.error("Could not load service provider for table factories.", e)
             throw new TableException("Could not load service provider for table factories.", e)
         }
    +  }
    +
    +  /**
    +    * Filters for factories with matching context.
    +    *
    +    * @return all matching factories
    +    */
    +  private def filterByContext[T](
    +      factoryClass: Class[T],
    +      properties: Map[String, String],
    +      foundFactories: Seq[TableFactory],
    +      classFactories: Seq[TableFactory])
    +    : Seq[TableFactory] = {
    +
    +    val matchingFactories = mutable.ArrayBuffer[TableFactory]()
    +
    +    classFactories.foreach { factory =>
    +      val requestedContext = normalizeContext(factory)
    +
    +      val plainContext = mutable.Map[String, String]()
    +      plainContext ++= requestedContext
    +      // we remove the version for now until we have the first backwards compatibility case
    +      // with the version we can provide mappings in case the format changes
    --- End diff --
    
    I opened FLINK-9851 for that.


---

[GitHub] flink pull request #6323: [FLINK-8558] [FLINK-8866] [table] Finalize unified...

Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6323#discussion_r202520986
  
    --- Diff: flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/SourceSink.java ---
    @@ -51,16 +51,10 @@ public void addProperties(DescriptorProperties properties) {
     	}
     
     	public Source toSource() {
    -		final Map<String, String> newProperties = new HashMap<>(properties);
    -		newProperties.replace(TableDescriptorValidator.TABLE_TYPE(),
    -				TableDescriptorValidator.TABLE_TYPE_VALUE_SOURCE());
    --- End diff --
    
    I'm just asking because I cannot find what code has replaced them. Or was this dead code?


---

[GitHub] flink pull request #6323: [FLINK-8558] [FLINK-8866] [table] Finalize unified...

Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6323#discussion_r202350170
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala ---
    @@ -103,24 +103,40 @@ abstract class BatchTableEnvironment(
         : Unit = {
     
         tableSource match {
    +
    +      // check for proper batch table source
    --- End diff --
    
    Ditto, those changes do not seem to belong to this commit. If so, please extract them into a separate one:
    
    ```
    git reset --soft HEAD^
    git reset HEAD flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
    git commit -m "Clean up and simplify changes"
    git commit -a --fixup=42a8a156d4e6f8f3d119c458350b6c897306fc48 # SHA of Shuyi Chen's commit
    git rebase -i origin/master --autosquash
    ```
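
    (The `--fixup` commit records which commit it amends, and the final `git rebase --autosquash` then melds it into that commit automatically.)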


---

[GitHub] flink pull request #6323: [FLINK-8558] [FLINK-8866] [table] Finalize unified...

Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6323#discussion_r202269870
  
    --- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java ---
    @@ -134,34 +190,60 @@ public String getProctimeAttribute() {
     		return rowtimeAttributeDescriptors;
     	}
     
    +	@Override
    +	public Map<String, String> getFieldMapping() {
    +		return fieldMapping;
    +	}
    +
     	@Override
     	public String explainSource() {
     		return TableConnectorUtil.generateRuntimeName(this.getClass(), schema.getColumnNames());
     	}
     
    +	/**
    +	 * Returns the properties for the Kafka consumer.
    +	 *
    +	 * @return properties for the Kafka consumer.
    +	 */
    +	public Properties getProperties() {
    +		return properties;
    +	}
    +
    +	/**
    +	 * Returns the deserialization schema.
    +	 *
    +	 * @return The deserialization schema
    +	 */
    +	public DeserializationSchema<Row> getDeserializationSchema(){
    +		return deserializationSchema;
    +	}
    +
     	@Override
     	public boolean equals(Object o) {
     		if (this == o) {
     			return true;
     		}
    -		if (!(o instanceof KafkaTableSource)) {
    +		// TODO force classes to be equal once we drop support for format-specific table sources
    +		// if (o == null || getClass() != o.getClass()) {
    +		if (o == null || !(o instanceof KafkaTableSource)) {
     			return false;
     		}
    -		KafkaTableSource that = (KafkaTableSource) o;
    +		final KafkaTableSource that = (KafkaTableSource) o;
     		return Objects.equals(schema, that.schema) &&
    -			Objects.equals(topic, that.topic) &&
    -			Objects.equals(properties, that.properties) &&
    -			Objects.equals(returnType, that.returnType) &&
     			Objects.equals(proctimeAttribute, that.proctimeAttribute) &&
     			Objects.equals(rowtimeAttributeDescriptors, that.rowtimeAttributeDescriptors) &&
    +			Objects.equals(fieldMapping, that.fieldMapping) &&
    +			Objects.equals(topic, that.topic) &&
    +			Objects.equals(properties, that.properties) &&
    +			Objects.equals(deserializationSchema, that.deserializationSchema) &&
     			startupMode == that.startupMode &&
     			Objects.equals(specificStartupOffsets, that.specificStartupOffsets);
     	}
     
     	@Override
     	public int hashCode() {
    -		return Objects.hash(schema, topic, properties, returnType,
    -			proctimeAttribute, rowtimeAttributeDescriptors, startupMode, specificStartupOffsets);
    +		return Objects.hash(schema, proctimeAttribute, rowtimeAttributeDescriptors, fieldMapping,
    --- End diff --
    
    Format one entry per line.
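
    For illustration, the requested layout (same arguments as in the `equals` method above):
    ```
    return Objects.hash(
    	schema,
    	proctimeAttribute,
    	rowtimeAttributeDescriptors,
    	fieldMapping,
    	topic,
    	properties,
    	deserializationSchema,
    	startupMode,
    	specificStartupOffsets);
    ```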


---

[GitHub] flink pull request #6323: [FLINK-8558] [FLINK-8866] [table] Finalize unified...

Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6323#discussion_r202335703
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/factories/TableFactoryService.scala ---
    @@ -18,143 +18,358 @@
     
     package org.apache.flink.table.factories
     
    -import java.util.{ServiceConfigurationError, ServiceLoader}
    +import java.util.{ServiceConfigurationError, ServiceLoader, Map => JMap}
     
     import org.apache.flink.table.api._
     import org.apache.flink.table.descriptors.ConnectorDescriptorValidator._
     import org.apache.flink.table.descriptors.FormatDescriptorValidator._
     import org.apache.flink.table.descriptors.MetadataValidator._
     import org.apache.flink.table.descriptors.StatisticsValidator._
    -import org.apache.flink.table.descriptors.{DescriptorProperties, TableDescriptor, TableDescriptorValidator}
    +import org.apache.flink.table.descriptors._
     import org.apache.flink.table.util.Logging
    +import org.apache.flink.util.Preconditions
     
     import _root_.scala.collection.JavaConverters._
     import _root_.scala.collection.mutable
     
     /**
    -  * Unified interface to search for TableFactoryDiscoverable of provided type and properties.
    +  * Unified interface to search for a [[TableFactory]] of provided type and properties.
       */
     object TableFactoryService extends Logging {
     
       private lazy val defaultLoader = ServiceLoader.load(classOf[TableFactory])
     
    -  def find(clz: Class[_], descriptor: TableDescriptor): TableFactory = {
    -    find(clz, descriptor, null)
    +  /**
    +    * Finds a table factory of the given class and descriptor.
    +    *
    +    * @param factoryClass desired factory class
    +    * @param descriptor descriptor describing the factory configuration
    +    * @tparam T factory class type
    +    * @return the matching factory
    +    */
    +  def find[T](factoryClass: Class[T], descriptor: Descriptor): T = {
    +    Preconditions.checkNotNull(factoryClass)
    +    Preconditions.checkNotNull(descriptor)
    +
    +    val descriptorProperties = new DescriptorProperties()
    +    descriptor.addProperties(descriptorProperties)
    +    findInternal(factoryClass, descriptorProperties.asMap, None)
       }
     
    -  def find(clz: Class[_], descriptor: TableDescriptor, classLoader: ClassLoader)
    -  : TableFactory = {
    +  /**
    +    * Finds a table factory of the given class, descriptor, and classloader.
    +    *
    +    * @param factoryClass desired factory class
    +    * @param descriptor descriptor describing the factory configuration
    +    * @param classLoader classloader for service loading
    +    * @tparam T factory class type
    +    * @return the matching factory
    +    */
    +  def find[T](factoryClass: Class[T], descriptor: Descriptor, classLoader: ClassLoader): T = {
    +    Preconditions.checkNotNull(factoryClass)
    +    Preconditions.checkNotNull(descriptor)
    +    Preconditions.checkNotNull(classLoader)
     
    -    val properties = new DescriptorProperties()
    -    descriptor.addProperties(properties)
    -    find(clz, properties.asMap.asScala.toMap, classLoader)
    +    val descriptorProperties = new DescriptorProperties()
    +    descriptor.addProperties(descriptorProperties)
    +    findInternal(factoryClass, descriptorProperties.asMap, None)
       }
     
    -  def find(clz: Class[_], properties: Map[String, String]): TableFactory = {
    -    find(clz: Class[_], properties, null)
    +  /**
    +    * Finds a table factory of the given class and property map.
    +    *
    +    * @param factoryClass desired factory class
    +    * @param propertyMap properties that describe the factory configuration
    +    * @tparam T factory class type
    +    * @return the matching factory
    +    */
    +  def find[T](factoryClass: Class[T], propertyMap: JMap[String, String]): T = {
    +    Preconditions.checkNotNull(factoryClass)
    +    Preconditions.checkNotNull(propertyMap)
    +
    +    findInternal(factoryClass, propertyMap, None)
       }
     
    -  def find(clz: Class[_], properties: Map[String, String],
    -           classLoader: ClassLoader): TableFactory = {
    +  /**
    +    * Finds a table factory of the given class, property map, and classloader.
    +    *
    +    * @param factoryClass desired factory class
    +    * @param propertyMap properties that describe the factory configuration
    +    * @param classLoader classloader for service loading
    +    * @tparam T factory class type
    +    * @return the matching factory
    +    */
    +  def find[T](
    +      factoryClass: Class[T],
    +      propertyMap: JMap[String, String],
    +      classLoader: ClassLoader)
    +    : T = {
    +    Preconditions.checkNotNull(factoryClass)
    +    Preconditions.checkNotNull(propertyMap)
    +    Preconditions.checkNotNull(classLoader)
    +
    +    findInternal(factoryClass, propertyMap, Some(classLoader))
    +  }
    +
    +  /**
    +    * Finds a table factory of the given class, property map, and classloader.
    +    *
    +    * @param factoryClass desired factory class
    +    * @param propertyMap properties that describe the factory configuration
    +    * @param classLoader classloader for service loading
    +    * @tparam T factory class type
    +    * @return the matching factory
    +    */
    +  private def findInternal[T](
    +      factoryClass: Class[T],
    +      propertyMap: JMap[String, String],
    +      classLoader: Option[ClassLoader])
    +    : T = {
    +
    +    val properties = propertyMap.asScala.toMap
    +
    +    // discover table factories
    +    val foundFactories = discoverFactories(classLoader)
     
    -    var matchingFactory: Option[(TableFactory, Seq[String])] = None
    +    // filter by factory class
    +    val classFactories = filterByFactoryClass(
    +      factoryClass,
    +      properties,
    +      foundFactories)
    +
    +    // find matching context
    +    val contextFactories = filterByContext(
    +      factoryClass,
    +      properties,
    +      foundFactories,
    +      classFactories)
    +
    +    // filter by supported keys
    +    filterBySupportedProperties(
    +      factoryClass,
    +      properties,
    +      foundFactories,
    +      contextFactories)
    +  }
    +
    +  /**
    +    * Searches for factories using Java service providers.
    +    *
    +    * @return all factories in the classpath
    +    */
    +  private def discoverFactories[T](classLoader: Option[ClassLoader]): Seq[TableFactory] = {
    +    val foundFactories = mutable.ArrayBuffer[TableFactory]()
         try {
    -      val iter = if (classLoader == null) {
    -        defaultLoader.iterator()
    -      } else {
    -        val customLoader = ServiceLoader.load(classOf[TableFactory], classLoader)
    -        customLoader.iterator()
    +      val iterator = classLoader match {
    +        case Some(customClassLoader) =>
    +          val customLoader = ServiceLoader.load(classOf[TableFactory], customClassLoader)
    +          customLoader.iterator()
    +        case None =>
    +          defaultLoader.iterator()
           }
    -      while (iter.hasNext) {
    -        val factory = iter.next()
    -
    -        if (clz.isAssignableFrom(factory.getClass)) {
    -          val requiredContextJava = try {
    -            factory.requiredContext()
    -          } catch {
    -            case t: Throwable =>
    -              throw new TableException(
    -                s"Table source factory '${factory.getClass.getCanonicalName}' caused an exception.",
    -                t)
    -          }
    -
    -          val requiredContext = if (requiredContextJava != null) {
    -            // normalize properties
    -            requiredContextJava.asScala.map(e => (e._1.toLowerCase, e._2))
    -          } else {
    -            Map[String, String]()
    -          }
    -
    -          val plainContext = mutable.Map[String, String]()
    -          plainContext ++= requiredContext
    -          // we remove the versions for now until we have the first backwards compatibility case
    -          // with the version we can provide mappings in case the format changes
    -          plainContext.remove(CONNECTOR_PROPERTY_VERSION)
    -          plainContext.remove(FORMAT_PROPERTY_VERSION)
    -          plainContext.remove(METADATA_PROPERTY_VERSION)
    -          plainContext.remove(STATISTICS_PROPERTY_VERSION)
    -
    -          if (plainContext.forall(e => properties.contains(e._1) && properties(e._1) == e._2)) {
    -            matchingFactory match {
    -              case Some(_) => throw new AmbiguousTableFactoryException(properties)
    -              case None => matchingFactory =
    -                Some((factory.asInstanceOf[TableFactory], requiredContext.keys.toSeq))
    -            }
    -          }
    -        }
    +
    +      while (iterator.hasNext) {
    +        val factory = iterator.next()
    +        foundFactories += factory
           }
    +
    +      foundFactories
         } catch {
           case e: ServiceConfigurationError =>
             LOG.error("Could not load service provider for table factories.", e)
             throw new TableException("Could not load service provider for table factories.", e)
         }
    +  }
    +
    +  /**
    +    * Filters for factories with matching context.
    +    *
    +    * @return all matching factories
    +    */
    +  private def filterByContext[T](
    --- End diff --
    
    nit: move this method below `filterByFactoryClass`? (to match the order of invocations) 


---

[GitHub] flink issue #6323: [FLINK-8558] [FLINK-8866] [table] Finalize unified table ...

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on the issue:

    https://github.com/apache/flink/pull/6323
  
    Thank you @pnowojski. I hope I could address all your comments. I will clean the commit history and improve the commit messages during merging.


---

[GitHub] flink pull request #6323: [FLINK-8558] [FLINK-8866] [table] Finalize unified...

Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6323#discussion_r202282221
  
    --- Diff: flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Environment.java ---
    @@ -214,15 +216,18 @@ private static TableDescriptor createTableDescriptor(String name, Map<String, Ob
     		if (typeObject == null || !(typeObject instanceof String)) {
     			throw new SqlClientException("Invalid 'type' attribute for table '" + name + "'.");
     		}
    -		final String type = (String) config.get(TABLE_TYPE);
    +		final String type = (String) typeObject;
    +		final Map<String, Object> properties = new HashMap<>(config);
     		config.remove(TABLE_TYPE);
    -		final Map<String, String> normalizedConfig = ConfigUtil.normalizeYaml(config);
    -		if (type.equals(TableDescriptorValidator.TABLE_TYPE_VALUE_SOURCE())) {
    -			return new Source(name, normalizedConfig);
    -		} else if (type.equals(TableDescriptorValidator.TABLE_TYPE_VALUE_SINK())) {
    -			return new Sink(name, normalizedConfig);
    -		} else if (type.equals(TableDescriptorValidator.TABLE_TYPE_VALUE_SOURCE_SINK())) {
    -			return new SourceSink(name, normalizedConfig);
    +
    +		final Map<String, String> normalizedProperties = ConfigUtil.normalizeYaml(properties);
    +		switch (type) {
    +			case TABLE_TYPE_VALUE_SOURCE:
    +				return new Source(name, normalizedProperties);
    +			case TABLE_TYPE_VALUE_SINK:
    +				return new Sink(name, normalizedProperties);
    +			case TABLE_TYPE_VALUE_BOTH:
    +				return new SourceSink(name, normalizedProperties);
    --- End diff --
    
    ```
    default:
      throw new UnsupportedOperationException(
        String.format("Unsupported [%s] value [%s]", TABLE_TYPE, type));
    ```


---

[GitHub] flink pull request #6323: [FLINK-8558] [FLINK-8866] [table] Finalize unified...

Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6323#discussion_r202275530
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/factories/TableFactory.scala ---
    @@ -21,7 +21,14 @@ package org.apache.flink.table.factories
     import java.util
     
     /**
    -  * Common trait for all properties-based discoverable table factories.
    +  * A factory to create different table-related instances from string-based properties. This
    +  * factory is used with Java's Service Provider Interfaces (SPI) for discovering. A factory is
    +  * called with a set of normalized properties that describe the desired format. The factory allows
    --- End diff --
    
    Isn't this comment a bit misleading?
    
    > that describe the desired format. 
    



---

[GitHub] flink pull request #6323: [FLINK-8558] [FLINK-8866] [table] Finalize unified...

Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6323#discussion_r202348312
  
    --- Diff: flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ProgramDeployer.java ---
    @@ -63,65 +62,45 @@ public void run() {
     			LOG.debug("Submitting job {} with the following environment: \n{}",
     					jobGraph.getJobID(), context.getMergedEnvironment());
     		}
    -		if (result != null) {
    -			executionResultBucket.add(deployJob(context, jobGraph, result));
    -		} else {
    -			deployJob(context, jobGraph, result);
    -		}
    +		executionResultBucket.add(deployJob(context, jobGraph, result));
     	}
     
     	public JobExecutionResult fetchExecutionResult() {
    -		if (result != null) {
    -			return executionResultBucket.poll();
    -		} else {
    -			return null;
    -		}
    +		return executionResultBucket.poll();
     	}
     
     	/**
    -	 * Deploys a job. Depending on the deployment creates a new job cluster. If result is requested,
    -	 * it saves the cluster id in the result and blocks until job completion.
    +	 * Deploys a job. Depending on the deployment creates a new job cluster. It saves the cluster id in
    --- End diff --
    
    How do those changes in this file relate to the rest of the PR/commit?


---

[GitHub] flink pull request #6323: [FLINK-8558] [FLINK-8866] [table] Finalize unified...

Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6323#discussion_r202327344
  
    --- Diff: flink-libraries/flink-table/src/test/resources/META-INF/services/org.apache.flink.table.connectors.DiscoverableTableFactory ---
    @@ -13,5 +13,10 @@
     # See the License for the specific language governing permissions and
     # limitations under the License.
     
    +<<<<<<< HEAD:flink-libraries/flink-table/src/test/resources/META-INF/services/org.apache.flink.table.sources.TableSourceFactory
    --- End diff --
    
    ?


---

[GitHub] flink pull request #6323: [FLINK-8558] [FLINK-8866] [table] Finalize unified...

Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6323#discussion_r202356462
  
    --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/TableEnvironmentITCase.scala ---
    @@ -186,7 +186,7 @@ class TableEnvironmentITCase(
       def testInsertIntoMemoryTable(): Unit = {
         val env = ExecutionEnvironment.getExecutionEnvironment
         val tEnv = TableEnvironment.getTableEnvironment(env)
    -    MemoryTableSourceSinkUtil.clear
    +    MemoryTableSourceSinkUtil.clear()
    --- End diff --
    
    Ditto, and the same probably applies to hundreds of other changes in this commit :(
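
    For reference, the convention behind this nit, as a minimal sketch (the object and field are illustrative stubs): in Scala, methods that perform a side effect are declared and invoked with empty parentheses, while parameterless pure accessors drop them.
    ```
    object TableData {                        // illustrative stub
      private val rows = scala.collection.mutable.ArrayBuffer[String]()
      def clear(): Unit = rows.clear()        // side-effecting: declare and call with ()
      def size: Int = rows.size               // pure accessor: no ()
    }
    ```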


---

[GitHub] flink pull request #6323: [FLINK-8558] [FLINK-8866] [table] Finalize unified...

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6323#discussion_r202506512
  
    --- Diff: flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSourceFactoryTest.java ---
    @@ -21,8 +21,12 @@
     import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_VERSION_VALUE_08;
     
     /**
    - * Tests for {@link Kafka08JsonTableSourceFactory}.
    + * Tests for legacy Kafka08JsonTableSourceFactory.
    + *
    + * @deprecated Ensures backwards compatibility with Flink 1.5. Can be removed once we
    + *             drop support for format-specific table sources.
      */
    +@Deprecated
    --- End diff --
    
    No, but we forgot it in the previous commit.
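    
    For reference, the Java convention applied in the diff pairs the
    @deprecated Javadoc tag with the @Deprecated annotation, so both tools
    and readers see the marker. A generic sketch (not the actual Flink
    class):
    
        /**
         * Legacy factory kept only for backwards compatibility.
         *
         * @deprecated Use the unified factory approach instead; can be
         *             removed once format-specific table sources are dropped.
         */
        @Deprecated
        public class LegacyJsonTableSourceFactory {
            // intentionally empty: only the deprecation markers matter here
        }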


---

[GitHub] flink pull request #6323: [FLINK-8558] [FLINK-8866] [table] Finalize unified...

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6323#discussion_r202506126
  
    --- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java ---
    @@ -54,51 +56,105 @@
      * override {@link #createKafkaConsumer(String, Properties, DeserializationSchema)}}.
      */
     @Internal
    -public abstract class KafkaTableSource
    -	implements StreamTableSource<Row>, DefinedProctimeAttribute, DefinedRowtimeAttributes {
    +public abstract class KafkaTableSource implements
    +		StreamTableSource<Row>,
    +		DefinedProctimeAttribute,
    +		DefinedRowtimeAttributes,
    +		DefinedFieldMapping {
    +
    +	// common table source attributes
    +	// TODO make all attributes final once we drop support for format-specific table sources
     
     	/** The schema of the table. */
     	private final TableSchema schema;
     
    +	/** Field name of the processing time attribute, null if no processing time field is defined. */
    +	private String proctimeAttribute;
    +
    +	/** Descriptor for a rowtime attribute. */
    +	private List<RowtimeAttributeDescriptor> rowtimeAttributeDescriptors;
    +
    +	/** Mapping for the fields of the table schema to fields of the physical returned type or null. */
    +	private Map<String, String> fieldMapping;
    +
    +	// Kafka-specific attributes
    +
     	/** The Kafka topic to consume. */
     	private final String topic;
     
     	/** Properties for the Kafka consumer. */
     	private final Properties properties;
     
    -	/** Type information describing the result type. */
    -	private TypeInformation<Row> returnType;
    -
    -	/** Field name of the processing time attribute, null if no processing time field is defined. */
    -	private String proctimeAttribute;
    -
    -	/** Descriptor for a rowtime attribute. */
    -	private List<RowtimeAttributeDescriptor> rowtimeAttributeDescriptors;
    +	/** Deserialization schema for decoding records from Kafka. */
    +	private final DeserializationSchema<Row> deserializationSchema;
     
     	/** The startup mode for the contained consumer (default is {@link StartupMode#GROUP_OFFSETS}). */
    -	private StartupMode startupMode = StartupMode.GROUP_OFFSETS;
    +	private StartupMode startupMode;
     
     	/** Specific startup offsets; only relevant when startup mode is {@link StartupMode#SPECIFIC_OFFSETS}. */
     	private Map<KafkaTopicPartition, Long> specificStartupOffsets;
     
     	/**
     	 * Creates a generic Kafka {@link StreamTableSource}.
     	 *
    -	 * @param topic                 Kafka topic to consume.
    -	 * @param properties            Properties for the Kafka consumer.
    -	 * @param schema                Schema of the produced table.
    -	 * @param returnType            Type information of the produced physical DataStream.
    +	 * @param schema                      Schema of the produced table.
    +	 * @param proctimeAttribute           Field name of the processing time attribute, null if no
    +	 *                                    processing time field is defined.
    +	 * @param rowtimeAttributeDescriptors Descriptor for a rowtime attribute
    +	 * @param fieldMapping                Mapping for the fields of the table schema to
    --- End diff --
    
    Backward compatibility. It could have been null in the past.
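    
    To make that nullability contract concrete, here is a self-contained
    sketch (hypothetical names, not the actual KafkaTableSource or
    DefinedFieldMapping code) of tolerating a null field mapping:
    
        import java.util.Map;
    
        interface FieldMappingProvider {
            // null means "no explicit mapping", i.e. schema fields map 1:1
            Map<String, String> getFieldMapping();
        }
    
        class SketchTableSource implements FieldMappingProvider {
            private final Map<String, String> fieldMapping;
    
            SketchTableSource(Map<String, String> fieldMapping) {
                // legacy (format-specific) constructors may pass null here,
                // so the value is accepted as-is for backwards compatibility
                this.fieldMapping = fieldMapping;
            }
    
            @Override
            public Map<String, String> getFieldMapping() {
                return fieldMapping;
            }
        }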


---