Posted to issues@flink.apache.org by twalthr <gi...@git.apache.org> on 2018/01/04 14:54:13 UTC

[GitHub] flink pull request #5240: [FLINK-8240] [table] Create unified interfaces to ...

GitHub user twalthr opened a pull request:

    https://github.com/apache/flink/pull/5240

    [FLINK-8240] [table] Create unified interfaces to configure and instantiate TableSources

    ## What is the purpose of the change
    
    This PR presents the initial version of the new unified TableSource API. The API is based on a descriptor approach. A descriptor describes parameters and behavior. Descriptors contain no logic; they only store information and translate it to normalized string-based properties.
    
    The following example shows how a CSV table source could be specified in the future:
    
    ```
    tableEnv
          .createTable(
            Schema()
              .field("myfield", Types.STRING)
              .field("myfield2", Types.INT))
          .withConnector(
            FileSystem()
              .path("/path/to/csv"))
          .withEncoding(
            CSV()
              .field("myfield", Types.STRING)
              .field("myfield2", Types.INT)
              .quoteCharacter(';')
              .fieldDelimiter("#")
              .lineDelimiter("\r\n")
              .commentPrefix("%%")
              .ignoreFirstLine()
              .ignoreParseErrors())
          .withRowtime(
            Rowtime()
              .field("rowtime")
              .timestampFromDataStream()
              .watermarkFromDataStream())
          .withProctime(
            Proctime()
              .field("myproctime"))
    ```
    
    The descriptors get translated into the following properties:
    
    ```
    "schema.0.name" -> "myfield",
    "schema.0.type" -> "VARCHAR",
    "schema.1.name" -> "myfield2",
    "schema.1.type" -> "INT",
    "connector.type" -> "filesystem",
    "connector.path" -> "/path/to/csv",
    "encoding.type" -> "csv",
    "encoding.fields.0.name" -> "myfield",
    "encoding.fields.0.type" -> "VARCHAR",
    "encoding.fields.1.name" -> "myfield2",
    "encoding.fields.1.type" -> "INT",
    "encoding.quote-character" -> ";",
    "encoding.field-delimiter" -> "#",
    "encoding.line-delimiter" -> "\r\n",
    "encoding.comment-prefix" -> "%%",
    "encoding.ignore-first-line" -> "true",
    "encoding.ignore-parse-errors" -> "true",
    "rowtime.0.name" -> "rowtime",
    "rowtime.0.timestamp.type" -> "stream-record",
    "rowtime.0.watermark.type" -> "preserving",
    "proctime" -> "myproctime"
    ```
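    
    To make the translation step more concrete, here is a minimal sketch of how a schema descriptor could turn its fields into indexed properties. `SchemaSketch` and `SchemaSketchDemo` are hypothetical names used only for illustration; they are not the classes in this PR, and the types are passed as plain strings to keep the example self-contained.
    
    ```scala
    import scala.collection.mutable

    // Hypothetical, simplified stand-in for a schema descriptor (assumption, not the PR's class).
    class SchemaSketch {
      private val fields = mutable.LinkedHashMap[String, String]()

      // Records a field; the insertion order defines the index used in the property keys.
      def field(name: String, tpe: String): SchemaSketch = {
        fields(name) = tpe
        this
      }

      // Translates the stored information into flat, string-based properties.
      def toProperties: Map[String, String] =
        fields.toSeq.zipWithIndex.flatMap { case ((name, tpe), i) =>
          Seq(s"schema.$i.name" -> name, s"schema.$i.type" -> tpe)
        }.toMap
    }

    object SchemaSketchDemo {
      val props: Map[String, String] =
        new SchemaSketch()
          .field("myfield", "VARCHAR")
          .field("myfield2", "INT")
          .toProperties
    }
    ```
    
    The descriptor itself stays free of logic beyond this translation; everything else is left to whoever consumes the properties.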
    
    This PR also reworks the discovery of table sources by deprecating the `@TableType` annotation and reflection-based discovery in favor of `TableSourceFactory` interfaces and the standard Java Service Provider Interface (SPI). Table factories can now use the above properties to create table sources. The `ExternalCatalogTable` class has been reworked to use the new descriptor-based approach as well; however, we should still be fully source-code backwards compatible.
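    
    As a rough illustration of property-based factory discovery, the sketch below matches a set of properties against the context each factory declares. All names here (`SourceFactorySketch`, `requiredContext`, `CsvFileFactorySketch`, `FactoryLookupSketch`) are assumptions made for the example and may differ from the actual `TableSourceFactory` interface. The factories are passed in explicitly; with SPI they would instead be loaded through `java.util.ServiceLoader`.
    
    ```scala
    // Hypothetical factory interface (assumption; the PR's TableSourceFactory may differ).
    trait SourceFactorySketch {
      // Properties that must match exactly for this factory to apply.
      def requiredContext: Map[String, String]
      // Creates a source (here only a describing string) from the full property set.
      def create(properties: Map[String, String]): String
    }

    object CsvFileFactorySketch extends SourceFactorySketch {
      val requiredContext: Map[String, String] =
        Map("connector.type" -> "filesystem", "encoding.type" -> "csv")

      def create(properties: Map[String, String]): String =
        s"CsvSource(${properties("connector.path")})"
    }

    object FactoryLookupSketch {
      // Returns the single factory whose required context is contained in the properties.
      def find(
          factories: Seq[SourceFactorySketch],
          properties: Map[String, String]): SourceFactorySketch = {
        val matching = factories.filter { f =>
          f.requiredContext.forall { case (k, v) => properties.get(k).contains(v) }
        }
        matching match {
          case Seq(single) => single
          case Seq() => throw new IllegalArgumentException("No matching factory found.")
          case _ => throw new IllegalArgumentException("More than one matching factory found.")
        }
      }
    }
    ```
    
    Failing on both "none" and "more than one" match keeps the lookup deterministic, similar in spirit to the existing ambiguous/no-match converter exceptions.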
    
    I agree that more tests are missing, and we should also decide where and how the validation should happen. I think it should happen mostly in the table source builders. We could also introduce a global dictionary class that defines constants for the property keys instead of repeating string literals in different places.
    
    What do you think?
    
    ## Brief change log
    
      - Adds descriptors for schema, connectors, encoding, statistics, metadata, proctime, and rowtime
      - Adds table factory discovery based on unified properties
    
    ## Verifying this change
    
     - Added `DescriptorsTest`
     - ExternalCatalog tests are still working
     - More tests will follow...
    
    ## Does this pull request potentially affect one of the following parts:
    
      - Dependencies (does it add or upgrade a dependency): no
      - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no
      - The serializers: no
      - The runtime per-record code paths (performance sensitive): no
      - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: no
      - The S3 file system connector: no
    
    ## Documentation
    
      - Does this pull request introduce a new feature? yes
      - If yes, how is the feature documented? ScalaDocs

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/twalthr/flink FLINK-8240

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/5240.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #5240
    
----
commit a8ccac6895bde97b154c1cbb442a0ac6e901b4c3
Author: twalthr <tw...@...>
Date:   2017-12-15T09:18:20Z

    [FLINK-8240] [table] Create unified interfaces to configure and instantiate TableSources

----


---

[GitHub] flink pull request #5240: [FLINK-8240] [table] Create unified interfaces to ...

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5240#discussion_r163841505
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/JSON.scala ---
    @@ -0,0 +1,140 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.descriptors
    +
    +import java.util
    +
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.table.api.TableSchema
    +
    +import scala.collection.mutable
    +import scala.collection.JavaConverters._
    +
    +/**
    +  * Encoding descriptor for JSON.
    +  */
    +class JSON extends EncodingDescriptor("json") {
    +
    +  private val encodingSchema: mutable.LinkedHashMap[String, String] =
    +      mutable.LinkedHashMap[String, String]()
    +  private var fieldMapping: Option[util.Map[String, String]] = None
    +  private var failOnMissingField: Option[Boolean] = None
    +
    +  /**
    +    * Sets the JSON schema with field names and the types for the JSON-encoded input.
    +    * The JSON schema must not contain nested fields.
    +    *
    +    * This method overwrites existing fields added with [[field()]].
    +    *
    +    * @param schema the table schema
    +    */
    +  def schema(schema: TableSchema): JSON = {
    +    this.encodingSchema.clear()
    +    NormalizedProperties.normalizeTableSchema(schema).foreach {
    +      case (n, t) => field(n, t)
    +    }
    +    this
    +  }
    +
    +  /**
    +    * Adds a JSON field with the field name and the type information for the JSON-encoding.
    +    * This method can be called multiple times. The call order of this method defines
    +    * also the order of the fields in the JSON-encoding.
    +    *
    +    * @param fieldName the field name
    +    * @param fieldType the type information of the field
    +    */
    +  def field(fieldName: String, fieldType: TypeInformation[_]): JSON = {
    +    field(fieldName, NormalizedProperties.normalizeTypeInfo(fieldType))
    +    this
    +  }
    +
    +  /**
    +    * Adds a JSON field with the field name and the type string for the JSON-encoding.
    +    * This method can be called multiple times. The call order of this method defines
    +    * also the order of the fields in the JSON-encoding.
    +    *
    +    * @param fieldName the field name
    +    * @param fieldType the type string of the field
    +    */
    +  def field(fieldName: String, fieldType: String): JSON = {
    +    if (encodingSchema.contains(fieldName)) {
    +      throw new IllegalArgumentException(s"Duplicate field name $fieldName.")
    +    }
    +    encodingSchema += (fieldName -> fieldType)
    +    this
    +  }
    +
    +  /**
    +    * Sets a mapping from schema fields to fields of the JSON schema.
    +    *
    +    * A field mapping is required if the fields of produced tables should be named different than
    +    * the fields of the JSON records.
    +    * The key of the provided Map refers to the field of the table schema,
    +    * the value to the field in the JSON schema.
    +    *
    +    * @param tableToJsonMapping A mapping from table schema fields to JSON schema fields.
    +    * @return The builder.
    +    */
    +  def tableToJsonMapping(tableToJsonMapping: util.Map[String, String]): JSON = {
    +    this.fieldMapping = Some(tableToJsonMapping)
    +    this
    +  }
    +
    +  /**
    +    * Sets flag whether to fail if a field is missing or not.
    +    *
    +    * @param failOnMissingField If set to true, the operation fails if there is a missing field.
    +    *                           If set to false, a missing field is set to null.
    +    * @return The builder.
    +    */
    +  def failOnMissingField(failOnMissingField: Boolean): JSON = {
    +    this.failOnMissingField = Some(failOnMissingField)
    +    this
    +  }
    +
    +  /**
    +    * Internal method for encoding properties conversion.
    +    */
    +  override protected def addEncodingProperties(properties: NormalizedProperties): Unit = {
    +    properties.putIndexedFixedProperties(
    --- End diff --
    
    The easiest solution for all string-based situations where nested fields need to be supported is to specify the fields flattened like:
    
    ```
    JSON()
      .field("object.person.name", "VARCHAR")
      .field("object.person.age", "INT")
    ```
    I will add a description to the docs of the methods.
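    
    If a consumer of the properties needs the nested structure back, the flattened names could be split on the dot, roughly as sketched below. This is only an illustration: `FlattenedFieldsSketch`, `Leaf`, and `Obj` are hypothetical names, and the sketch assumes that field names themselves never contain a dot.
    
    ```scala
    object FlattenedFieldsSketch {
      // Simple tree: a leaf holds a type string, an object node holds named children.
      sealed trait Node
      case class Leaf(tpe: String) extends Node
      case class Obj(children: Map[String, Node]) extends Node

      // Inserts one flattened field such as "object.person.name" into the tree.
      private def insert(node: Obj, path: List[String], tpe: String): Obj = path match {
        case Nil => node
        case last :: Nil => Obj(node.children + (last -> Leaf(tpe)))
        case head :: tail =>
          // Reuse an existing object node for this segment, otherwise start a new one.
          val child = node.children.get(head) match {
            case Some(o: Obj) => o
            case _ => Obj(Map.empty)
          }
          Obj(node.children + (head -> insert(child, tail, tpe)))
      }

      // Rebuilds the nested structure from (flattened name, type string) pairs.
      def unflatten(fields: Seq[(String, String)]): Obj =
        fields.foldLeft(Obj(Map.empty)) { case (root, (name, tpe)) =>
          insert(root, name.split('.').toList, tpe)
        }
    }
    ```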



---

[GitHub] flink issue #5240: [FLINK-8240] [table] Create unified interfaces to configu...

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on the issue:

    https://github.com/apache/flink/pull/5240
  
    Thanks for the review @fhueske. I will merge this now. We definitely need follow-up issues for this.


---

[GitHub] flink pull request #5240: [FLINK-8240] [table] Create unified interfaces to ...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5240#discussion_r162964487
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/exceptions.scala ---
    @@ -156,7 +195,10 @@ case class NoMatchedTableSourceConverterException(
       *
       * @param tableType table type
       * @param cause the cause
    +  * @deprecated Use table source factories instead.
       */
    +@Deprecated
    +@deprecated("Use table source factories instead.")
    --- End diff --
    
    Give a class name.


---

[GitHub] flink pull request #5240: [FLINK-8240] [table] Create unified interfaces to ...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5240#discussion_r164151474
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/Schema.scala ---
    @@ -69,17 +72,76 @@ class Schema extends Descriptor {
         */
       def field(fieldName: String, fieldType: String): Schema = {
         if (tableSchema.contains(fieldName)) {
    -      throw new IllegalArgumentException(s"Duplicate field name $fieldName.")
    +      throw new ValidationException(s"Duplicate field name $fieldName.")
    +    }
    +
    +    val fieldProperties = mutable.LinkedHashMap[String, String]()
    +    fieldProperties += (DescriptorUtils.TYPE -> fieldType)
    +
    +    tableSchema += (fieldName -> fieldProperties)
    +
    +    lastField = Some(fieldName)
    +    this
    +  }
    +
    +  /**
    +    * Specifies the origin of the previously defined field. The origin field is defined by a
    +    * connector or format.
    +    *
    +    * E.g. field("myString", Types.STRING).from("CSV_MY_STRING")
    +    */
    +  def from(originFieldName: String): Schema = {
    +    lastField match {
    +      case None => throw new ValidationException("No field defined previously. Use field() before.")
    +      case Some(f) =>
    +        tableSchema(f) += (DescriptorUtils.FROM -> originFieldName)
    +        lastField = None
    +    }
    +    this
    +  }
    +
    +  /**
    +    * Specifies the previously defined field as a processing-time attribute.
    +    *
    +    * E.g. field("myString", Types.STRING).proctime()
    +    */
    +  def proctime(): Schema = {
    +    lastField match {
    +      case None => throw new ValidationException("No field defined previously. Use field() before.")
    +      case Some(f) =>
    +        tableSchema(f) += (DescriptorUtils.PROCTIME -> DescriptorUtils.TRUE)
    +        lastField = None
    +    }
    +    this
    +  }
    +
    +  /**
    +    * Specifies the previously defined field as an event-time attribute.
    +    *
    +    * E.g. field("myString", Types.STRING).rowtime(...)
    --- End diff --
    
    `field("procTime", Types.SQL_TIMESTAMP).proctime()`


---

[GitHub] flink pull request #5240: [FLINK-8240] [table] Create unified interfaces to ...

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5240#discussion_r163836415
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/ConnectorDescriptor.scala ---
    @@ -0,0 +1,47 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.descriptors
    +
    +import org.apache.flink.table.descriptors.DescriptorUtils.connector
    +
    +/**
    +  * Describes a connector to an other system.
    +  *
    +  * @param tpe string identifier for the connector
    +  */
    +abstract class ConnectorDescriptor(private val tpe: String) extends Descriptor {
    +
    +  /**
    +    * Internal method for properties conversion.
    +    */
    +  final def addProperties(properties: NormalizedProperties): Unit = {
    +    properties.putString(connector("type"), tpe)
    +    val connectorProperties = new NormalizedProperties()
    +    addConnectorProperties(connectorProperties)
    +    connectorProperties.getProperties.foreach { case (k, v) =>
    --- End diff --
    
    This logic ensures that you cannot add an encoding in a connector. All properties that are added are prefixed correctly with "connector.".
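    
    A stripped-down sketch of the idea, for reference. `PropsSketch`, `ConnectorSketch`, and `FileSystemSketch` are hypothetical stand-ins for illustration, not the classes in this PR:
    
    ```scala
    import scala.collection.mutable

    // Hypothetical stand-in for NormalizedProperties; only what this sketch needs.
    class PropsSketch {
      private val props = mutable.LinkedHashMap[String, String]()
      def putString(key: String, value: String): Unit = props(key) = value
      def getProperties: Map[String, String] = props.toMap
    }

    abstract class ConnectorSketch(tpe: String) {
      // Subclasses only ever see a fresh, empty property container.
      protected def addConnectorProperties(properties: PropsSketch): Unit

      final def addProperties(properties: PropsSketch): Unit = {
        properties.putString("connector.type", tpe)
        val own = new PropsSketch()
        addConnectorProperties(own)
        // Every key written by the subclass is forced under the "connector." prefix.
        own.getProperties.foreach { case (k, v) => properties.putString(s"connector.$k", v) }
      }
    }

    class FileSystemSketch(path: String) extends ConnectorSketch("filesystem") {
      override protected def addConnectorProperties(properties: PropsSketch): Unit = {
        properties.putString("path", path)
        // Even a misbehaving key like this one ends up as "connector.encoding.type".
        properties.putString("encoding.type", "csv")
      }
    }
    ```
    
    Because `addProperties` is final and the subclass writes into a separate container, a connector has no way to emit a top-level "encoding." key.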


---

[GitHub] flink pull request #5240: [FLINK-8240] [table] Create unified interfaces to ...

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5240#discussion_r163830189
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala ---
    @@ -107,6 +111,16 @@ abstract class BatchTableEnvironment(
         }
       }
     
    +  /**
    +    * Creates a table from a descriptor that describes the resulting table schema, the source
    +    * connector, source encoding, and other properties.
    +    *
    +    * @param schema schema descriptor describing the table to create
    +    */
    +  def createTable(schema: Schema): BatchTableSourceDescriptor = {
    --- End diff --
    
    I will improve the definition, but in general I think it is nice to have a more fluent API without too much nesting. It is also inconvenient to call a method on the table environment and pass it to its parameters.


---

[GitHub] flink pull request #5240: [FLINK-8240] [table] Create unified interfaces to ...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5240#discussion_r162988639
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalTableSourceUtil.scala ---
    @@ -23,22 +23,74 @@ import java.net.URL
     import org.apache.commons.configuration.{ConfigurationException, ConversionException, PropertiesConfiguration}
     import org.apache.flink.annotation.VisibleForTesting
     import org.apache.flink.table.annotation.TableType
    -import org.apache.flink.table.api.{AmbiguousTableSourceConverterException, NoMatchedTableSourceConverterException, TableException}
    +import org.apache.flink.table.api._
     import org.apache.flink.table.plan.schema.{BatchTableSourceTable, StreamTableSourceTable, TableSourceTable}
     import org.apache.flink.table.plan.stats.FlinkStatistic
    -import org.apache.flink.table.sources.{BatchTableSource, StreamTableSource, TableSource}
    +import org.apache.flink.table.sources.{BatchTableSource, StreamTableSource, TableSource, TableSourceFactoryService}
     import org.apache.flink.table.util.Logging
     import org.apache.flink.util.InstantiationUtil
     import org.reflections.Reflections
     
    -import scala.collection.JavaConverters._
    -import scala.collection.mutable
    +import _root_.scala.collection.JavaConverters._
    +import _root_.scala.collection.mutable
     
     /**
       * The utility class is used to convert ExternalCatalogTable to TableSourceTable.
       */
     object ExternalTableSourceUtil extends Logging {
     
    +  /**
    +    * Converts an [[ExternalCatalogTable]] instance to a [[TableSourceTable]] instance
    +    *
    +    * @param externalCatalogTable the [[ExternalCatalogTable]] instance which to convert
    +    * @return converted [[TableSourceTable]] instance from the input catalog table
    +    */
    +  def fromExternalCatalogTable(
    +      tableEnv: TableEnvironment,
    +      externalCatalogTable: ExternalCatalogTable)
    +    : TableSourceTable[_] = {
    +
    +    // check for the legacy external catalog path
    +    if (externalCatalogTable.isLegacyTableType) {
    +      LOG.warn("External catalog tables based on TableType annotations are deprecated. " +
    +        "Please consider updating them to TableSourceFactories.")
    +      fromExternalCatalogTableType(externalCatalogTable)
    +    }
    +    // use the factory approach
    +    else {
    +      val source = TableSourceFactoryService.findTableSourceFactory(externalCatalogTable)
    +      tableEnv match {
    +        // check for a batch table source in this batch environment
    +        case _: BatchTableEnvironment =>
    +          source match {
    +            case bts: BatchTableSource[_] =>
    +              new BatchTableSourceTable(
    +                bts,
    +                new FlinkStatistic(externalCatalogTable.getTableStats))
    +            case _ => throw new TableException(
    +              s"Found table source '${source.getClass.getCanonicalName}' is not applicable " +
    +                s"in a batch environment.")
    +          }
    +        // check for a stream table source in this streaming environment
    +        case _: StreamTableEnvironment =>
    +          source match {
    +            case sts: StreamTableSource[_] =>
    +              new StreamTableSourceTable(
    +                sts,
    +                new FlinkStatistic(externalCatalogTable.getTableStats))
    +            case _ => throw new TableException(
    +              s"Found table source '${source.getClass.getCanonicalName}' is not applicable " +
    +                s"in a streaming environment.")
    +          }
    +        case _ => throw new TableException("Unsupported table environment.")
    +      }
    +    }
    +  }
    +
    +  // ----------------------------------------------------------------------------------------------
    +  // NOTE: the following line can be removed once we drop support for TableType
    --- End diff --
    
    line or lines? 
    Create a JIRA and link it here as reference?


---

[GitHub] flink pull request #5240: [FLINK-8240] [table] Create unified interfaces to ...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5240#discussion_r163012442
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/StreamTableSourceDescriptor.scala ---
    @@ -0,0 +1,95 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.descriptors
    +
    +import org.apache.flink.table.api.{StreamTableEnvironment, Table, TableException}
    +import org.apache.flink.table.sources.{StreamTableSource, TableSource, TableSourceFactoryService}
    +
    +/**
    +  * Descriptor for specifying a table source in a streaming environment.
    +  */
    +class StreamTableSourceDescriptor(
    +    tableEnv: StreamTableEnvironment,
    +    schema: Schema)
    +  extends TableSourceDescriptor {
    +
    +  schemaDescriptor = Some(schema)
    +
    +  /**
    +    * Searches for the specified table source, configures it accordingly, and returns it.
    +    */
    +  def toTableSource: TableSource[_] = {
    +    val source = TableSourceFactoryService.findTableSourceFactory(this)
    +    source match {
    +      case _: StreamTableSource[_] => source
    +      case _ => throw new TableException(
    +        s"Found table source '${source.getClass.getCanonicalName}' is not applicable " +
    +          s"in a streaming environment.")
    +    }
    +  }
    +
    +  /**
    +    * Searches for the specified table source, configures it accordingly, and returns it as a table.
    +    */
    +  def toTable: Table = {
    +    tableEnv.fromTableSource(toTableSource)
    +  }
    +
    +  /**
    +    * Searches for the specified table source, configures it accordingly, and registers it as
    +    * a table under the given name.
    +    *
    +    * @param name table name to be registered in the table environment
    +    */
    +  def register(name: String): Unit = {
    +    tableEnv.registerTableSource(name, toTableSource)
    +  }
    +
    +  /**
    +    * Specifies an connector for reading data from a connector.
    +    */
    +  def withConnector(connector: ConnectorDescriptor): StreamTableSourceDescriptor = {
    +    connectorDescriptor = Some(connector)
    +    this
    +  }
    +
    +  /**
    +    * Specifies an encoding that defines how to read data from a connector.
    +    */
    +  def withEncoding(encoding: EncodingDescriptor): StreamTableSourceDescriptor = {
    +    encodingDescriptor = Some(encoding)
    --- End diff --
    
    check if the connector requires an encoding?


---

[GitHub] flink pull request #5240: [FLINK-8240] [table] Create unified interfaces to ...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5240#discussion_r163011304
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/Descriptor.scala ---
    @@ -0,0 +1,32 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.descriptors
    +
    +/**
    +  * A class that adds a set of string-based, normalized properties for describing a
    +  * table source or table sink.
    +  */
    +abstract class Descriptor {
    --- End diff --
    
    Should we add a validation method that checks if the descriptor is valid?
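    
    Something along these lines, as a rough sketch only. `ValidatedDescriptorSketch` and `CsvSketch` are made-up names, and returning a list of messages instead of throwing is just one possible design:
    
    ```scala
    // Hypothetical descriptor base with a validation hook (names are illustrative only).
    abstract class ValidatedDescriptorSketch {
      // Returns a list of problems; an empty list means the descriptor is valid.
      def validate(): Seq[String]
    }

    class CsvSketch(fieldDelimiter: Option[String], fields: Seq[String])
      extends ValidatedDescriptorSketch {

      override def validate(): Seq[String] = {
        val noFields =
          if (fields.isEmpty) Seq("At least one field is required.") else Seq.empty
        val emptyDelimiter =
          if (fieldDelimiter.exists(_.isEmpty)) Seq("Field delimiter must not be empty.")
          else Seq.empty
        // Names that occur more than once, each reported a single time.
        val duplicates =
          fields.diff(fields.distinct).distinct.map(f => s"Duplicate field name $f.")
        noFields ++ emptyDelimiter ++ duplicates
      }
    }
    ```
    
    Collecting all problems at once would let us report every issue of a descriptor in a single error message.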


---

[GitHub] flink pull request #5240: [FLINK-8240] [table] Create unified interfaces to ...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5240#discussion_r164131675
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/CSV.scala ---
    @@ -139,26 +139,26 @@ class CSV extends EncodingDescriptor("csv") {
       }
     
       /**
    -    * Internal method for encoding properties conversion.
    +    * Internal method for format properties conversion.
         */
    -  override protected def addEncodingProperties(properties: NormalizedProperties): Unit = {
    -    fieldDelim.foreach(properties.putString("field-delimiter", _))
    -    lineDelim.foreach(properties.putString("line-delimiter", _))
    -    properties.putTableSchema("fields", encodingSchema.toIndexedSeq)
    -    quoteCharacter.foreach(properties.putCharacter("quote-character", _))
    -    commentPrefix.foreach(properties.putString("comment-prefix", _))
    -    isIgnoreFirstLine.foreach(properties.putBoolean("ignore-first-line", _))
    -    lenient.foreach(properties.putBoolean("ignore-parse-errors", _))
    +  override protected def addFormatProperties(properties: NormalizedProperties): Unit = {
    +    fieldDelim.foreach(properties.putString(DescriptorUtils.FIELD_DELIMITER, _))
    --- End diff --
    
    I would not define all constants globally. Some constants should be global, but constants for specific connectors or formats should go to the respective descriptor.
    IMO, it would be better to have these keys in `CSV` or in the class that validates the properties of a certain type.
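    
    Roughly what I have in mind, as a sketch with made-up names (`SharedKeysSketch`, `CsvFormatSketch`), not a concrete proposal for the final class layout:
    
    ```scala
    // Keys shared by all descriptors stay in one global place.
    object SharedKeysSketch {
      val Type = "type"
    }

    // CSV-specific keys live next to the CSV descriptor, in its companion object.
    object CsvFormatSketch {
      val FieldDelimiter = "field-delimiter"
      val LineDelimiter = "line-delimiter"
    }

    class CsvFormatSketch(fieldDelim: Option[String], lineDelim: Option[String]) {
      import CsvFormatSketch._

      // Only options that were actually set are written as properties.
      def toProperties: Map[String, String] =
        Map(SharedKeysSketch.Type -> "csv") ++
          fieldDelim.map(FieldDelimiter -> _).toSeq ++
          lineDelim.map(LineDelimiter -> _).toSeq
    }
    ```
    
    A validator for CSV properties could then import the same companion object, so the keys are defined exactly once per format.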


---

[GitHub] flink pull request #5240: [FLINK-8240] [table] Create unified interfaces to ...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5240#discussion_r164168331
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/Rowtime.scala ---
    @@ -0,0 +1,131 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.descriptors
    +
    +import org.apache.flink.table.api.Types
    +import org.apache.flink.table.descriptors.NormalizedProperties.{normalizeTimestampExtractor, normalizeWatermarkStrategy}
    +import org.apache.flink.table.sources.tsextractors.{ExistingField, StreamRecordTimestamp, TimestampExtractor}
    +import org.apache.flink.table.sources.wmstrategies.{AscendingTimestamps, BoundedOutOfOrderTimestamps, PreserveWatermarks, WatermarkStrategy}
    +
    +import scala.collection.mutable
    +
    +/**
    +  * Rowtime descriptor for describing an event time attribute in the schema.
    +  */
    +class Rowtime extends Descriptor {
    +
    +  private var timestampExtractor: Option[TimestampExtractor] = None
    +  private var watermarkStrategy: Option[WatermarkStrategy] = None
    +
    +  /**
    +    * Sets a built-in timestamp extractor that converts an existing [[Long]] or
    +    * [[Types.SQL_TIMESTAMP]] field into the rowtime attribute.
    +    *
    +    * @param fieldName The field to convert into a rowtime attribute.
    +    */
    +  def timestampFromField(fieldName: String): Rowtime = {
    +    timestampExtractor = Some(new ExistingField(fieldName))
    +    this
    +  }
    +
    +  /**
    +    * Sets a built-in timestamp extractor that converts the assigned timestamp from
    +    * a DataStream API record into the rowtime attribute.
    +    *
    +    * Note: This extractor only works in streaming environments.
    +    */
    +  def timestampFromDataStream(): Rowtime = {
    +    timestampExtractor = Some(new StreamRecordTimestamp)
    +    this
    +  }
    +
    +  /**
    +    * Sets a custom timestamp extractor to be used for the rowtime attribute.
    +    *
    +    * @param extractor The [[TimestampExtractor]] to extract the rowtime attribute
    +    *                  from the physical type.
    +    */
    +  def timestampFromExtractor(extractor: TimestampExtractor): Rowtime = {
    +    timestampExtractor = Some(extractor)
    +    this
    +  }
    +
    +  /**
    +    * Sets a built-in watermark strategy for ascending rowtime attributes.
    +    *
    +    * Emits a watermark of the maximum observed timestamp so far minus 1.
    +    * Rows that have a timestamp equal to the max timestamp are not late.
    +    */
    +  def watermarkPeriodicAscending(): Rowtime = {
    +    watermarkStrategy = Some(new AscendingTimestamps)
    +    this
    +  }
    +
    +  /**
    +    * Sets a built-in watermark strategy for rowtime attributes which are out-of-order by a bounded
    +    * time interval.
    +    *
    +    * Emits watermarks which are the maximum observed timestamp minus the specified delay.
    +    */
    +  def watermarkPeriodicBounding(delay: Long): Rowtime = {
    +    watermarkStrategy = Some(new BoundedOutOfOrderTimestamps(delay))
    +    this
    +  }
    +
    +  /**
    +    * Sets a built-in watermark strategy which indicates the watermarks should be preserved from the
    +    * underlying DataStream API.
    +    */
    +  def watermarkFromDataStream(): Rowtime = {
    --- End diff --
    
    `preserveSourceWatermarks()`
    
    `DataStream` is only an internal aspect that's not visible when using table sources.


---

[GitHub] flink pull request #5240: [FLINK-8240] [table] Create unified interfaces to ...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5240#discussion_r164166552
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/Rowtime.scala ---
    @@ -0,0 +1,131 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.descriptors
    +
    +import org.apache.flink.table.api.Types
    +import org.apache.flink.table.descriptors.NormalizedProperties.{normalizeTimestampExtractor, normalizeWatermarkStrategy}
    +import org.apache.flink.table.sources.tsextractors.{ExistingField, StreamRecordTimestamp, TimestampExtractor}
    +import org.apache.flink.table.sources.wmstrategies.{AscendingTimestamps, BoundedOutOfOrderTimestamps, PreserveWatermarks, WatermarkStrategy}
    +
    +import scala.collection.mutable
    +
    +/**
    +  * Rowtime descriptor for describing an event time attribute in the schema.
    +  */
    +class Rowtime extends Descriptor {
    +
    +  private var timestampExtractor: Option[TimestampExtractor] = None
    +  private var watermarkStrategy: Option[WatermarkStrategy] = None
    +
    +  /**
    +    * Sets a built-in timestamp extractor that converts an existing [[Long]] or
    +    * [[Types.SQL_TIMESTAMP]] field into the rowtime attribute.
    +    *
    +    * @param fieldName The field to convert into a rowtime attribute.
    +    */
    +  def timestampFromField(fieldName: String): Rowtime = {
    +    timestampExtractor = Some(new ExistingField(fieldName))
    +    this
    +  }
    +
    +  /**
    +    * Sets a built-in timestamp extractor that converts the assigned timestamp from
    +    * a DataStream API record into the rowtime attribute.
    +    *
    +    * Note: This extractor only works in streaming environments.
    +    */
    +  def timestampFromDataStream(): Rowtime = {
    +    timestampExtractor = Some(new StreamRecordTimestamp)
    +    this
    +  }
    +
    +  /**
    +    * Sets a custom timestamp extractor to be used for the rowtime attribute.
    +    *
    +    * @param extractor The [[TimestampExtractor]] to extract the rowtime attribute
    +    *                  from the physical type.
    +    */
    +  def timestampFromExtractor(extractor: TimestampExtractor): Rowtime = {
    +    timestampExtractor = Some(extractor)
    +    this
    +  }
    +
    +  /**
    +    * Sets a built-in watermark strategy for ascending rowtime attributes.
    +    *
    +    * Emits a watermark of the maximum observed timestamp so far minus 1.
    +    * Rows that have a timestamp equal to the max timestamp are not late.
    +    */
    +  def watermarkPeriodicAscending(): Rowtime = {
    --- End diff --
    
    `periodicAscendingWatermarks()`?


---

[GitHub] flink pull request #5240: [FLINK-8240] [table] Create unified interfaces to ...

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5240#discussion_r163863728
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/Schema.scala ---
    @@ -0,0 +1,95 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.descriptors
    +
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.table.api.TableSchema
    +
    +import scala.collection.mutable
    +
    +/**
    +  * Describes a schema of a table.
    +  */
    +class Schema extends Descriptor {
    +
    +  private val tableSchema: mutable.LinkedHashMap[String, String] =
    +      mutable.LinkedHashMap[String, String]()
    +
    +  /**
    +    * Sets the schema with field names and the types. Required.
    +    *
    +    * This method overwrites existing fields added with [[field()]].
    +    *
    +    * @param schema the table schema
    +    */
    +  def schema(schema: TableSchema): Schema = {
    --- End diff --
    
    We have no syntax for schema strings yet. `ROW(name TYPE)` is a data type, not a schema.


---

[GitHub] flink pull request #5240: [FLINK-8240] [table] Create unified interfaces to ...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5240#discussion_r163010702
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/Descriptor.scala ---
    @@ -0,0 +1,32 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.descriptors
    +
    +/**
    +  * A class that adds a set of string-based, normalized properties for describing a
    +  * table source or table sink.
    +  */
    +abstract class Descriptor {
    +
    +  /**
    +    * Internal method for properties conversion.
    +    */
    +  def addProperties(properties: NormalizedProperties): Unit
    --- End diff --
    
    Does this method have to be public, or can we hide it?
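
    One possible way to hide it, as a minimal sketch under assumptions: Scala's qualified `private[...]` limits access to an enclosing package while still letting subclasses in that package implement the method. The package `sketch.descriptors`, the simplified `Proctime`, the `"proctime"` key, and the `DescriptorSketch.propertiesOf` helper are hypothetical and only for illustration; a plain mutable map stands in for `NormalizedProperties`.

    ```scala
    package sketch.descriptors {

      import scala.collection.mutable

      abstract class Descriptor {
        // Qualified private: callable only from code inside sketch.descriptors.
        // Unlike plain `private`, a qualified-private member may be abstract.
        private[descriptors] def addProperties(properties: mutable.Map[String, String]): Unit
      }

      // Hypothetical, simplified descriptor used only to exercise the sketch.
      class Proctime extends Descriptor {
        private var name: Option[String] = None

        def field(fieldName: String): Proctime = {
          name = Some(fieldName)
          this
        }

        private[descriptors] override def addProperties(
            properties: mutable.Map[String, String]): Unit = {
          name.foreach(n => properties.put("proctime", n))
        }
      }

      // Hypothetical internal entry point that is allowed to call addProperties.
      object DescriptorSketch {
        def propertiesOf(descriptor: Descriptor): Map[String, String] = {
          val props = mutable.LinkedHashMap[String, String]()
          descriptor.addProperties(props)
          props.toMap
        }
      }
    }
    ```

    With this, user code outside the package can still call `field(...)` but gets a compile error on `addProperties`. As far as I know, qualified-private members are still emitted as public in the bytecode, so this would hide the method from Scala callers only, not from Java callers.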


---

[GitHub] flink pull request #5240: [FLINK-8240] [table] Create unified interfaces to ...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5240#discussion_r162967648
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogTable.scala ---
    @@ -18,28 +18,282 @@
     
     package org.apache.flink.table.catalog
     
    -import java.util.{HashMap => JHashMap, Map => JMap}
     import java.lang.{Long => JLong}
    +import java.util.{HashMap => JHashMap, Map => JMap}
     
    -import org.apache.flink.table.api.TableSchema
    +import org.apache.flink.table.api.{TableException, TableSchema}
    +import org.apache.flink.table.catalog.ExternalCatalogTable.{TableTypeConnector, toConnectorDescriptor, toMetadataDescriptor, toStatisticsDescriptor}
    +import org.apache.flink.table.descriptors.DescriptorUtils.{connector, metadata}
    +import org.apache.flink.table.descriptors._
     import org.apache.flink.table.plan.stats.TableStats
     
    +import scala.collection.JavaConverters._
    +
     /**
       * Defines a table in an [[ExternalCatalog]].
    -  *
    -  * @param tableType            Table type, e.g csv, hbase, kafka
    -  * @param schema               Schema of the table (column names and types)
    -  * @param properties           Properties of the table
    -  * @param stats                Statistics of the table
    -  * @param comment              Comment of the table
    -  * @param createTime           Create timestamp of the table
    -  * @param lastAccessTime       Timestamp of last access of the table
       */
    -case class ExternalCatalogTable(
    +class ExternalCatalogTable(
    --- End diff --
    
    Add descriptions for the constructor arguments.


---

[GitHub] flink pull request #5240: [FLINK-8240] [table] Create unified interfaces to ...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5240#discussion_r162965980
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/java/BatchTableEnvironment.scala ---
    @@ -23,6 +23,7 @@ import org.apache.flink.api.java.{DataSet, ExecutionEnvironment}
     import org.apache.flink.table.expressions.ExpressionParser
     import org.apache.flink.table.api._
     import org.apache.flink.table.functions.{AggregateFunction, TableFunction}
    +import org.apache.flink.table.descriptors.{BatchTableSourceDescriptor, ConnectorDescriptor}
    --- End diff --
    
    remove


---

[GitHub] flink pull request #5240: [FLINK-8240] [table] Create unified interfaces to ...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5240#discussion_r163005744
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/JSON.scala ---
    @@ -0,0 +1,140 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.descriptors
    +
    +import java.util
    +
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.table.api.TableSchema
    +
    +import scala.collection.mutable
    +import scala.collection.JavaConverters._
    +
    +/**
    +  * Encoding descriptor for JSON.
    +  */
    +class JSON extends EncodingDescriptor("json") {
    --- End diff --
    
    Should we add a method that defines the schema with a JSON Schema string? We would need a parser, but we would get immediate support for nested schemas.
    
    Alternatively, we could use the nested schema parser of `TypeStringUtils`, but this would not be JSON Schema.
    



---

[GitHub] flink pull request #5240: [FLINK-8240] [table] Create unified interfaces to ...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5240#discussion_r163011691
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/Rowtime.scala ---
    @@ -0,0 +1,144 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.descriptors
    +
    +import org.apache.flink.table.api.Types
    +import org.apache.flink.table.descriptors.NormalizedProperties.{normalizeTimestampExtractor, normalizeWatermarkStrategy}
    +import org.apache.flink.table.sources.tsextractors.{ExistingField, StreamRecordTimestamp, TimestampExtractor}
    +import org.apache.flink.table.sources.wmstrategies.{AscendingTimestamps, BoundedOutOfOrderTimestamps, PreserveWatermarks, WatermarkStrategy}
    +
    +import scala.collection.mutable
    +
    +/**
    +  * Rowtime descriptor for describing an event time attribute in the schema.
    +  */
    +class Rowtime extends Descriptor {
    +
    +  private var rowtimeName: Option[String] = None
    +  private var timestampExtractor: Option[TimestampExtractor] = None
    +  private var watermarkStrategy: Option[WatermarkStrategy] = None
    +
    +  /**
    +    * Declares a field of the schema to be the rowtime attribute. Required.
    +    *
    +    * @param fieldName The name of the field that becomes the processing time field.
    +    */
    +  def field(fieldName: String): Rowtime = {
    +    rowtimeName = Some(fieldName)
    +    this
    +  }
    +
    +  /**
    +    * Sets a built-in timestamp extractor that converts an existing [[Long]] or
    +    * [[Types.SQL_TIMESTAMP]] field into the rowtime attribute.
    +    *
    +    * @param fieldName The field to convert into a rowtime attribute.
    +    */
    +  def timestampFromField(fieldName: String): Rowtime = {
    +    timestampExtractor = Some(new ExistingField(fieldName))
    +    this
    +  }
    +
    +  /**
    +    * Sets a built-in timestamp extractor that converts the assigned timestamp from
    +    * a DataStream API record into the rowtime attribute.
    +    *
    +    * Note: This extractor only works in streaming environments.
    +    */
    +  def timestampFromDataStream(): Rowtime = {
    +    timestampExtractor = Some(new StreamRecordTimestamp)
    +    this
    +  }
    +
    +  /**
    +    * Sets a custom timestamp extractor to be used for the rowtime attribute.
    +    *
    +    * @param extractor The [[TimestampExtractor]] to extract the rowtime attribute
    +    *                  from the physical type.
    +    */
    +  def timestampFromExtractor(extractor: TimestampExtractor): Rowtime = {
    +    timestampExtractor = Some(extractor)
    +    this
    +  }
    +
    +  /**
    +    * Sets a built-in watermark strategy for ascending rowtime attributes.
    +    *
    +    * Emits a watermark of the maximum observed timestamp so far minus 1.
    +    * Rows that have a timestamp equal to the max timestamp are not late.
    +    */
    +  def watermarkPeriodicAscending(): Rowtime = {
    +    watermarkStrategy = Some(new AscendingTimestamps)
    +    this
    +  }
    +
    +  /**
    +    * Sets a built-in watermark strategy for rowtime attributes which are out-of-order by a bounded
    +    * time interval.
    +    *
    +    * Emits watermarks which are the maximum observed timestamp minus the specified delay.
    +    */
    +  def watermarkPeriodicBounding(delay: Long): Rowtime = {
    +    watermarkStrategy = Some(new BoundedOutOfOrderTimestamps(delay))
    +    this
    +  }
    +
    +  /**
    +    * Sets a built-in watermark strategy which indicates the watermarks should be preserved from the
    +    * underlying DataStream API.
    +    */
    +  def watermarkFromDataStream(): Rowtime = {
    +    watermarkStrategy = Some(PreserveWatermarks.INSTANCE)
    +    this
    +  }
    +
    +  /**
    +    * Sets a custom watermark strategy to be used for the rowtime attribute.
    +    */
    +  def watermarkFromStrategy(strategy: WatermarkStrategy): Rowtime = {
    +    watermarkStrategy = Some(strategy)
    +    this
    +  }
    +
    +  /**
    +    * Internal method for properties conversion.
    +    */
    +  final override def addProperties(properties: NormalizedProperties): Unit = {
    +    val props = mutable.HashMap[String, String]()
    +    if (rowtimeName.isDefined) {
    --- End diff --
    
    All properties must be defined. Otherwise, the descriptor is not correctly configured.
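
    A minimal sketch of what such a check could look like, assuming the conversion fails fast instead of skipping unset values. `RowtimeSketch`, the string placeholders for the extractor and strategy, and the `rowtime.*` keys are hypothetical; the real descriptor writes into `NormalizedProperties`.

    ```scala
    object RowtimeValidationSketch {

      // Hypothetical, simplified stand-in for the Rowtime descriptor.
      final class RowtimeSketch {
        private var rowtimeName: Option[String] = None
        private var timestampExtractor: Option[String] = None
        private var watermarkStrategy: Option[String] = None

        def field(name: String): RowtimeSketch = {
          rowtimeName = Some(name)
          this
        }

        def timestampFromField(name: String): RowtimeSketch = {
          timestampExtractor = Some(s"from-field:$name")
          this
        }

        def watermarkPeriodicAscending(): RowtimeSketch = {
          watermarkStrategy = Some("periodic-ascending")
          this
        }

        // Throws if any part of the descriptor was left unset.
        def toProperties: Map[String, String] = {
          def required(value: Option[String], what: String): String =
            value.getOrElse(
              throw new IllegalStateException(s"Rowtime descriptor: $what is not defined."))

          Map(
            "rowtime.name" -> required(rowtimeName, "field name"),
            "rowtime.timestamp" -> required(timestampExtractor, "timestamp extractor"),
            "rowtime.watermark" -> required(watermarkStrategy, "watermark strategy"))
        }
      }
    }
    ```

    A descriptor that only called `field("rowtime")` would then be rejected when it is converted, rather than producing a partial set of properties.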


---

[GitHub] flink pull request #5240: [FLINK-8240] [table] Create unified interfaces to ...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5240#discussion_r162996633
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/DescriptorUtils.scala ---
    @@ -0,0 +1,69 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.descriptors
    +
    +import java.util
    +
    +/**
    +  * Utilities for working with a [[org.apache.flink.table.descriptors.Descriptor]].
    +  */
    +object DescriptorUtils {
    +
    +  def hasConnector(properties: util.Map[String, String], connector: String): Boolean = {
    +    val tpe = properties.get("connector.type")
    +    tpe != null || tpe == connector
    +  }
    +
    +  def hasEncoding(properties: util.Map[String, String], encoding: String): Boolean = {
    +    val tpe = properties.get("encoding.type")
    +    tpe != null || tpe == encoding
    --- End diff --
    
    Should be `tpe != null && tpe == encoding`? With `||`, any non-null `encoding.type` matches.
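
    To make the difference concrete, here is a small self-contained sketch that puts the check from the diff next to the suggested one. `DescriptorUtilsSketch` and `hasEncodingWithOr` are hypothetical names used only for the comparison.

    ```scala
    import java.util

    object DescriptorUtilsSketch {

      // The check as written in the diff: true for every non-null type.
      def hasEncodingWithOr(properties: util.Map[String, String], encoding: String): Boolean = {
        val tpe = properties.get("encoding.type")
        tpe != null || tpe == encoding
      }

      // The suggested check: the property must be present and equal.
      def hasEncoding(properties: util.Map[String, String], encoding: String): Boolean = {
        val tpe = properties.get("encoding.type") // null if the key is absent
        tpe != null && tpe == encoding
      }

      def main(args: Array[String]): Unit = {
        val props = new util.HashMap[String, String]()
        props.put("encoding.type", "json")
        println(hasEncodingWithOr(props, "csv")) // true: wrong, the encoding is json
        println(hasEncoding(props, "csv"))       // false
        println(hasEncoding(props, "json"))      // true
      }
    }
    ```

    The same applies to `hasConnector` and `connector.type`.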


---

[GitHub] flink pull request #5240: [FLINK-8240] [table] Create unified interfaces to ...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5240#discussion_r162947812
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala ---
    @@ -107,6 +111,16 @@ abstract class BatchTableEnvironment(
         }
       }
     
    +  /**
    +    * Creates a table from a descriptor that describes the resulting table schema, the source
    +    * connector, source encoding, and other properties.
    +    *
    +    * @param schema schema descriptor describing the table to create
    +    */
    +  def createTable(schema: Schema): BatchTableSourceDescriptor = {
    --- End diff --
    
    I'm not sure about the approach of returning a `TableSourceDescriptor`. I think it would be better if the table creation and registration were completed within this method, i.e., the table should be completely defined by the argument of the method.
    
    For example
    
    ```
    tEnv.registerTableSource(
      "MyTable",
      TableSource.create(tEnv)
        .withSchema(
          Schema()
            .field(...)
            .field(...))
        .withConnector()
          ...
        .toTableSource()
    )
    ```
    
    In this design, we would reuse the existing `registerTableSource` method, and `TableSource.create` would be a static method that returns a `TableSourceDescriptor`. Not sure if this is the best approach, but I like that the table is completely defined within the method call.
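
    A rough, self-contained sketch of the shape I have in mind. Everything in it (`SourceDefinition`, `SourceBuilder`, `Env`, the string-typed fields) is a hypothetical stand-in and not an existing Flink class; it only shows that the builder is finished by a terminal call and that registration receives a complete definition.

    ```scala
    object RegistrationSketch {

      // Hypothetical result of the builder: a fully specified source.
      final case class SourceDefinition(
          schema: Seq[(String, String)],
          connector: Map[String, String])

      // Hypothetical immutable builder; each call returns a new builder.
      final case class SourceBuilder(
          schema: Seq[(String, String)] = Seq.empty,
          connector: Map[String, String] = Map.empty) {

        def withField(name: String, tpe: String): SourceBuilder =
          copy(schema = schema :+ (name -> tpe))

        def withConnector(props: (String, String)*): SourceBuilder =
          copy(connector = connector ++ props)

        // Terminal call: validates and produces the complete definition.
        def toTableSource(): SourceDefinition = {
          require(schema.nonEmpty, "schema must define at least one field")
          SourceDefinition(schema, connector)
        }
      }

      // Hypothetical stand-in for the table environment.
      final class Env {
        private val tables = scala.collection.mutable.LinkedHashMap[String, SourceDefinition]()

        def registerTableSource(name: String, source: SourceDefinition): Unit =
          tables(name) = source

        def listTables: Seq[String] = tables.keys.toSeq
      }
    }
    ```

    The environment never sees a half-built descriptor here, because `registerTableSource` only accepts the result of `toTableSource()`.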


---

[GitHub] flink pull request #5240: [FLINK-8240] [table] Create unified interfaces to ...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5240#discussion_r163012882
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/BatchTableSourceDescriptor.scala ---
    @@ -0,0 +1,74 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.descriptors
    +
    +import org.apache.flink.table.api.{BatchTableEnvironment, Table, TableException}
    +import org.apache.flink.table.sources.{BatchTableSource, TableSource, TableSourceFactoryService}
    +
    +class BatchTableSourceDescriptor(tableEnv: BatchTableEnvironment, schema: Schema)
    --- End diff --
    
    Add `RowtimeDescriptor`. Batch table sources support timestamp extraction as well.


---

[GitHub] flink pull request #5240: [FLINK-8240] [table] Create unified interfaces to ...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5240#discussion_r164168781
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/Rowtime.scala ---
    @@ -0,0 +1,131 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.descriptors
    +
    +import org.apache.flink.table.api.Types
    +import org.apache.flink.table.descriptors.NormalizedProperties.{normalizeTimestampExtractor, normalizeWatermarkStrategy}
    +import org.apache.flink.table.sources.tsextractors.{ExistingField, StreamRecordTimestamp, TimestampExtractor}
    +import org.apache.flink.table.sources.wmstrategies.{AscendingTimestamps, BoundedOutOfOrderTimestamps, PreserveWatermarks, WatermarkStrategy}
    +
    +import scala.collection.mutable
    +
    +/**
    +  * Rowtime descriptor for describing an event time attribute in the schema.
    +  */
    +class Rowtime extends Descriptor {
    +
    +  private var timestampExtractor: Option[TimestampExtractor] = None
    +  private var watermarkStrategy: Option[WatermarkStrategy] = None
    +
    +  /**
    +    * Sets a built-in timestamp extractor that converts an existing [[Long]] or
    +    * [[Types.SQL_TIMESTAMP]] field into the rowtime attribute.
    +    *
    +    * @param fieldName The field to convert into a rowtime attribute.
    +    */
    +  def timestampFromField(fieldName: String): Rowtime = {
    +    timestampExtractor = Some(new ExistingField(fieldName))
    +    this
    +  }
    +
    +  /**
    +    * Sets a built-in timestamp extractor that converts the assigned timestamp from
    +    * a DataStream API record into the rowtime attribute.
    +    *
    +    * Note: This extractor only works in streaming environments.
    +    */
    +  def timestampFromDataStream(): Rowtime = {
    --- End diff --
    
    `preserveSourceTimestamps()`


---

[GitHub] flink pull request #5240: [FLINK-8240] [table] Create unified interfaces to ...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5240#discussion_r164168933
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/Rowtime.scala ---
    @@ -0,0 +1,131 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.descriptors
    +
    +import org.apache.flink.table.api.Types
    +import org.apache.flink.table.descriptors.NormalizedProperties.{normalizeTimestampExtractor, normalizeWatermarkStrategy}
    +import org.apache.flink.table.sources.tsextractors.{ExistingField, StreamRecordTimestamp, TimestampExtractor}
    +import org.apache.flink.table.sources.wmstrategies.{AscendingTimestamps, BoundedOutOfOrderTimestamps, PreserveWatermarks, WatermarkStrategy}
    +
    +import scala.collection.mutable
    +
    +/**
    +  * Rowtime descriptor for describing an event time attribute in the schema.
    +  */
    +class Rowtime extends Descriptor {
    +
    +  private var timestampExtractor: Option[TimestampExtractor] = None
    +  private var watermarkStrategy: Option[WatermarkStrategy] = None
    +
    +  /**
    +    * Sets a built-in timestamp extractor that converts an existing [[Long]] or
    +    * [[Types.SQL_TIMESTAMP]] field into the rowtime attribute.
    +    *
    +    * @param fieldName The field to convert into a rowtime attribute.
    +    */
    +  def timestampFromField(fieldName: String): Rowtime = {
    --- End diff --
    
    `timestampsFromField()`


---

[GitHub] flink pull request #5240: [FLINK-8240] [table] Create unified interfaces to ...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5240#discussion_r162996455
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/DescriptorUtils.scala ---
    @@ -0,0 +1,69 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.descriptors
    +
    +import java.util
    +
    +/**
    +  * Utilities for working with a [[org.apache.flink.table.descriptors.Descriptor]].
    +  */
    +object DescriptorUtils {
    +
    +  def hasConnector(properties: util.Map[String, String], connector: String): Boolean = {
    +    val tpe = properties.get("connector.type")
    +    tpe != null || tpe == connector
    --- End diff --
    
    Should be `tpe != null && tpe == connector`? With `||`, any non-null `connector.type` matches.


---

[GitHub] flink pull request #5240: [FLINK-8240] [table] Create unified interfaces to ...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5240#discussion_r162964475
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/exceptions.scala ---
    @@ -136,12 +136,51 @@ case class CatalogAlreadyExistException(
       def this(catalog: String) = this(catalog, null)
     }
     
    +/**
    +  * Exception for not finding a [[org.apache.flink.table.sources.TableSourceFactory]] for the
    +  * given properties.
    +  *
    +  * @param properties properties that describe the table source
    +  * @param cause the cause
    +  */
    +case class NoMatchingTableSourceException(
    +    properties: Map[String, String],
    +    cause: Throwable)
    +    extends RuntimeException(
    +      s"Could not find a table source factory in the classpath satisfying the " +
    +        s"following properties: \n${properties.map(e => e._1 + "=" +  e._2 ).mkString("\n")}",
    +      cause) {
    +
    +  def this(properties: Map[String, String]) = this(properties, null)
    +}
    +
    +/**
    +  * Exception for finding more than one [[org.apache.flink.table.sources.TableSourceFactory]] for
    +  * the given properties.
    +  *
    +  * @param properties properties that describe the table source
    +  * @param cause the cause
    +  */
    +case class AmbiguousTableSourceException(
    +    properties: Map[String, String],
    +    cause: Throwable)
    +    extends RuntimeException(
    +      s"More than one table source factory in the classpath satisfying the " +
    +        s"following properties: \n${properties.map(e => e._1 + "=" +  e._2 ).mkString("\n")}",
    +      cause) {
    +
    +  def this(properties: Map[String, String]) = this(properties, null)
    +}
    +
     /**
       * Exception for not finding a [[TableSourceConverter]] for a given table type.
       *
       * @param tableType table type
       * @param cause the cause
    +  * @deprecated Use table source factories instead.
       */
    +@Deprecated
    +@deprecated("Use table factories instead.")
    --- End diff --
    
    Please name the replacement class in the deprecation message.


---

[GitHub] flink pull request #5240: [FLINK-8240] [table] Create unified interfaces to ...

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5240#discussion_r163836869
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/Descriptor.scala ---
    @@ -0,0 +1,32 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.descriptors
    +
    +/**
    +  * A class that adds a set of string-based, normalized properties for describing a
    +  * table source or table sink.
    +  */
    +abstract class Descriptor {
    +
    +  /**
    +    * Internal method for properties conversion.
    +    */
    +  def addProperties(properties: NormalizedProperties): Unit
    --- End diff --
    
    I will add `private[flink]` but it will be public in Java.
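
    For context: a qualified `private[flink]` is enforced only by the Scala compiler; in the class file the method is emitted as public, so Java callers can still see it. A small sketch under assumed names (the package `org.apache.flink.sketch`, the object `VisibilityCheck`, and the `java.util.Map` parameter are placeholders, not the PR's code):

    ```scala
    package org.apache.flink.sketch {

      abstract class Descriptor {
        // Scala-only restriction: callable from Scala code inside org.apache.flink.
        private[flink] def addProperties(properties: java.util.Map[String, String]): Unit
      }
    }

    object VisibilityCheck {
      import java.lang.reflect.Modifier

      // Inspects the compiled method via Java reflection, i.e. the way Java sees it.
      def isPublicInBytecode: Boolean = {
        val method = classOf[org.apache.flink.sketch.Descriptor]
          .getDeclaredMethod("addProperties", classOf[java.util.Map[_, _]])
        Modifier.isPublic(method.getModifiers)
      }
    }
    ```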


---

[GitHub] flink issue #5240: [FLINK-8240] [table] Create unified interfaces to configu...

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on the issue:

    https://github.com/apache/flink/pull/5240
  
    Thanks for the feedback @fhueske. I hope I have addressed most of it. I think we should merge this PR (if you agree) and handle the remaining work for this issue in follow-up PRs. I suggest the following subtasks:
    
    - Add validation for the CSV format
    - Add full CsvTableSourceFactory support (incl. proctime, rowtime, and schema mapping)
    - Add a JSON schema parser to the JSON and logic for creating a table source from it
    - Add validation for the JSON format
    - Add validation for the Rowtime descriptor
    - Add validation for StreamTableDescriptor
    - Add validation for BatchTableDescriptor
    - Add KafkaTableSource factories 
    
    What do you think?


---

[GitHub] flink pull request #5240: [FLINK-8240] [table] Create unified interfaces to ...

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5240#discussion_r163838489
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/Descriptor.scala ---
    @@ -0,0 +1,32 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.descriptors
    +
    +/**
    +  * A class that adds a set of string-based, normalized properties for describing a
    +  * table source or table sink.
    +  */
    +abstract class Descriptor {
    --- End diff --
    
    We could add a method here, but the same logic would also have to be implemented somewhere for properties that have been read from a config file.


---

[GitHub] flink pull request #5240: [FLINK-8240] [table] Create unified interfaces to ...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5240#discussion_r164126202
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/exceptions.scala ---
    @@ -39,6 +39,9 @@ case class SqlParserException(
     
     /**
       * General Exception for all errors during table handling.
    +  *
    +  * This exception indicates that an internal error occurred or the feature is not fully
    --- End diff --
    
    "This exception indicates that an internal error occurred or that a feature is not supported yet. Usually, this exception does not indicate a fault of the user."



---

[GitHub] flink pull request #5240: [FLINK-8240] [table] Create unified interfaces to ...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/5240


---

[GitHub] flink pull request #5240: [FLINK-8240] [table] Create unified interfaces to ...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5240#discussion_r164151340
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/Schema.scala ---
    @@ -69,17 +72,76 @@ class Schema extends Descriptor {
         */
       def field(fieldName: String, fieldType: String): Schema = {
         if (tableSchema.contains(fieldName)) {
    -      throw new IllegalArgumentException(s"Duplicate field name $fieldName.")
    +      throw new ValidationException(s"Duplicate field name $fieldName.")
    +    }
    +
    +    val fieldProperties = mutable.LinkedHashMap[String, String]()
    +    fieldProperties += (DescriptorUtils.TYPE -> fieldType)
    +
    +    tableSchema += (fieldName -> fieldProperties)
    +
    +    lastField = Some(fieldName)
    +    this
    +  }
    +
    +  /**
    +    * Specifies the origin of the previously defined field. The origin field is defined by a
    +    * connector or format.
    +    *
    +    * E.g. field("myString", Types.STRING).from("CSV_MY_STRING")
    +    */
    +  def from(originFieldName: String): Schema = {
    +    lastField match {
    +      case None => throw new ValidationException("No field defined previously. Use field() before.")
    +      case Some(f) =>
    +        tableSchema(f) += (DescriptorUtils.FROM -> originFieldName)
    +        lastField = None
    +    }
    +    this
    +  }
    +
    +  /**
    +    * Specifies the previously defined field as a processing-time attribute.
    +    *
    +    * E.g. field("myString", Types.STRING).proctime()
    --- End diff --
    
    `field("procTime", Types.SQL_TIMESTAMP).proctime()`
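
    The "modify the previously defined field" pattern that `from()` and `proctime()` rely on can be sketched as follows. This is a simplified stand-in, not the PR's `Schema`: the class name `SchemaSketch`, the `proctime` property key, and the type string `TIMESTAMP` are assumptions for illustration.

    ```scala
    import scala.collection.mutable

    // Hypothetical, reduced version of the fluent schema descriptor.
    class SchemaSketch {

      private val fields =
        mutable.LinkedHashMap[String, mutable.LinkedHashMap[String, String]]()
      private var lastField: Option[String] = None

      // Adds a field and remembers it so that a following modifier can refer to it.
      def field(name: String, tpe: String): SchemaSketch = {
        if (fields.contains(name)) {
          throw new IllegalArgumentException(s"Duplicate field name $name.")
        }
        fields += (name -> mutable.LinkedHashMap("type" -> tpe))
        lastField = Some(name)
        this
      }

      // Marks the previously defined field as a processing-time attribute.
      def proctime(): SchemaSketch = {
        lastField match {
          case None =>
            throw new IllegalStateException("No field defined previously. Use field() before.")
          case Some(f) =>
            fields(f) += ("proctime" -> "true")
            lastField = None
        }
        this
      }

      // Flattens the fields into indexed string properties in definition order.
      def toProperties: Seq[(String, String)] =
        fields.toSeq.zipWithIndex.flatMap { case ((name, props), i) =>
          Seq(s"schema.$i.name" -> name) ++
            props.toSeq.map { case (k, v) => s"schema.$i.$k" -> v }
        }
    }
    ```

    Calling `proctime()` without a preceding `field()` fails fast, which mirrors the check in `from()` above.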


---

[GitHub] flink pull request #5240: [FLINK-8240] [table] Create unified interfaces to ...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5240#discussion_r162990874
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalTableSourceUtil.scala ---
    @@ -23,22 +23,74 @@ import java.net.URL
     import org.apache.commons.configuration.{ConfigurationException, ConversionException, PropertiesConfiguration}
     import org.apache.flink.annotation.VisibleForTesting
     import org.apache.flink.table.annotation.TableType
    -import org.apache.flink.table.api.{AmbiguousTableSourceConverterException, NoMatchedTableSourceConverterException, TableException}
    +import org.apache.flink.table.api._
     import org.apache.flink.table.plan.schema.{BatchTableSourceTable, StreamTableSourceTable, TableSourceTable}
     import org.apache.flink.table.plan.stats.FlinkStatistic
    -import org.apache.flink.table.sources.{BatchTableSource, StreamTableSource, TableSource}
    +import org.apache.flink.table.sources.{BatchTableSource, StreamTableSource, TableSource, TableSourceFactoryService}
     import org.apache.flink.table.util.Logging
     import org.apache.flink.util.InstantiationUtil
     import org.reflections.Reflections
     
    -import scala.collection.JavaConverters._
    -import scala.collection.mutable
    +import _root_.scala.collection.JavaConverters._
    +import _root_.scala.collection.mutable
     
     /**
       * The utility class is used to convert ExternalCatalogTable to TableSourceTable.
       */
     object ExternalTableSourceUtil extends Logging {
     
    +  /**
    +    * Converts an [[ExternalCatalogTable]] instance to a [[TableSourceTable]] instance
    +    *
    +    * @param externalCatalogTable the [[ExternalCatalogTable]] instance which to convert
    +    * @return converted [[TableSourceTable]] instance from the input catalog table
    +    */
    +  def fromExternalCatalogTable(
    +      tableEnv: TableEnvironment,
    +      externalCatalogTable: ExternalCatalogTable)
    +    : TableSourceTable[_] = {
    +
    +    // check for the legacy external catalog path
    +    if (externalCatalogTable.isLegacyTableType) {
    +      LOG.warn("External catalog tables based on TableType annotations are deprecated. " +
    +        "Please consider updating them to TableSourceFactories.")
    +      fromExternalCatalogTableType(externalCatalogTable)
    +    }
    +    // use the factory approach
    +    else {
    +      val source = TableSourceFactoryService.findTableSourceFactory(externalCatalogTable)
    +      tableEnv match {
    +        // check for a batch table source in this batch environment
    +        case _: BatchTableEnvironment =>
    +          source match {
    +            case bts: BatchTableSource[_] =>
    +              new BatchTableSourceTable(
    +                bts,
    +                new FlinkStatistic(externalCatalogTable.getTableStats))
    +            case _ => throw new TableException(
    +              s"Found table source '${source.getClass.getCanonicalName}' is not applicable " +
    +                s"in a batch environment.")
    +          }
    +        // check for a stream table source in this streaming environment
    +        case _: StreamTableEnvironment =>
    +          source match {
    +            case sts: StreamTableSource[_] =>
    +              new StreamTableSourceTable(
    +                sts,
    +                new FlinkStatistic(externalCatalogTable.getTableStats))
    +            case _ => throw new TableException(
    +              s"Found table source '${source.getClass.getCanonicalName}' is not applicable " +
    +                s"in a streaming environment.")
    +          }
    +        case _ => throw new TableException("Unsupported table environment.")
    +      }
    +    }
    +  }
    +
    +  // ----------------------------------------------------------------------------------------------
    +  // NOTE: the following line can be removed once we drop support for TableType
    --- End diff --
    
    I think we can also remove the `org.reflections:reflections` dependency once we have removed this.


---

[GitHub] flink pull request #5240: [FLINK-8240] [table] Create unified interfaces to ...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5240#discussion_r162947784
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala ---
    @@ -125,6 +129,16 @@ abstract class StreamTableEnvironment(
         }
       }
     
    +  /**
    +    * Creates a table from a descriptor that describes the resulting table schema, the source
    +    * connector, the source encoding, and other properties.
    +    *
    +    * @param schema schema descriptor describing the table to create
    +    */
    +  def createTable(schema: Schema): StreamTableSourceDescriptor = {
    --- End diff --
    
    See the comment on `BatchTableEnvironment`.


---

[GitHub] flink pull request #5240: [FLINK-8240] [table] Create unified interfaces to ...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5240#discussion_r162995916
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/ConnectorDescriptor.scala ---
    @@ -0,0 +1,47 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.descriptors
    +
    +import org.apache.flink.table.descriptors.DescriptorUtils.connector
    +
    +/**
    +  * Describes a connector to an other system.
    +  *
    +  * @param tpe string identifier for the connector
    +  */
    +abstract class ConnectorDescriptor(private val tpe: String) extends Descriptor {
    +
    +  /**
    +    * Internal method for properties conversion.
    +    */
    +  final def addProperties(properties: NormalizedProperties): Unit = {
    +    properties.putString(connector("type"), tpe)
    +    val connectorProperties = new NormalizedProperties()
    +    addConnectorProperties(connectorProperties)
    +    connectorProperties.getProperties.foreach { case (k, v) =>
    --- End diff --
    
    Why do we need to go over the properties again? Couldn't we implement `addConnectorProperties` so that it adds the properties directly to `properties`?
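
    A rough sketch of that alternative, using simplified stand-ins rather than the PR's classes (`PropertiesSketch`, `ConnectorDescriptorSketch`, and `FileSystemSketch` are hypothetical names):

    ```scala
    import scala.collection.mutable

    // Hypothetical stand-in for NormalizedProperties.
    class PropertiesSketch {
      private val props = mutable.LinkedHashMap[String, String]()
      def putString(key: String, value: String): Unit = props(key) = value
      def toMap: Map[String, String] = props.toMap
    }

    abstract class ConnectorDescriptorSketch(tpe: String) {

      // Writes the type and lets the subclass write into the same container,
      // so no second pass over a temporary property set is needed.
      final def addProperties(properties: PropertiesSketch): Unit = {
        properties.putString("connector.type", tpe)
        addConnectorProperties(properties)
      }

      protected def addConnectorProperties(properties: PropertiesSketch): Unit
    }

    class FileSystemSketch(path: String) extends ConnectorDescriptorSketch("filesystem") {
      // The subclass is now responsible for the "connector." prefix itself.
      override protected def addConnectorProperties(properties: PropertiesSketch): Unit =
        properties.putString("connector.path", path)
    }
    ```

    The trade-off is that every subclass has to use the right key prefix, which the copy loop currently enforces in one place.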


---

[GitHub] flink pull request #5240: [FLINK-8240] [table] Create unified interfaces to ...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5240#discussion_r163007563
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/JSON.scala ---
    @@ -0,0 +1,140 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.descriptors
    +
    +import java.util
    +
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.table.api.TableSchema
    +
    +import scala.collection.mutable
    +import scala.collection.JavaConverters._
    +
    +/**
    +  * Encoding descriptor for JSON.
    +  */
    +class JSON extends EncodingDescriptor("json") {
    +
    +  private val encodingSchema: mutable.LinkedHashMap[String, String] =
    +      mutable.LinkedHashMap[String, String]()
    +  private var fieldMapping: Option[util.Map[String, String]] = None
    +  private var failOnMissingField: Option[Boolean] = None
    +
    +  /**
    +    * Sets the JSON schema with field names and the types for the JSON-encoded input.
    +    * The JSON schema must not contain nested fields.
    +    *
    +    * This method overwrites existing fields added with [[field()]].
    +    *
    +    * @param schema the table schema
    +    */
    +  def schema(schema: TableSchema): JSON = {
    +    this.encodingSchema.clear()
    +    NormalizedProperties.normalizeTableSchema(schema).foreach {
    +      case (n, t) => field(n, t)
    +    }
    +    this
    +  }
    +
    +  /**
    +    * Adds a JSON field with the field name and the type information for the JSON-encoding.
    +    * This method can be called multiple times. The call order of this method defines
    +    * also the order of the fields in the JSON-encoding.
    +    *
    +    * @param fieldName the field name
    +    * @param fieldType the type information of the field
    +    */
    +  def field(fieldName: String, fieldType: TypeInformation[_]): JSON = {
    +    field(fieldName, NormalizedProperties.normalizeTypeInfo(fieldType))
    +    this
    +  }
    +
    +  /**
    +    * Adds a JSON field with the field name and the type string for the JSON-encoding.
    +    * This method can be called multiple times. The call order of this method defines
    +    * also the order of the fields in the JSON-encoding.
    +    *
    +    * @param fieldName the field name
    +    * @param fieldType the type string of the field
    +    */
    +  def field(fieldName: String, fieldType: String): JSON = {
    +    if (encodingSchema.contains(fieldName)) {
    +      throw new IllegalArgumentException(s"Duplicate field name $fieldName.")
    +    }
    +    encodingSchema += (fieldName -> fieldType)
    +    this
    +  }
    +
    +  /**
    +    * Sets a mapping from schema fields to fields of the JSON schema.
    +    *
    +    * A field mapping is required if the fields of produced tables should be named different than
    +    * the fields of the JSON records.
    +    * The key of the provided Map refers to the field of the table schema,
    +    * the value to the field in the JSON schema.
    +    *
    +    * @param tableToJsonMapping A mapping from table schema fields to JSON schema fields.
    +    * @return The builder.
    +    */
    +  def tableToJsonMapping(tableToJsonMapping: util.Map[String, String]): JSON = {
    +    this.fieldMapping = Some(tableToJsonMapping)
    +    this
    +  }
    +
    +  /**
    +    * Sets flag whether to fail if a field is missing or not.
    +    *
    +    * @param failOnMissingField If set to true, the operation fails if there is a missing field.
    +    *                           If set to false, a missing field is set to null.
    +    * @return The builder.
    +    */
    +  def failOnMissingField(failOnMissingField: Boolean): JSON = {
    +    this.failOnMissingField = Some(failOnMissingField)
    +    this
    +  }
    +
    +  /**
    +    * Internal method for encoding properties conversion.
    +    */
    +  override protected def addEncodingProperties(properties: NormalizedProperties): Unit = {
    +    properties.putIndexedFixedProperties(
    --- End diff --
    
    We should add properties with support for nested fields.


---

[GitHub] flink pull request #5240: [FLINK-8240] [table] Create unified interfaces to ...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5240#discussion_r163002001
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/JSON.scala ---
    @@ -0,0 +1,140 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.descriptors
    +
    +import java.util
    +
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.table.api.TableSchema
    +
    +import scala.collection.mutable
    +import scala.collection.JavaConverters._
    +
    +/**
    +  * Encoding descriptor for JSON.
    +  */
    +class JSON extends EncodingDescriptor("json") {
    +
    +  private val encodingSchema: mutable.LinkedHashMap[String, String] =
    +      mutable.LinkedHashMap[String, String]()
    +  private var fieldMapping: Option[util.Map[String, String]] = None
    +  private var failOnMissingField: Option[Boolean] = None
    +
    +  /**
    +    * Sets the JSON schema with field names and the types for the JSON-encoded input.
    +    * The JSON schema must not contain nested fields.
    +    *
    +    * This method overwrites existing fields added with [[field()]].
    +    *
    +    * @param schema the table schema
    +    */
    +  def schema(schema: TableSchema): JSON = {
    +    this.encodingSchema.clear()
    +    NormalizedProperties.normalizeTableSchema(schema).foreach {
    +      case (n, t) => field(n, t)
    +    }
    +    this
    +  }
    +
    +  /**
    +    * Adds a JSON field with the field name and the type information for the JSON-encoding.
    +    * This method can be called multiple times. The call order of this method defines
    +    * also the order of the fields in the JSON-encoding.
    +    *
    +    * @param fieldName the field name
    +    * @param fieldType the type information of the field
    +    */
    +  def field(fieldName: String, fieldType: TypeInformation[_]): JSON = {
    +    field(fieldName, NormalizedProperties.normalizeTypeInfo(fieldType))
    +    this
    +  }
    +
    +  /**
    +    * Adds a JSON field with the field name and the type string for the JSON-encoding.
    +    * This method can be called multiple times. The call order of this method defines
    +    * also the order of the fields in the JSON-encoding.
    +    *
    +    * @param fieldName the field name
    +    * @param fieldType the type string of the field
    +    */
    +  def field(fieldName: String, fieldType: String): JSON = {
    +    if (encodingSchema.contains(fieldName)) {
    +      throw new IllegalArgumentException(s"Duplicate field name $fieldName.")
    +    }
    +    encodingSchema += (fieldName -> fieldType)
    +    this
    +  }
    +
    +  /**
    +    * Sets a mapping from schema fields to fields of the JSON schema.
    +    *
    +    * A field mapping is required if the fields of produced tables should be named different than
    +    * the fields of the JSON records.
    +    * The key of the provided Map refers to the field of the table schema,
    +    * the value to the field in the JSON schema.
    +    *
    +    * @param tableToJsonMapping A mapping from table schema fields to JSON schema fields.
    +    * @return The builder.
    +    */
    +  def tableToJsonMapping(tableToJsonMapping: util.Map[String, String]): JSON = {
    --- End diff --
    
    We might want to make field mappings independent of the encoding. For example, field mappings could also be used for JDBC connectors, which do not have an encoding.


---

[GitHub] flink pull request #5240: [FLINK-8240] [table] Create unified interfaces to ...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5240#discussion_r163007886
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/JSON.scala ---
    @@ -0,0 +1,140 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.descriptors
    +
    +import java.util
    +
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.table.api.TableSchema
    +
    +import scala.collection.mutable
    +import scala.collection.JavaConverters._
    +
    +/**
    +  * Encoding descriptor for JSON.
    +  */
    +class JSON extends EncodingDescriptor("json") {
    +
    +  private val encodingSchema: mutable.LinkedHashMap[String, String] =
    +      mutable.LinkedHashMap[String, String]()
    +  private var fieldMapping: Option[util.Map[String, String]] = None
    +  private var failOnMissingField: Option[Boolean] = None
    +
    +  /**
    +    * Sets the JSON schema with field names and the types for the JSON-encoded input.
    +    * The JSON schema must not contain nested fields.
    +    *
    +    * This method overwrites existing fields added with [[field()]].
    +    *
    +    * @param schema the table schema
    +    */
    +  def schema(schema: TableSchema): JSON = {
    +    this.encodingSchema.clear()
    +    NormalizedProperties.normalizeTableSchema(schema).foreach {
    +      case (n, t) => field(n, t)
    +    }
    +    this
    +  }
    +
    +  /**
    +    * Adds a JSON field with the field name and the type information for the JSON-encoding.
    +    * This method can be called multiple times. The call order of this method defines
    +    * also the order of the fields in the JSON-encoding.
    +    *
    +    * @param fieldName the field name
    +    * @param fieldType the type information of the field
    +    */
    +  def field(fieldName: String, fieldType: TypeInformation[_]): JSON = {
    +    field(fieldName, NormalizedProperties.normalizeTypeInfo(fieldType))
    +    this
    +  }
    +
    +  /**
    +    * Adds a JSON field with the field name and the type string for the JSON-encoding.
    +    * This method can be called multiple times. The call order of this method defines
    +    * also the order of the fields in the JSON-encoding.
    +    *
    +    * @param fieldName the field name
    +    * @param fieldType the type string of the field
    +    */
    +  def field(fieldName: String, fieldType: String): JSON = {
    +    if (encodingSchema.contains(fieldName)) {
    +      throw new IllegalArgumentException(s"Duplicate field name $fieldName.")
    +    }
    +    encodingSchema += (fieldName -> fieldType)
    +    this
    +  }
    +
    +  /**
    +    * Sets a mapping from schema fields to fields of the JSON schema.
    +    *
    +    * A field mapping is required if the fields of produced tables should be named different than
    +    * the fields of the JSON records.
    +    * The key of the provided Map refers to the field of the table schema,
    +    * the value to the field in the JSON schema.
    +    *
    +    * @param tableToJsonMapping A mapping from table schema fields to JSON schema fields.
    +    * @return The builder.
    +    */
    +  def tableToJsonMapping(tableToJsonMapping: util.Map[String, String]): JSON = {
    +    this.fieldMapping = Some(tableToJsonMapping)
    +    this
    +  }
    +
    +  /**
    +    * Sets flag whether to fail if a field is missing or not.
    +    *
    +    * @param failOnMissingField If set to true, the operation fails if there is a missing field.
    +    *                           If set to false, a missing field is set to null.
    +    * @return The builder.
    +    */
    +  def failOnMissingField(failOnMissingField: Boolean): JSON = {
    +    this.failOnMissingField = Some(failOnMissingField)
    +    this
    +  }
    +
    +  /**
    +    * Internal method for encoding properties conversion.
    +    */
    +  override protected def addEncodingProperties(properties: NormalizedProperties): Unit = {
    +    properties.putIndexedFixedProperties(
    --- End diff --
    
    Maybe add support for nested fields to utils.
    Other encodings will need this as well.
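
    One possible shape for such a utility, purely as a sketch: nested rows are flattened recursively into indexed, dot-separated keys. The key layout (`.fields.<i>.name` / `.type`), the `ROW` type string, and all names below are assumptions, not something the PR defines.

    ```scala
    // Hypothetical minimal type model: atomic types and rows of named fields.
    sealed trait TypeSketch
    case class Atomic(name: String) extends TypeSketch
    case class Row(fields: Seq[(String, TypeSketch)]) extends TypeSketch

    object NestedPropertiesSketch {

      // Emits "<prefix>.<i>.name" and "<prefix>.<i>.type" for each field and
      // recurses into rows under "<prefix>.<i>.fields".
      def flatten(prefix: String, fields: Seq[(String, TypeSketch)]): Seq[(String, String)] =
        fields.zipWithIndex.flatMap { case ((name, tpe), i) =>
          val key = s"$prefix.$i"
          tpe match {
            case Atomic(t) =>
              Seq(s"$key.name" -> name, s"$key.type" -> t)
            case Row(nested) =>
              Seq(s"$key.name" -> name, s"$key.type" -> "ROW") ++
                flatten(s"$key.fields", nested)
          }
        }
    }
    ```

    Because the helper only depends on a key prefix, the same function could serve JSON and any other encoding that needs nested fields.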


---

[GitHub] flink pull request #5240: [FLINK-8240] [table] Create unified interfaces to ...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5240#discussion_r163065450
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/TableSourceFactory.scala ---
    @@ -0,0 +1,53 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.sources
    +
    +import java.util
    +
    +/**
    +  * A factory to create a [[TableSource]]. This factory is used with Java's Service Provider
    +  * Interfaces (SPI) for discovering. A factory is called with a set of normalized properties that
    +  * describe the desired table source. The factory allows for matching to the given set of
    +  * properties and creating a configured [[TableSource]] accordingly.
    +  *
    +  * Classes that implement this interface need to be added to the
    +  * "META_INF/services/org.apache.flink.table.sources.TableSourceFactory' file of a JAR file in
    --- End diff --
    
    Do all factories need to be added to the same file, or can we have separate files for different modules? For instance, a `Kafka011JsonTableFactory` would be in the Kafka connectors module. Would a user have to change the service file if the Kafka factory should be used, or can we build it in a way that it is sufficient to include the Kafka connectors JAR?


---

[GitHub] flink issue #5240: [FLINK-8240] [table] Create unified interfaces to configu...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on the issue:

    https://github.com/apache/flink/pull/5240
  
    Hi Timo,
    
    the PR looks good overall. I've made a few suggestions, mostly about renaming methods or extending docs. I'd also propose to add a `supportedProperties()` method to `TableSourceFactory` that can be used to validate whether the factory supports all user-provided properties of a connector or format.
    
    What do you think?
    Fabian


---

[GitHub] flink pull request #5240: [FLINK-8240] [table] Create unified interfaces to ...

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5240#discussion_r163865339
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/TableSourceDescriptor.scala ---
    @@ -0,0 +1,66 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.descriptors
    +
    +import org.apache.flink.table.descriptors.DescriptorUtils.statistics
    +import org.apache.flink.table.plan.stats.TableStats
    +
    +import scala.collection.JavaConverters._
    +
    +/**
    +  * Common class for all descriptors describing a table source.
    +  */
    +abstract class TableSourceDescriptor extends Descriptor {
    +
    +  protected var schemaDescriptor: Option[Schema] = None
    +  protected var connectorDescriptor: Option[ConnectorDescriptor] = None
    +  protected var encodingDescriptor: Option[EncodingDescriptor] = None
    +  protected var proctimeDescriptor: Option[Proctime] = None
    +  protected var rowtimeDescriptor: Option[Rowtime] = None
    +  protected var statisticsDescriptor: Option[Statistics] = None
    +  protected var metaDescriptor: Option[Metadata] = None
    +
    --- End diff --
    
    I added this functionality to the `Schema` descriptor.


---

[GitHub] flink pull request #5240: [FLINK-8240] [table] Create unified interfaces to ...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5240#discussion_r164168602
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/Rowtime.scala ---
    @@ -0,0 +1,131 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.descriptors
    +
    +import org.apache.flink.table.api.Types
    +import org.apache.flink.table.descriptors.NormalizedProperties.{normalizeTimestampExtractor, normalizeWatermarkStrategy}
    +import org.apache.flink.table.sources.tsextractors.{ExistingField, StreamRecordTimestamp, TimestampExtractor}
    +import org.apache.flink.table.sources.wmstrategies.{AscendingTimestamps, BoundedOutOfOrderTimestamps, PreserveWatermarks, WatermarkStrategy}
    +
    +import scala.collection.mutable
    +
    +/**
    +  * Rowtime descriptor for describing an event time attribute in the schema.
    +  */
    +class Rowtime extends Descriptor {
    +
    +  private var timestampExtractor: Option[TimestampExtractor] = None
    +  private var watermarkStrategy: Option[WatermarkStrategy] = None
    +
    +  /**
    +    * Sets a built-in timestamp extractor that converts an existing [[Long]] or
    +    * [[Types.SQL_TIMESTAMP]] field into the rowtime attribute.
    +    *
    +    * @param fieldName The field to convert into a rowtime attribute.
    +    */
    +  def timestampFromField(fieldName: String): Rowtime = {
    +    timestampExtractor = Some(new ExistingField(fieldName))
    +    this
    +  }
    +
    +  /**
    +    * Sets a built-in timestamp extractor that converts the assigned timestamp from
    +    * a DataStream API record into the rowtime attribute.
    +    *
    +    * Note: This extractor only works in streaming environments.
    +    */
    +  def timestampFromDataStream(): Rowtime = {
    +    timestampExtractor = Some(new StreamRecordTimestamp)
    +    this
    +  }
    +
    +  /**
    +    * Sets a custom timestamp extractor to be used for the rowtime attribute.
    +    *
    +    * @param extractor The [[TimestampExtractor]] to extract the rowtime attribute
    +    *                  from the physical type.
    +    */
    +  def timestampFromExtractor(extractor: TimestampExtractor): Rowtime = {
    --- End diff --
    
    `timestampsFromExtractor()`


---

[GitHub] flink pull request #5240: [FLINK-8240] [table] Create unified interfaces to ...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5240#discussion_r164149992
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/Schema.scala ---
    @@ -69,17 +72,76 @@ class Schema extends Descriptor {
         */
       def field(fieldName: String, fieldType: String): Schema = {
         if (tableSchema.contains(fieldName)) {
    -      throw new IllegalArgumentException(s"Duplicate field name $fieldName.")
    +      throw new ValidationException(s"Duplicate field name $fieldName.")
    +    }
    +
    +    val fieldProperties = mutable.LinkedHashMap[String, String]()
    +    fieldProperties += (DescriptorUtils.TYPE -> fieldType)
    +
    +    tableSchema += (fieldName -> fieldProperties)
    +
    +    lastField = Some(fieldName)
    +    this
    +  }
    +
    +  /**
    +    * Specifies the origin of the previously defined field. The origin field is defined by a
    +    * connector or format.
    --- End diff --
    
    Add that fields are matched by exact name by default.


---

[GitHub] flink pull request #5240: [FLINK-8240] [table] Create unified interfaces to ...

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5240#discussion_r163835834
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/ConnectorDescriptor.scala ---
    @@ -0,0 +1,47 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.descriptors
    +
    +import org.apache.flink.table.descriptors.DescriptorUtils.connector
    +
    +/**
    +  * Describes a connector to an other system.
    +  *
    +  * @param tpe string identifier for the connector
    +  */
    +abstract class ConnectorDescriptor(private val tpe: String) extends Descriptor {
    --- End diff --
    
    My goal was to keep the descriptors as lightweight as possible. They only describe but do not validate. If we added validation logic here, we would also need to add the same logic somewhere else in the stack (i.e. table factories) for the case where we parse the properties from a file.
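
The separation described above can be illustrated with a minimal sketch. It assumes hypothetical stand-in names (`DescriptorSketch`, `FileSystemSketch`, `validate`) that are not part of the PR; only the `connector.type` and `connector.path` keys are taken from the PR description. The descriptor merely emits string properties, and validation works on the properties alone, so it would apply equally to properties parsed from a file.

```scala
// Hypothetical stand-ins for illustration; none of these names come from the PR.
object DescriptorSketch {

  // A descriptor only records settings and translates them into string properties.
  final case class FileSystemSketch(path: Option[String] = None) {
    def withPath(p: String): FileSystemSketch = copy(path = Some(p))

    def toProperties: Map[String, String] =
      Map("connector.type" -> "filesystem") ++ path.map(p => "connector.path" -> p)
  }

  // Validation operates on plain properties, so it does not matter whether they
  // were produced by a descriptor or read from a configuration file.
  def validate(props: Map[String, String]): Either[String, Map[String, String]] =
    props.get("connector.type") match {
      case Some("filesystem") if !props.contains("connector.path") =>
        Left("Missing required property 'connector.path'.")
      case Some(_) => Right(props)
      case None    => Left("Missing required property 'connector.type'.")
    }
}
```

With this split, a descriptor without a path still builds its properties without complaint; the problem only surfaces when `validate` is called on the resulting map.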


---

[GitHub] flink pull request #5240: [FLINK-8240] [table] Create unified interfaces to ...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5240#discussion_r163067483
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/typeutils/TypeStringUtils.scala ---
    @@ -0,0 +1,213 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.typeutils
    +
    +import java.io.Serializable
    +
    +import org.apache.commons.codec.binary.Base64
    +import org.apache.commons.lang3.StringEscapeUtils
    +import org.apache.flink.api.common.typeinfo.{BasicArrayTypeInfo, PrimitiveArrayTypeInfo, TypeInformation}
    +import org.apache.flink.api.common.typeutils.CompositeType
    +import org.apache.flink.api.java.typeutils._
    +import org.apache.flink.table.api.{TableException, Types, ValidationException}
    +import org.apache.flink.table.descriptors.NormalizedProperties.normalizeTypeInfo
    +import org.apache.flink.util.InstantiationUtil
    +
    +import _root_.scala.language.implicitConversions
    +import _root_.scala.util.parsing.combinator.{JavaTokenParsers, PackratParsers}
    +
    +/**
    +  * Utilities to convert [[org.apache.flink.api.common.typeinfo.TypeInformation]] into a
    +  * string representation and back.
    +  */
    +object TypeStringUtils extends JavaTokenParsers with PackratParsers {
    --- End diff --
    
    Some examples of the supported syntax would be good.
    It would also be good to add these examples to the docs of the methods that accept type strings.


---

[GitHub] flink pull request #5240: [FLINK-8240] [table] Create unified interfaces to ...

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5240#discussion_r163866463
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/TableSourceFactory.scala ---
    @@ -0,0 +1,53 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.sources
    +
    +import java.util
    +
    +/**
    +  * A factory to create a [[TableSource]]. This factory is used with Java's Service Provider
    +  * Interfaces (SPI) for discovering. A factory is called with a set of normalized properties that
    +  * describe the desired table source. The factory allows for matching to the given set of
    +  * properties and creating a configured [[TableSource]] accordingly.
    +  *
    +  * Classes that implement this interface need to be added to the
    +  * "META_INF/services/org.apache.flink.table.sources.TableSourceFactory' file of a JAR file in
    --- End diff --
    
    No, any JAR file can be used. We use similar functionality for the Flink file systems. For example, we simply need to add a
     `META-INF/services/org.apache.flink.table.sources.TableSourceFactory` file to the Kafka connector JAR. Java's Service Provider Interfaces do the rest for us.
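
As a rough illustration of the discovery mechanism mentioned above, the following sketch uses `java.util.ServiceLoader` with a hypothetical stand-in trait (`SourceFactorySketch` is not the PR's `TableSourceFactory`). Each JAR on the classpath may carry its own provider file under `META-INF/services/`, named after the fully qualified interface and listing one implementation class per line.

```scala
import java.util.ServiceLoader
import scala.collection.JavaConverters._

// Hypothetical stand-in for the factory interface discussed above.
trait SourceFactorySketch {
  def matches(properties: Map[String, String]): Boolean
}

object FactoryDiscoverySketch {
  // ServiceLoader collects providers from every matching META-INF/services
  // resource visible to the class loader, so no shared file has to be edited.
  def discover(): List[SourceFactorySketch] =
    ServiceLoader.load(classOf[SourceFactorySketch]).iterator().asScala.toList

  // Picks the first discovered factory that accepts the given properties.
  def find(properties: Map[String, String]): Option[SourceFactorySketch] =
    discover().find(_.matches(properties))
}
```

If no provider file is present on the classpath, `discover()` simply returns an empty list.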


---

[GitHub] flink pull request #5240: [FLINK-8240] [table] Create unified interfaces to ...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5240#discussion_r162999931
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/TableSourceDescriptor.scala ---
    @@ -0,0 +1,66 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.descriptors
    +
    +import org.apache.flink.table.descriptors.DescriptorUtils.statistics
    +import org.apache.flink.table.plan.stats.TableStats
    +
    +import scala.collection.JavaConverters._
    +
    +/**
    +  * Common class for all descriptors describing a table source.
    +  */
    +abstract class TableSourceDescriptor extends Descriptor {
    +
    +  protected var schemaDescriptor: Option[Schema] = None
    +  protected var connectorDescriptor: Option[ConnectorDescriptor] = None
    +  protected var encodingDescriptor: Option[EncodingDescriptor] = None
    +  protected var proctimeDescriptor: Option[Proctime] = None
    +  protected var rowtimeDescriptor: Option[Rowtime] = None
    +  protected var statisticsDescriptor: Option[Statistics] = None
    +  protected var metaDescriptor: Option[Metadata] = None
    +
    --- End diff --
    
    We might need another descriptor for mapping fields of the encoding (or connector) to fields in the table schema. This can be used to rename or select fields from the encoding to the table schema. This would be the configuration for the `DefinedFieldMapping` interface.
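
To make the renaming and selecting idea concrete, here is a small sketch under stated assumptions: `FieldMappingSketch` and `project` are hypothetical names, records are modeled as plain maps, and fields without an explicit mapping are assumed to be matched by identical name.

```scala
// Hypothetical helper for illustration; not part of the PR.
object FieldMappingSketch {
  // Keys of tableToEncoding are table schema fields, values are fields of the
  // encoding. A table field without an entry is looked up under its own name.
  def project(
      tableFields: Seq[String],
      tableToEncoding: Map[String, String],
      record: Map[String, Any]): Seq[Option[Any]] =
    tableFields.map { tableField =>
      record.get(tableToEncoding.getOrElse(tableField, tableField))
    }
}
```

Only the fields listed in `tableFields` are read, which covers selection; the mapping covers renaming. A field that is absent from the record yields `None`.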


---

[GitHub] flink pull request #5240: [FLINK-8240] [table] Create unified interfaces to ...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5240#discussion_r162994182
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/ConnectorDescriptor.scala ---
    @@ -0,0 +1,47 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.descriptors
    +
    +import org.apache.flink.table.descriptors.DescriptorUtils.connector
    +
    +/**
    +  * Describes a connector to an other system.
    +  *
    +  * @param tpe string identifier for the connector
    +  */
    +abstract class ConnectorDescriptor(private val tpe: String) extends Descriptor {
    --- End diff --
    
    Should a `ConnectorDescriptor` know whether it requires an encoding? For example, a file descriptor needs an encoding but a JDBC connector doesn't.
    
    This property would then be used to validate the configuration.


---

[GitHub] flink pull request #5240: [FLINK-8240] [table] Create unified interfaces to ...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5240#discussion_r163012548
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/StreamTableSourceDescriptor.scala ---
    @@ -0,0 +1,95 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.descriptors
    +
    +import org.apache.flink.table.api.{StreamTableEnvironment, Table, TableException}
    +import org.apache.flink.table.sources.{StreamTableSource, TableSource, TableSourceFactoryService}
    +
    +/**
    +  * Descriptor for specifying a table source in a streaming environment.
    +  */
    +class StreamTableSourceDescriptor(
    +    tableEnv: StreamTableEnvironment,
    +    schema: Schema)
    +  extends TableSourceDescriptor {
    +
    +  schemaDescriptor = Some(schema)
    +
    +  /**
    +    * Searches for the specified table source, configures it accordingly, and returns it.
    +    */
    +  def toTableSource: TableSource[_] = {
    +    val source = TableSourceFactoryService.findTableSourceFactory(this)
    +    source match {
    +      case _: StreamTableSource[_] => source
    +      case _ => throw new TableException(
    +        s"Found table source '${source.getClass.getCanonicalName}' is not applicable " +
    +          s"in a streaming environment.")
    +    }
    +  }
    +
    +  /**
    +    * Searches for the specified table source, configures it accordingly, and returns it as a table.
    +    */
    +  def toTable: Table = {
    +    tableEnv.fromTableSource(toTableSource)
    +  }
    +
    +  /**
    +    * Searches for the specified table source, configures it accordingly, and registers it as
    +    * a table under the given name.
    +    *
    +    * @param name table name to be registered in the table environment
    +    */
    +  def register(name: String): Unit = {
    +    tableEnv.registerTableSource(name, toTableSource)
    +  }
    +
    +  /**
    +    * Specifies an connector for reading data from a connector.
    +    */
    +  def withConnector(connector: ConnectorDescriptor): StreamTableSourceDescriptor = {
    +    connectorDescriptor = Some(connector)
    --- End diff --
    
    Check whether an encoding was added that the connector does not need?


---

[GitHub] flink pull request #5240: [FLINK-8240] [table] Create unified interfaces to ...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5240#discussion_r164169928
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/TableSourceFactory.scala ---
    @@ -0,0 +1,53 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.sources
    +
    +import java.util
    +
    +/**
    +  * A factory to create a [[TableSource]]. This factory is used with Java's Service Provider
    +  * Interfaces (SPI) for discovering. A factory is called with a set of normalized properties that
    +  * describe the desired table source. The factory allows for matching to the given set of
    +  * properties and creating a configured [[TableSource]] accordingly.
    +  *
    +  * Classes that implement this interface need to be added to the
    +  * "META_INF/services/org.apache.flink.table.sources.TableSourceFactory' file of a JAR file in
    +  * the current classpath to be found.
    +  */
    +trait TableSourceFactory[T] {
    --- End diff --
    
    We might want to add a method that exposes all properties of the connector and format that the factory supports. 
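
One possible shape for such a method is sketched below. The trait, the object, and the key set are purely illustrative assumptions and not the API of the PR; indexed keys such as `schema.0.name` would need pattern matching that this sketch leaves out.

```scala
// Hypothetical sketch; trait and method names are illustrative only.
trait PropertyAwareFactorySketch {
  /** Property keys that this factory understands. */
  def supportedProperties: Set[String]

  /** Returns the user-provided keys that the factory does not support. */
  def unsupportedKeys(userProperties: Map[String, String]): Set[String] =
    userProperties.keySet.diff(supportedProperties)
}

// Example factory that declares a fixed set of supported keys.
object CsvFileFactorySketch extends PropertyAwareFactorySketch {
  override val supportedProperties: Set[String] =
    Set("connector.type", "connector.path", "encoding.type")
}
```

A caller could reject a configuration whenever `unsupportedKeys` is non-empty, which would catch misspelled keys early.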


---

[GitHub] flink pull request #5240: [FLINK-8240] [table] Create unified interfaces to ...

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5240#discussion_r163847018
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/JSON.scala ---
    @@ -0,0 +1,140 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.descriptors
    +
    +import java.util
    +
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.table.api.TableSchema
    +
    +import scala.collection.mutable
    +import scala.collection.JavaConverters._
    +
    +/**
    +  * Encoding descriptor for JSON.
    +  */
    +class JSON extends EncodingDescriptor("json") {
    +
    +  private val encodingSchema: mutable.LinkedHashMap[String, String] =
    +      mutable.LinkedHashMap[String, String]()
    +  private var fieldMapping: Option[util.Map[String, String]] = None
    +  private var failOnMissingField: Option[Boolean] = None
    +
    +  /**
    +    * Sets the JSON schema with field names and the types for the JSON-encoded input.
    +    * The JSON schema must not contain nested fields.
    +    *
    +    * This method overwrites existing fields added with [[field()]].
    +    *
    +    * @param schema the table schema
    +    */
    +  def schema(schema: TableSchema): JSON = {
    +    this.encodingSchema.clear()
    +    NormalizedProperties.normalizeTableSchema(schema).foreach {
    +      case (n, t) => field(n, t)
    +    }
    +    this
    +  }
    +
    +  /**
    +    * Adds a JSON field with the field name and the type information for the JSON-encoding.
    +    * This method can be called multiple times. The call order of this method defines
    +    * also the order of the fields in the JSON-encoding.
    +    *
    +    * @param fieldName the field name
    +    * @param fieldType the type information of the field
    +    */
    +  def field(fieldName: String, fieldType: TypeInformation[_]): JSON = {
    +    field(fieldName, NormalizedProperties.normalizeTypeInfo(fieldType))
    +    this
    +  }
    +
    +  /**
    +    * Adds a JSON field with the field name and the type string for the JSON-encoding.
    +    * This method can be called multiple times. The call order of this method defines
    +    * also the order of the fields in the JSON-encoding.
    +    *
    +    * @param fieldName the field name
    +    * @param fieldType the type string of the field
    +    */
    +  def field(fieldName: String, fieldType: String): JSON = {
    +    if (encodingSchema.contains(fieldName)) {
    +      throw new IllegalArgumentException(s"Duplicate field name $fieldName.")
    +    }
    +    encodingSchema += (fieldName -> fieldType)
    +    this
    +  }
    +
    +  /**
    +    * Sets a mapping from schema fields to fields of the JSON schema.
    +    *
    +    * A field mapping is required if the fields of produced tables should be named different than
    +    * the fields of the JSON records.
    +    * The key of the provided Map refers to the field of the table schema,
    +    * the value to the field in the JSON schema.
    +    *
    +    * @param tableToJsonMapping A mapping from table schema fields to JSON schema fields.
    +    * @return The builder.
    +    */
    +  def tableToJsonMapping(tableToJsonMapping: util.Map[String, String]): JSON = {
    +    this.fieldMapping = Some(tableToJsonMapping)
    +    this
    +  }
    +
    +  /**
    +    * Sets flag whether to fail if a field is missing or not.
    +    *
    +    * @param failOnMissingField If set to true, the operation fails if there is a missing field.
    +    *                           If set to false, a missing field is set to null.
    +    * @return The builder.
    +    */
    +  def failOnMissingField(failOnMissingField: Boolean): JSON = {
    +    this.failOnMissingField = Some(failOnMissingField)
    +    this
    +  }
    +
    +  /**
    +    * Internal method for encoding properties conversion.
    +    */
    +  override protected def addEncodingProperties(properties: NormalizedProperties): Unit = {
    +    properties.putIndexedFixedProperties(
    --- End diff --
    
    After an offline discussion, this approach does not seem to work. The best solution for this is to use the JSON schema standard. We need to implement the parsing logic in a later step.


---

[GitHub] flink pull request #5240: [FLINK-8240] [table] Create unified interfaces to ...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5240#discussion_r164122214
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/NormalizedProperties.scala ---
    @@ -0,0 +1,328 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.descriptors
    +
    +import java.io.Serializable
    +import java.lang.{Boolean => JBoolean, Double => JDouble, Integer => JInt, Long => JLong}
    +
    +import org.apache.commons.codec.binary.Base64
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.table.api.{TableSchema, ValidationException}
    +import org.apache.flink.table.descriptors.DescriptorUtils._
    +import org.apache.flink.table.descriptors.NormalizedProperties.normalizeTableSchema
    +import org.apache.flink.table.plan.stats.ColumnStats
    +import org.apache.flink.table.sources.tsextractors.{ExistingField, StreamRecordTimestamp, TimestampExtractor}
    +import org.apache.flink.table.sources.wmstrategies.{AscendingTimestamps, BoundedOutOfOrderTimestamps, PreserveWatermarks, WatermarkStrategy}
    +import org.apache.flink.table.typeutils.TypeStringUtils
    +import org.apache.flink.util.InstantiationUtil
    +import org.apache.flink.util.Preconditions.checkNotNull
    +
    +import scala.collection.mutable
    +
    +/**
    +  * Utility class for having a unified string-based representation of Table API related classes
    +  * such as [[TableSchema]], [[TypeInformation]], [[WatermarkStrategy]], etc.
    +  */
    +class NormalizedProperties(
    --- End diff --
    
    Rename to `TableSourceProperties`? `NormalizedProperties` is quite generic.


---

[GitHub] flink pull request #5240: [FLINK-8240] [table] Create unified interfaces to ...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5240#discussion_r163071426
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/typeutils/TypeStringUtils.scala ---
    @@ -0,0 +1,213 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.typeutils
    +
    +import java.io.Serializable
    +
    +import org.apache.commons.codec.binary.Base64
    +import org.apache.commons.lang3.StringEscapeUtils
    +import org.apache.flink.api.common.typeinfo.{BasicArrayTypeInfo, PrimitiveArrayTypeInfo, TypeInformation}
    +import org.apache.flink.api.common.typeutils.CompositeType
    +import org.apache.flink.api.java.typeutils._
    +import org.apache.flink.table.api.{TableException, Types, ValidationException}
    +import org.apache.flink.table.descriptors.NormalizedProperties.normalizeTypeInfo
    +import org.apache.flink.util.InstantiationUtil
    +
    +import _root_.scala.language.implicitConversions
    +import _root_.scala.util.parsing.combinator.{JavaTokenParsers, PackratParsers}
    +
    +/**
    +  * Utilities to convert [[org.apache.flink.api.common.typeinfo.TypeInformation]] into a
    +  * string representation and back.
    +  */
    +object TypeStringUtils extends JavaTokenParsers with PackratParsers {
    --- End diff --
    
    We need unit tests for the parser.


---

[GitHub] flink pull request #5240: [FLINK-8240] [table] Create unified interfaces to ...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5240#discussion_r164150350
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/Schema.scala ---
    @@ -69,17 +72,76 @@ class Schema extends Descriptor {
         */
       def field(fieldName: String, fieldType: String): Schema = {
         if (tableSchema.contains(fieldName)) {
    -      throw new IllegalArgumentException(s"Duplicate field name $fieldName.")
    +      throw new ValidationException(s"Duplicate field name $fieldName.")
    +    }
    +
    +    val fieldProperties = mutable.LinkedHashMap[String, String]()
    +    fieldProperties += (DescriptorUtils.TYPE -> fieldType)
    +
    +    tableSchema += (fieldName -> fieldProperties)
    +
    +    lastField = Some(fieldName)
    +    this
    +  }
    +
    +  /**
    +    * Specifies the origin of the previously defined field. The origin field is defined by a
    +    * connector or format.
    +    *
    +    * E.g. field("myString", Types.STRING).from("CSV_MY_STRING")
    +    */
    +  def from(originFieldName: String): Schema = {
    +    lastField match {
    +      case None => throw new ValidationException("No field defined previously. Use field() before.")
    --- End diff --
    
    "previously defined"


---

[GitHub] flink pull request #5240: [FLINK-8240] [table] Create unified interfaces to ...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5240#discussion_r164168097
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/Rowtime.scala ---
    @@ -0,0 +1,131 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.descriptors
    +
    +import org.apache.flink.table.api.Types
    +import org.apache.flink.table.descriptors.NormalizedProperties.{normalizeTimestampExtractor, normalizeWatermarkStrategy}
    +import org.apache.flink.table.sources.tsextractors.{ExistingField, StreamRecordTimestamp, TimestampExtractor}
    +import org.apache.flink.table.sources.wmstrategies.{AscendingTimestamps, BoundedOutOfOrderTimestamps, PreserveWatermarks, WatermarkStrategy}
    +
    +import scala.collection.mutable
    +
    +/**
    +  * Rowtime descriptor for describing an event time attribute in the schema.
    +  */
    +class Rowtime extends Descriptor {
    +
    +  private var timestampExtractor: Option[TimestampExtractor] = None
    +  private var watermarkStrategy: Option[WatermarkStrategy] = None
    +
    +  /**
    +    * Sets a built-in timestamp extractor that converts an existing [[Long]] or
    +    * [[Types.SQL_TIMESTAMP]] field into the rowtime attribute.
    +    *
    +    * @param fieldName The field to convert into a rowtime attribute.
    +    */
    +  def timestampFromField(fieldName: String): Rowtime = {
    +    timestampExtractor = Some(new ExistingField(fieldName))
    +    this
    +  }
    +
    +  /**
    +    * Sets a built-in timestamp extractor that converts the assigned timestamp from
    +    * a DataStream API record into the rowtime attribute.
    +    *
    +    * Note: This extractor only works in streaming environments.
    +    */
    +  def timestampFromDataStream(): Rowtime = {
    +    timestampExtractor = Some(new StreamRecordTimestamp)
    +    this
    +  }
    +
    +  /**
    +    * Sets a custom timestamp extractor to be used for the rowtime attribute.
    +    *
    +    * @param extractor The [[TimestampExtractor]] to extract the rowtime attribute
    +    *                  from the physical type.
    +    */
    +  def timestampFromExtractor(extractor: TimestampExtractor): Rowtime = {
    +    timestampExtractor = Some(extractor)
    +    this
    +  }
    +
    +  /**
    +    * Sets a built-in watermark strategy for ascending rowtime attributes.
    +    *
    +    * Emits a watermark of the maximum observed timestamp so far minus 1.
    +    * Rows that have a timestamp equal to the max timestamp are not late.
    +    */
    +  def watermarkPeriodicAscending(): Rowtime = {
    +    watermarkStrategy = Some(new AscendingTimestamps)
    +    this
    +  }
    +
    +  /**
    +    * Sets a built-in watermark strategy for rowtime attributes which are out-of-order by a bounded
    +    * time interval.
    +    *
    +    * Emits watermarks which are the maximum observed timestamp minus the specified delay.
    +    */
    +  def watermarkPeriodicBounding(delay: Long): Rowtime = {
    --- End diff --
    
    Rename to `periodicBoundedOOOWatermarks()`?
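
For readers unfamiliar with the strategy being named here, the Scaladoc in the diff ("the maximum observed timestamp minus the specified delay") can be illustrated with a minimal, self-contained sketch. `BoundedOutOfOrderSketch` and its method names are hypothetical stand-ins chosen for this example. They are not the `BoundedOutOfOrderTimestamps` class from the PR, and a non-negative delay is assumed.

```scala
// Hypothetical stand-in used only for illustration; not Flink's implementation.
final class BoundedOutOfOrderSketch(delay: Long) {

  // Start low enough that the first watermark is Long.MinValue (assumes delay >= 0).
  private var maxTimestamp: Long = Long.MinValue + delay

  /** Records the timestamp of a newly observed row. */
  def nextTimestamp(timestamp: Long): Unit = {
    if (timestamp > maxTimestamp) {
      maxTimestamp = timestamp
    }
  }

  /** Current watermark: the maximum observed timestamp minus the delay. */
  def currentWatermark: Long = maxTimestamp - delay
}
```

With a delay of 5 and observed timestamps 10, 7, 12 (in that order), the maximum is 12 and the sketch reports a watermark of 12 - 5 = 7.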


---

[GitHub] flink pull request #5240: [FLINK-8240] [table] Create unified interfaces to ...

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5240#discussion_r163840224
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/JSON.scala ---
    @@ -0,0 +1,140 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.descriptors
    +
    +import java.util
    +
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.table.api.TableSchema
    +
    +import scala.collection.mutable
    +import scala.collection.JavaConverters._
    +
    +/**
    +  * Encoding descriptor for JSON.
    +  */
    +class JSON extends EncodingDescriptor("json") {
    --- End diff --
    
    I think relying on a standard is a good idea. We should add this as a follow-up issue. Adding the JSON schema is not trivial because we have to decide how to treat, e.g., the type `number`, `"type": "string", "format": "date"`, or enums (http://json-schema.org/example2.html).
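
To make the open question concrete, here is a small sketch of one possible mapping from JSON Schema property types to SQL-like type strings. Everything in it is an assumption made for illustration: the `JsonProperty` case class, the `toTypeString` function, and in particular each mapping choice (for example `number` to `DECIMAL` rather than `DOUBLE`). None of it is taken from the PR, and enums are deliberately left out.

```scala
object JsonSchemaTypeSketch {

  /** Simplified JSON Schema property: only `type` and an optional `format`. */
  final case class JsonProperty(jsonType: String, format: Option[String] = None)

  /** One possible mapping; every branch is an assumption, not a decided rule. */
  def toTypeString(property: JsonProperty): String = property match {
    case JsonProperty("string", Some("date"))      => "DATE"
    case JsonProperty("string", Some("date-time")) => "TIMESTAMP"
    case JsonProperty("string", _)                 => "VARCHAR"
    case JsonProperty("integer", _)                => "BIGINT"
    // `number` has no fixed precision in JSON Schema; DOUBLE would be equally defensible.
    case JsonProperty("number", _)                 => "DECIMAL"
    case JsonProperty("boolean", _)                => "BOOLEAN"
    case JsonProperty(other, _) =>
      throw new IllegalArgumentException(s"Unsupported JSON type: $other")
  }
}
```

The `number` branch shows why this needs a design decision: both target types are plausible, and the choice affects precision for every downstream query.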


---

[GitHub] flink pull request #5240: [FLINK-8240] [table] Create unified interfaces to ...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5240#discussion_r163061338
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/Schema.scala ---
    @@ -0,0 +1,95 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.descriptors
    +
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.table.api.TableSchema
    +
    +import scala.collection.mutable
    +
    +/**
    +  * Describes a schema of a table.
    +  */
    +class Schema extends Descriptor {
    +
    +  private val tableSchema: mutable.LinkedHashMap[String, String] =
    +      mutable.LinkedHashMap[String, String]()
    +
    +  /**
    +    * Sets the schema with field names and the types. Required.
    +    *
    +    * This method overwrites existing fields added with [[field()]].
    +    *
    +    * @param schema the table schema
    +    */
    +  def schema(schema: TableSchema): Schema = {
    --- End diff --
    
    Add a method `def schema(schema: String): Schema` that parses the schema string?
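
A rough sketch of what such a string-based variant might do, assuming a hypothetical format of comma-separated `<name> <type>` pairs such as `"myfield VARCHAR, myfield2 INT"`. The format, the `SchemaStringSketch` object, and its `parse` function are invented for this example. The PR does not define a schema string syntax, and a real implementation would need a proper parser to handle nested types that contain commas.

```scala
import scala.collection.mutable

object SchemaStringSketch {

  /**
    * Parses "<name> <type>, <name> <type>, ..." into an ordered field map.
    * Naive by design: splitting on commas breaks for nested types with commas.
    */
  def parse(schema: String): mutable.LinkedHashMap[String, String] = {
    val fields = mutable.LinkedHashMap[String, String]()
    schema.split(",").map(_.trim).filter(_.nonEmpty).foreach { entry =>
      // Split into the field name and the remainder, which is the type string.
      entry.split("\\s+", 2) match {
        case Array(name, fieldType) =>
          if (fields.contains(name)) {
            throw new IllegalArgumentException(s"Duplicate field name $name.")
          }
          fields(name) = fieldType.trim
        case _ =>
          throw new IllegalArgumentException(
            s"Expected '<name> <type>' but found '$entry'.")
      }
    }
    fields
  }
}
```

The result keeps insertion order, matching the `LinkedHashMap` that the descriptor in the diff uses for its fields.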


---

[GitHub] flink pull request #5240: [FLINK-8240] [table] Create unified interfaces to ...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5240#discussion_r163071117
  
    --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/DescriptorsTest.scala ---
    @@ -0,0 +1,245 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.api
    +
    +import _root_.java.util
    +
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.api.java.typeutils.TypeExtractor
    +import org.apache.flink.table.descriptors._
    +import org.apache.flink.table.plan.stats.{ColumnStats, TableStats}
    +import org.apache.flink.table.utils.TableTestBase
    +import org.junit.Assert.assertEquals
    +import org.junit.Test
    +
    +class DescriptorsTest extends TableTestBase {
    --- End diff --
    
    I would move the tests to a separate class per descriptor.
    If we add a `validate` method to `Descriptor`, this needs to be tested as well.


---

[GitHub] flink pull request #5240: [FLINK-8240] [table] Create unified interfaces to ...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5240#discussion_r164168496
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/Rowtime.scala ---
    @@ -0,0 +1,131 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.descriptors
    +
    +import org.apache.flink.table.api.Types
    +import org.apache.flink.table.descriptors.NormalizedProperties.{normalizeTimestampExtractor, normalizeWatermarkStrategy}
    +import org.apache.flink.table.sources.tsextractors.{ExistingField, StreamRecordTimestamp, TimestampExtractor}
    +import org.apache.flink.table.sources.wmstrategies.{AscendingTimestamps, BoundedOutOfOrderTimestamps, PreserveWatermarks, WatermarkStrategy}
    +
    +import scala.collection.mutable
    +
    +/**
    +  * Rowtime descriptor for describing an event time attribute in the schema.
    +  */
    +class Rowtime extends Descriptor {
    +
    +  private var timestampExtractor: Option[TimestampExtractor] = None
    +  private var watermarkStrategy: Option[WatermarkStrategy] = None
    +
    +  /**
    +    * Sets a built-in timestamp extractor that converts an existing [[Long]] or
    +    * [[Types.SQL_TIMESTAMP]] field into the rowtime attribute.
    +    *
    +    * @param fieldName The field to convert into a rowtime attribute.
    +    */
    +  def timestampFromField(fieldName: String): Rowtime = {
    +    timestampExtractor = Some(new ExistingField(fieldName))
    +    this
    +  }
    +
    +  /**
    +    * Sets a built-in timestamp extractor that converts the assigned timestamp from
    +    * a DataStream API record into the rowtime attribute.
    +    *
    +    * Note: This extractor only works in streaming environments.
    +    */
    +  def timestampFromDataStream(): Rowtime = {
    +    timestampExtractor = Some(new StreamRecordTimestamp)
    +    this
    +  }
    +
    +  /**
    +    * Sets a custom timestamp extractor to be used for the rowtime attribute.
    +    *
    +    * @param extractor The [[TimestampExtractor]] to extract the rowtime attribute
    +    *                  from the physical type.
    +    */
    +  def timestampFromExtractor(extractor: TimestampExtractor): Rowtime = {
    +    timestampExtractor = Some(extractor)
    +    this
    +  }
    +
    +  /**
    +    * Sets a built-in watermark strategy for ascending rowtime attributes.
    +    *
    +    * Emits a watermark of the maximum observed timestamp so far minus 1.
    +    * Rows that have a timestamp equal to the max timestamp are not late.
    +    */
    +  def watermarkPeriodicAscending(): Rowtime = {
    +    watermarkStrategy = Some(new AscendingTimestamps)
    +    this
    +  }
    +
    +  /**
    +    * Sets a built-in watermark strategy for rowtime attributes which are out-of-order by a bounded
    +    * time interval.
    +    *
    +    * Emits watermarks which are the maximum observed timestamp minus the specified delay.
    +    */
    +  def watermarkPeriodicBounding(delay: Long): Rowtime = {
    +    watermarkStrategy = Some(new BoundedOutOfOrderTimestamps(delay))
    +    this
    +  }
    +
    +  /**
    +    * Sets a built-in watermark strategy which indicates the watermarks should be preserved from the
    +    * underlying DataStream API.
    +    */
    +  def watermarkFromDataStream(): Rowtime = {
    +    watermarkStrategy = Some(PreserveWatermarks.INSTANCE)
    +    this
    +  }
    +
    +  /**
    +    * Sets a custom watermark strategy to be used for the rowtime attribute.
    +    */
    +  def watermarkFromStrategy(strategy: WatermarkStrategy): Rowtime = {
    --- End diff --
    
    Rename to `watermarksFromStrategy()`?


---

[GitHub] flink pull request #5240: [FLINK-8240] [table] Create unified interfaces to ...

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5240#discussion_r163899364
  
    --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/DescriptorsTest.scala ---
    @@ -0,0 +1,245 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.api
    +
    +import _root_.java.util
    +
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.api.java.typeutils.TypeExtractor
    +import org.apache.flink.table.descriptors._
    +import org.apache.flink.table.plan.stats.{ColumnStats, TableStats}
    +import org.apache.flink.table.utils.TableTestBase
    +import org.junit.Assert.assertEquals
    +import org.junit.Test
    +
    +class DescriptorsTest extends TableTestBase {
    --- End diff --
    
    Let's do it once the validation methods are in place.


---