You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by twalthr <gi...@git.apache.org> on 2018/03/08 12:15:51 UTC

[GitHub] flink pull request #5662: [FLINK-8854] [table] Fix schema mapping with time ...

GitHub user twalthr opened a pull request:

    https://github.com/apache/flink/pull/5662

    [FLINK-8854] [table] Fix schema mapping with time attributes

    ## What is the purpose of the change
    
    This PR fixes the invalid field mapping and improves the mapping of time attributes in general.
    
    ## Brief change log
    
    - `SchemaValidator.deriveFieldMapping()` and `SchemaValidator.deriveFormatFields()`
    
    
    ## Verifying this change
    
    - Existing tests extended
    
    ## Does this pull request potentially affect one of the following parts:
    
      - Dependencies (does it add or upgrade a dependency): no
      - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no
      - The serializers: no
      - The runtime per-record code paths (performance sensitive): no
      - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: no
      - The S3 file system connector: no
    
    ## Documentation
    
      - Does this pull request introduce a new feature? no
      - If yes, how is the feature documented? JavaDocs


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/twalthr/flink FLINK-8854

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/5662.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #5662
    
----
commit 45c1ca57b65004fa973b663e441848ab228ade3e
Author: Timo Walther <tw...@...>
Date:   2018-03-08T10:51:38Z

    [FLINK-8854] [table] Fix schema mapping with time attributes

----


---

[GitHub] flink pull request #5662: [FLINK-8854] [table] Fix schema mapping with time ...

Posted by xccui <gi...@git.apache.org>.
Github user xccui commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5662#discussion_r173879267
  
    --- Diff: flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSourceFactoryTestBase.java ---
    @@ -89,9 +94,10 @@ private void testTableSource(FormatDescriptor format) {
     		// construct table source using a builder
     
     		final Map<String, String> tableJsonMapping = new HashMap<>();
    +		tableJsonMapping.put("name", "name");
    --- End diff --
    
    Well, according to the current implementation, you are right. But I still feel uncomfortable about that since we actually mix the physical schema (format schema) and the logical schema (table schema) into the same map. Do you think it's necessary to make some changes here?


---

[GitHub] flink pull request #5662: [FLINK-8854] [table] Fix schema mapping with time ...

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5662#discussion_r173465435
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/SchemaValidator.scala ---
    @@ -198,14 +205,20 @@ object SchemaValidator {
           val isProctime = properties
             .getOptionalBoolean(s"$SCHEMA.$i.$SCHEMA_PROCTIME")
             .orElse(false)
    -      val isRowtime = properties
    -        .containsKey(s"$SCHEMA.$i.$ROWTIME_TIMESTAMPS_TYPE")
    +      val tsType = s"$SCHEMA.$i.$ROWTIME_TIMESTAMPS_TYPE"
    +      val isRowtime = properties.containsKey(tsType)
           if (!isProctime && !isRowtime) {
             // check for a aliasing
             val fieldName = properties.getOptionalString(s"$SCHEMA.$i.$SCHEMA_FROM")
               .orElse(n)
             builder.field(fieldName, t)
           }
    +      // only use the rowtime attribute if it references a field
    +      else if (isRowtime &&
    +          properties.getString(tsType) == ROWTIME_TIMESTAMPS_TYPE_VALUE_FROM_FIELD) {
    --- End diff --
    
    You are right, we should declare `ExistingField` `final`. In the custom extractor case, a user has to supply the format manually. Maybe we will need an explanation logic in the future such that a user can see how the derived format looks like and if it makes sense to declare it explicitly.


---

[GitHub] flink pull request #5662: [FLINK-8854] [table] Fix schema mapping with time ...

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5662#discussion_r173881141
  
    --- Diff: flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSourceFactoryTestBase.java ---
    @@ -89,9 +94,10 @@ private void testTableSource(FormatDescriptor format) {
     		// construct table source using a builder
     
     		final Map<String, String> tableJsonMapping = new HashMap<>();
    +		tableJsonMapping.put("name", "name");
    --- End diff --
    
    @fhueske what do you think about this whole mapping business?


---

[GitHub] flink pull request #5662: [FLINK-8854] [table] Fix schema mapping with time ...

Posted by xccui <gi...@git.apache.org>.
Github user xccui commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5662#discussion_r173784883
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/SchemaValidator.scala ---
    @@ -148,6 +148,13 @@ object SchemaValidator {
     
         val schema = properties.getTableSchema(SCHEMA)
     
    +    // add all source fields first because rowtime might reference one of them
    +    toScala(sourceSchema).map(_.getColumnNames).foreach { names =>
    --- End diff --
    
    Hi @twalthr, can we check the used `TimestampExtractor` here? Specifically, if it's an `ExistingField`, we only included the target fields; if it's a `StreamRecordTimestamp` we don't include extra fields; and only if it's a custom extractor we include all the source fields.


---

[GitHub] flink pull request #5662: [FLINK-8854] [table] Fix schema mapping with time ...

Posted by xccui <gi...@git.apache.org>.
Github user xccui commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5662#discussion_r173857218
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/SchemaValidator.scala ---
    @@ -148,6 +148,13 @@ object SchemaValidator {
     
         val schema = properties.getTableSchema(SCHEMA)
     
    +    // add all source fields first because rowtime might reference one of them
    +    toScala(sourceSchema).map(_.getColumnNames).foreach { names =>
    --- End diff --
    
    I think we should first remove the added source fields before adding the explicit mappings with the following snippet. 
    ```
    // add explicit mapping
    case Some(source) =>
        // should add mapping.remove(source)
        mapping.put(name, source)
    ```


---

[GitHub] flink pull request #5662: [FLINK-8854] [table] Fix schema mapping with time ...

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5662#discussion_r173786510
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/SchemaValidator.scala ---
    @@ -148,6 +148,13 @@ object SchemaValidator {
     
         val schema = properties.getTableSchema(SCHEMA)
     
    +    // add all source fields first because rowtime might reference one of them
    +    toScala(sourceSchema).map(_.getColumnNames).foreach { names =>
    --- End diff --
    
    Yes @xccui, we could change the logic like this. Do you have cases where we need such behavior?


---

[GitHub] flink pull request #5662: [FLINK-8854] [table] Fix schema mapping with time ...

Posted by xccui <gi...@git.apache.org>.
Github user xccui commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5662#discussion_r173860901
  
    --- Diff: flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSourceFactoryTestBase.java ---
    @@ -89,9 +94,10 @@ private void testTableSource(FormatDescriptor format) {
     		// construct table source using a builder
     
     		final Map<String, String> tableJsonMapping = new HashMap<>();
    +		tableJsonMapping.put("name", "name");
    --- End diff --
    
    This "name" to "name" mapping should not exist since we've already explicitly defined the "fruit-name" to "name" mapping.


---

[GitHub] flink issue #5662: [FLINK-8854] [table] Fix schema mapping with time attribu...

Posted by xccui <gi...@git.apache.org>.
Github user xccui commented on the issue:

    https://github.com/apache/flink/pull/5662
  
    Thanks for the explanation, @twalthr! I'll update the PR and resolve the conflicts caused.


---

[GitHub] flink issue #5662: [FLINK-8854] [table] Fix schema mapping with time attribu...

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on the issue:

    https://github.com/apache/flink/pull/5662
  
    Merging...


---

[GitHub] flink pull request #5662: [FLINK-8854] [table] Fix schema mapping with time ...

Posted by xccui <gi...@git.apache.org>.
Github user xccui commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5662#discussion_r173455747
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/SchemaValidator.scala ---
    @@ -198,14 +205,20 @@ object SchemaValidator {
           val isProctime = properties
             .getOptionalBoolean(s"$SCHEMA.$i.$SCHEMA_PROCTIME")
             .orElse(false)
    -      val isRowtime = properties
    -        .containsKey(s"$SCHEMA.$i.$ROWTIME_TIMESTAMPS_TYPE")
    +      val tsType = s"$SCHEMA.$i.$ROWTIME_TIMESTAMPS_TYPE"
    +      val isRowtime = properties.containsKey(tsType)
           if (!isProctime && !isRowtime) {
             // check for a aliasing
             val fieldName = properties.getOptionalString(s"$SCHEMA.$i.$SCHEMA_FROM")
               .orElse(n)
             builder.field(fieldName, t)
           }
    +      // only use the rowtime attribute if it references a field
    +      else if (isRowtime &&
    +          properties.getString(tsType) == ROWTIME_TIMESTAMPS_TYPE_VALUE_FROM_FIELD) {
    --- End diff --
    
    What if the user uses the custom extractor to define his/her own `ExistingField` extractor that references a field?


---

[GitHub] flink pull request #5662: [FLINK-8854] [table] Fix schema mapping with time ...

Posted by xccui <gi...@git.apache.org>.
Github user xccui commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5662#discussion_r173791129
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/SchemaValidator.scala ---
    @@ -148,6 +148,13 @@ object SchemaValidator {
     
         val schema = properties.getTableSchema(SCHEMA)
     
    +    // add all source fields first because rowtime might reference one of them
    +    toScala(sourceSchema).map(_.getColumnNames).foreach { names =>
    --- End diff --
    
    Not really. I was justing refactoring #5610. For convenience, I used the existing class `org.apache.flink.formats.avro.generated.User` in a test case, but it gets so many fields to be mapped. 😄 


---

[GitHub] flink pull request #5662: [FLINK-8854] [table] Fix schema mapping with time ...

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5662#discussion_r173863375
  
    --- Diff: flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSourceFactoryTestBase.java ---
    @@ -89,9 +94,10 @@ private void testTableSource(FormatDescriptor format) {
     		// construct table source using a builder
     
     		final Map<String, String> tableJsonMapping = new HashMap<>();
    +		tableJsonMapping.put("name", "name");
    --- End diff --
    
    The question is what should a rowtime attribute field (or a custom extractor) reference? The input or the current schema? I think it should reference the input thus all fields (even the renamed ones) need to be present in the mapping.


---

[GitHub] flink pull request #5662: [FLINK-8854] [table] Fix schema mapping with time ...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/5662


---

[GitHub] flink pull request #5662: [FLINK-8854] [table] Fix schema mapping with time ...

Posted by xccui <gi...@git.apache.org>.
Github user xccui commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5662#discussion_r173888555
  
    --- Diff: flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSourceFactoryTestBase.java ---
    @@ -89,9 +94,10 @@ private void testTableSource(FormatDescriptor format) {
     		// construct table source using a builder
     
     		final Map<String, String> tableJsonMapping = new HashMap<>();
    +		tableJsonMapping.put("name", "name");
    --- End diff --
    
    ```
    physical schema ==mapping=> "intermediate schema" ==timestamp extraction and projection=> logical schema
    ```
    Maybe we should consider eliminating the "intermedia schema" in the future?


---

[GitHub] flink issue #5662: [FLINK-8854] [table] Fix schema mapping with time attribu...

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on the issue:

    https://github.com/apache/flink/pull/5662
  
    Thanks for the comments @xccui. It's never to late for feedback. Sorry, maybe I merged this too quickly. We still need to call `builder.forJsonSchema()` if the schema contains a `proctime` attribute. The most common use case will be to extend the format by time attributes. With your approach the format would contain an additional timestamp that is definitely not part of the format schema.


---