Posted to issues@flink.apache.org by twalthr <gi...@git.apache.org> on 2017/08/12 12:10:41 UTC

[GitHub] flink pull request #4532: [FLINK-7337] [table] Refactor internal handling of...

GitHub user twalthr opened a pull request:

    https://github.com/apache/flink/pull/4532

    [FLINK-7337] [table] Refactor internal handling of time indicator attributes and efficiency

    ## What is the purpose of the change
    
    *This PR is an improvement of @fhueske's PR #4488. For a description see #4488. This PR improves efficiency by not creating an object for every timestamp; instead, timestamps are kept as Long values and serialized using the LongSerializer. It also contains code clean-up changes.*
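    A minimal sketch of the serialization idea (not code from this PR; the class name is invented and only standard Flink serializer utilities are assumed): the rowtime stays a Long and goes through the `LongSerializer`, so no `java.sql.Timestamp` object is created per record.

        import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
        import org.apache.flink.api.common.typeutils.base.LongSerializer
        import org.apache.flink.core.memory.{DataInputViewStreamWrapper, DataOutputViewStreamWrapper}

        object RowtimeAsLongSketch {
          def main(args: Array[String]): Unit = {
            val serializer = LongSerializer.INSTANCE
            val bytes = new ByteArrayOutputStream()

            // epoch millis kept as a Long; no java.sql.Timestamp object is created
            val rowtime: java.lang.Long = 1502539841000L
            serializer.serialize(rowtime, new DataOutputViewStreamWrapper(bytes))

            val in = new DataInputViewStreamWrapper(new ByteArrayInputStream(bytes.toByteArray))
            println(serializer.deserialize(in)) // 1502539841000
          }
        }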
    


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/twalthr/flink FLINK-7337

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/4532.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #4532
    
----
commit 03ca69505e14aea5452a1b6e77d942ecc2440de4
Author: Fabian Hueske <fh...@apache.org>
Date:   2017-08-04T00:20:56Z

    [FLINK-7337] [table] Refactor internal handling of time indicator attributes.
    
    - Expand physical Row schema for time indicators.
    - Refactor computation of logical schema of tables to import.
    - Refactor operators to use time attribute in Row instead of StreamRecord timestamp.

commit 876369d41019b3ec5ba824553b31fb2f3b44a18d
Author: Fabian Hueske <fh...@apache.org>
Date:   2017-08-07T21:39:48Z

    Addressed review feedback

commit 54bfed8debbe29784e0de7f07ebb277df68a4eb5
Author: Fabian Hueske <fh...@apache.org>
Date:   2017-08-11T14:43:07Z

    minor improvement

commit b0b24011e7c0444e3e1ebaba810edc06e9c85ad6
Author: twalthr <tw...@apache.org>
Date:   2017-08-12T11:51:42Z

    Efficient handling of rowtime timestamps

----


---

[GitHub] flink pull request #4532: [FLINK-7337] [table] Refactor internal handling of...

Posted by xccui <gi...@git.apache.org>.
Github user xccui commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4532#discussion_r132842811
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala ---
    @@ -719,33 +715,47 @@ abstract class TableEnvironment(val config: TableConfig) {
           throw new TableException("Field name can not be '*'.")
         }
     
    -    (fieldNames.toArray, fieldIndexes.toArray)
    +    (fieldNames.toArray, fieldIndexes.toArray) // build fails if not converted to array
    --- End diff --
    
    In my local environment, `toArray` also seems to be redundant.


---

[GitHub] flink pull request #4532: [FLINK-7337] [table] Refactor internal handling of...

Posted by wuchong <gi...@git.apache.org>.
Github user wuchong commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4532#discussion_r132839934
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/conversion/CRowToJavaTupleMapRunner.scala ---
    @@ -16,32 +16,32 @@
      * limitations under the License.
      */
     
    -package org.apache.flink.table.runtime
    +package org.apache.flink.table.runtime.conversion
     
     import java.lang.{Boolean => JBool}
     
     import org.apache.flink.api.common.functions.{MapFunction, RichMapFunction}
     import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
     import org.apache.flink.api.java.typeutils.ResultTypeQueryable
     import org.apache.flink.configuration.Configuration
     import org.apache.flink.table.codegen.Compiler
     import org.apache.flink.table.runtime.types.CRow
     import org.apache.flink.types.Row
    -import org.slf4j.LoggerFactory
    -import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
    +import org.slf4j.{Logger, LoggerFactory}
     
     /**
    -  * Convert [[CRow]] to a [[JTuple2]]
    +  * Convert [[CRow]] to a [[JTuple2]].
       */
    -class CRowInputJavaTupleOutputMapRunner(
    +class CRowToJavaTupleMapRunner(
         name: String,
         code: String,
         @transient var returnType: TypeInformation[JTuple2[JBool, Any]])
       extends RichMapFunction[CRow, Any]
               with ResultTypeQueryable[JTuple2[JBool, Any]]
               with Compiler[MapFunction[Row, Any]] {
    --- End diff --
    
    indent


---

[GitHub] flink pull request #4532: [FLINK-7337] [table] Refactor internal handling of...

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4532#discussion_r132889039
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala ---
    @@ -719,33 +715,47 @@ abstract class TableEnvironment(val config: TableConfig) {
           throw new TableException("Field name can not be '*'.")
         }
     
    -    (fieldNames.toArray, fieldIndexes.toArray)
    +    (fieldNames.toArray, fieldIndexes.toArray) // build fails if not converted to array
    --- End diff --
    
    See https://travis-ci.org/apache/flink/jobs/263806347
    It only happens when using Scala 2.10.
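    Why only Scala 2.10 rejects the version without `toArray` is not obvious from the thread; the stand-alone reduction below (names invented for illustration) merely shows the shape of the code and the explicit buffer-to-array conversion in question:

        import scala.collection.mutable.ArrayBuffer

        def getFieldInfo(): (Array[String], Array[Int]) = {
          val fieldNames = ArrayBuffer("name", "rowtime")
          val fieldIndexes = ArrayBuffer(0, 1)
          // explicit conversion from ArrayBuffer to the declared Array return type;
          // per the Travis log above, Scala 2.10 failed without it
          (fieldNames.toArray, fieldIndexes.toArray)
        }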


---

[GitHub] flink pull request #4532: [FLINK-7337] [table] Refactor internal handling of...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/4532


---

[GitHub] flink pull request #4532: [FLINK-7337] [table] Refactor internal handling of...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4532#discussion_r132854814
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/OutputRowtimeProcessFunction.scala ---
    @@ -0,0 +1,62 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.runtime
    +
    +import org.apache.calcite.runtime.SqlFunctions
    +import org.apache.flink.api.common.functions.{MapFunction, RichMapFunction}
    +import org.apache.flink.configuration.Configuration
    +import org.apache.flink.streaming.api.functions.ProcessFunction
    +import org.apache.flink.streaming.api.operators.TimestampedCollector
    +import org.apache.flink.table.runtime.types.CRow
    +import org.apache.flink.util.Collector
    +
    +/**
    +  * Wraps a ProcessFunction and sets a Timestamp field of a CRow as
    +  * [[org.apache.flink.streaming.runtime.streamrecord.StreamRecord]] timestamp.
    +  */
    +class OutputRowtimeProcessFunction[OUT](
    +    function: MapFunction[CRow, OUT],
    +    rowtimeIdx: Int)
    --- End diff --
    
    In fact, the data type is changed here precisely so that the externally visible data type stays unchanged. A time indicator is externally represented and treated as a `TIMESTAMP` field and only internally handled as `LONG`. Therefore, we need to convert it back into a `TIMESTAMP` once the result is converted into a `DataStream`.
    
    You are right that we need to convert all time indicators to `TIMESTAMP`, not only one. This is currently enforced by the exception that you observed: users have to cast all but one time indicator attribute to `TIMESTAMP`, which also converts them from `long` to `Timestamp`.
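    A hedged sketch of that workaround (table and field names follow @xccui's example elsewhere in this thread and are not from the PR; a `StreamTableEnvironment` named `tableEnv` with both tables registered is assumed): all but one rowtime attribute is cast to `TIMESTAMP` in the query before converting to a `DataStream`.

        // keep rtA as the remaining time indicator, demote rtB via CAST
        val result = tableEnv.sql(
          """
            |SELECT OrderA.rtA, CAST(OrderB.rtB AS TIMESTAMP) AS rtB, OrderA.productA
            |FROM OrderA, OrderB
            |WHERE OrderA.productA = OrderB.productB
          """.stripMargin)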


---

[GitHub] flink pull request #4532: [FLINK-7337] [table] Refactor internal handling of...

Posted by wuchong <gi...@git.apache.org>.
Github user wuchong commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4532#discussion_r132841043
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala ---
    @@ -667,30 +719,62 @@ abstract class StreamTableEnvironment(
         // get CRow plan
         val plan: DataStream[CRow] = translateToCRow(logicalPlan, queryConfig)
     
    +    val rowtimeFields = logicalType
    +      .getFieldList.asScala
    +      .filter(f => FlinkTypeFactory.isRowtimeIndicatorType(f.getType))
    +
    +    // convert the input type for the conversion mapper
    +    // the input will be changed in the OutputRowtimeProcessFunction later
    +    val convType = if (rowtimeFields.size > 1) {
    +      throw new TableException(
    --- End diff --
    
    I'm fine with both the current approach and using the time indicator of the left table as the default. However, I think no exception should be thrown when writing a Table to a TableSink; currently, both cases share the same exception code path.


---

[GitHub] flink pull request #4532: [FLINK-7337] [table] Refactor internal handling of...

Posted by xccui <gi...@git.apache.org>.
Github user xccui commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4532#discussion_r132820853
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala ---
    @@ -667,30 +719,62 @@ abstract class StreamTableEnvironment(
         // get CRow plan
         val plan: DataStream[CRow] = translateToCRow(logicalPlan, queryConfig)
     
    +    val rowtimeFields = logicalType
    +      .getFieldList.asScala
    +      .filter(f => FlinkTypeFactory.isRowtimeIndicatorType(f.getType))
    +
    +    // convert the input type for the conversion mapper
    +    // the input will be changed in the OutputRowtimeProcessFunction later
    +    val convType = if (rowtimeFields.size > 1) {
    +      throw new TableException(
    --- End diff --
    
    I got an idea, but I'm not sure if it's applicable. We could allow multiple rowtime fields in a stream but only activate one in an operator. Since the timestamps are stored in the records, the other, inactive rowtime fields can be treated as common fields. Any change to a rowtime field would render it invalid for rowtime use. IMO, there are not many queries that depend on the rowtime (maybe only over aggregates and joins), so the optimizer may be able to deduce which rowtime field should be activated in an operator. However, some existing logic may be affected by that.


---

[GitHub] flink pull request #4532: [FLINK-7337] [table] Refactor internal handling of...

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4532#discussion_r132819174
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala ---
    @@ -667,30 +719,62 @@ abstract class StreamTableEnvironment(
         // get CRow plan
         val plan: DataStream[CRow] = translateToCRow(logicalPlan, queryConfig)
     
    +    val rowtimeFields = logicalType
    +      .getFieldList.asScala
    +      .filter(f => FlinkTypeFactory.isRowtimeIndicatorType(f.getType))
    +
    +    // convert the input type for the conversion mapper
    +    // the input will be changed in the OutputRowtimeProcessFunction later
    +    val convType = if (rowtimeFields.size > 1) {
    +      throw new TableException(
    --- End diff --
    
    This is a very good question. I don't know how we want to solve the `SELECT *` problem. My initial idea was to just use the attribute of the left table as the time indicator. Another idea would be to remove all time indicators and explicitly choose one attribute: `SELECT TIME(OrderA.rowtime)`. Still not perfect. We need to discuss this. @fhueske, what do you think?



---

[GitHub] flink pull request #4532: [FLINK-7337] [table] Refactor internal handling of...

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4532#discussion_r132889590
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala ---
    @@ -667,30 +719,62 @@ abstract class StreamTableEnvironment(
         // get CRow plan
         val plan: DataStream[CRow] = translateToCRow(logicalPlan, queryConfig)
     
    +    val rowtimeFields = logicalType
    +      .getFieldList.asScala
    +      .filter(f => FlinkTypeFactory.isRowtimeIndicatorType(f.getType))
    +
    +    // convert the input type for the conversion mapper
    +    // the input will be changed in the OutputRowtimeProcessFunction later
    +    val convType = if (rowtimeFields.size > 1) {
    +      throw new TableException(
    --- End diff --
    
    @wuchong let's rework the restrictions for a `TableSink` in follow up issue.


---

[GitHub] flink issue #4532: [FLINK-7337] [table] Refactor internal handling of time i...

Posted by wuchong <gi...@git.apache.org>.
Github user wuchong commented on the issue:

    https://github.com/apache/flink/pull/4532
  
    I'm +1 to merge this.
    
    I have created two follow-up issues; we can move the discussion to the JIRAs:
    (1) FLINK-7446
    Support to define an existing field as the rowtime field for TableSource
    (2) FLINK-7448
    Keep the data type unchanged when registering an existing field as rowtime


---

[GitHub] flink pull request #4532: [FLINK-7337] [table] Refactor internal handling of...

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4532#discussion_r132822158
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala ---
    @@ -667,30 +719,62 @@ abstract class StreamTableEnvironment(
         // get CRow plan
         val plan: DataStream[CRow] = translateToCRow(logicalPlan, queryConfig)
     
    +    val rowtimeFields = logicalType
    +      .getFieldList.asScala
    +      .filter(f => FlinkTypeFactory.isRowtimeIndicatorType(f.getType))
    +
    +    // convert the input type for the conversion mapper
    +    // the input will be changed in the OutputRowtimeProcessFunction later
    +    val convType = if (rowtimeFields.size > 1) {
    +      throw new TableException(
    --- End diff --
    
    @xccui You just described exactly what this PR does :D
    We allow multiple rowtimes and store them in the record. Each operator picks what it needs. The exception that you got, and the problem that I described, concern the final conversion from the Table API back to the DataStream API. The DataStream API only allows one timestamp.
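    To make the constraint concrete, a minimal sketch (not PR code): a DataStream record carries exactly one timestamp slot, so only one rowtime column can survive the conversion as the StreamRecord timestamp.

        import org.apache.flink.streaming.runtime.streamrecord.StreamRecord

        // one value, one timestamp slot; there is no place for a second rowtime
        val record = new StreamRecord[String]("order-1", 1502539841000L)
        println(record.getTimestamp) // 1502539841000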


---

[GitHub] flink pull request #4532: [FLINK-7337] [table] Refactor internal handling of...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4532#discussion_r132854238
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala ---
    @@ -667,30 +719,62 @@ abstract class StreamTableEnvironment(
         // get CRow plan
         val plan: DataStream[CRow] = translateToCRow(logicalPlan, queryConfig)
     
    +    val rowtimeFields = logicalType
    +      .getFieldList.asScala
    +      .filter(f => FlinkTypeFactory.isRowtimeIndicatorType(f.getType))
    +
    +    // convert the input type for the conversion mapper
    +    // the input will be changed in the OutputRowtimeProcessFunction later
    +    val convType = if (rowtimeFields.size > 1) {
    +      throw new TableException(
    --- End diff --
    
    I think we should avoid implicit defaults like using the timestamp attribute of the leftmost table (the leftmost table might not have a time indicator attribute, and join order optimization could change the order of the tables), as well as special cases for queries like `SELECT *`.
    
    When a `Table` is converted into a `DataStream`, it is likely that the resulting stream is further processed by logic that cannot be expressed in SQL / Table API. If a `Table` has multiple timestamp attributes, IMO a user should be forced to make a choice for the `StreamRecord` timestamp, because the semantics of any subsequent time-based operations will depend on it. I see two ways to do that:
    - ensure that only one attribute is a time indicator by casting the others to `TIMESTAMP`
    - let the user specify which field should be used as the timestamp via an additional parameter of the `toAppendStream` and `toRetractStream` methods (see the sketch below)
    
    We could also do both.
    
    I agree with @wuchong that we do not need this restriction when we emit a `Table` to a `TableSink`.
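    A purely hypothetical sketch of the second option (these signatures do not exist in Flink; they only make the proposal concrete):

        // Hypothetical overloads (NOT actual Flink API) illustrating the proposal:
        // the user names the rowtime field explicitly when converting.
        //
        // def toAppendStream[T](table: Table, rowtimeField: String): DataStream[T]
        // def toRetractStream[T](table: Table, rowtimeField: String): DataStream[(Boolean, T)]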


---

[GitHub] flink pull request #4532: [FLINK-7337] [table] Refactor internal handling of...

Posted by wuchong <gi...@git.apache.org>.
Github user wuchong commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4532#discussion_r132863952
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/ProctimeSqlFunction.scala ---
    @@ -22,12 +22,12 @@ import org.apache.calcite.sql.`type`._
     import org.apache.calcite.sql.validate.SqlMonotonicity
     
     /**
    -  * Function that materializes a time attribute to the metadata timestamp. After materialization
    -  * the result can be used in regular arithmetical calculations.
    +  * Function that materializes a processing time attribute.
    +  * After materialization the result can be used in regular arithmetical calculations.
       */
    -object TimeMaterializationSqlFunction
    +object ProctimeSqlFunction
    --- End diff --
    
    Should we move this object to the `org.apache.flink.table.functions.sql` package?


---

[GitHub] flink pull request #4532: [FLINK-7337] [table] Refactor internal handling of...

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4532#discussion_r132819240
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala ---
    @@ -667,30 +719,62 @@ abstract class StreamTableEnvironment(
         // get CRow plan
         val plan: DataStream[CRow] = translateToCRow(logicalPlan, queryConfig)
     
    +    val rowtimeFields = logicalType
    +      .getFieldList.asScala
    +      .filter(f => FlinkTypeFactory.isRowtimeIndicatorType(f.getType))
    +
    +    // convert the input type for the conversion mapper
    +    // the input will be changed in the OutputRowtimeProcessFunction later
    +    val convType = if (rowtimeFields.size > 1) {
    +      throw new TableException(
    --- End diff --
    
    Maybe we should treat `SELECT *` as a special case with no following time indicator.


---

[GitHub] flink pull request #4532: [FLINK-7337] [table] Refactor internal handling of...

Posted by wuchong <gi...@git.apache.org>.
Github user wuchong commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4532#discussion_r132840719
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala ---
    @@ -719,33 +715,47 @@ abstract class TableEnvironment(val config: TableConfig) {
           throw new TableException("Field name can not be '*'.")
         }
     
    -    (fieldNames.toArray, fieldIndexes.toArray)
    +    (fieldNames.toArray, fieldIndexes.toArray) // build fails if not converted to array
    --- End diff --
    
    It builds successfully when I remove the `toArray` in my local environment. 


---

[GitHub] flink issue #4532: [FLINK-7337] [table] Refactor internal handling of time i...

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on the issue:

    https://github.com/apache/flink/pull/4532
  
    Merging...


---

[GitHub] flink pull request #4532: [FLINK-7337] [table] Refactor internal handling of...

Posted by xccui <gi...@git.apache.org>.
Github user xccui commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4532#discussion_r132833668
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/OutputRowtimeProcessFunction.scala ---
    @@ -0,0 +1,62 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.runtime
    +
    +import org.apache.calcite.runtime.SqlFunctions
    +import org.apache.flink.api.common.functions.{MapFunction, RichMapFunction}
    +import org.apache.flink.configuration.Configuration
    +import org.apache.flink.streaming.api.functions.ProcessFunction
    +import org.apache.flink.streaming.api.operators.TimestampedCollector
    +import org.apache.flink.table.runtime.types.CRow
    +import org.apache.flink.util.Collector
    +
    +/**
    +  * Wraps a ProcessFunction and sets a Timestamp field of a CRow as
    +  * [[org.apache.flink.streaming.runtime.streamrecord.StreamRecord]] timestamp.
    +  */
    +class OutputRowtimeProcessFunction[OUT](
    +    function: MapFunction[CRow, OUT],
    +    rowtimeIdx: Int)
    --- End diff --
    
    It seems that this function only changes the data type of the rowtime field from Long to Timestamp. Shall we consider making `rowtimeIdx` an array? Besides, as @wuchong suggested, I also think a query should keep the data type unchanged.
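    A small sketch of the type conversion under discussion (simplified; judging from the imports in the diff, the actual function also forwards the value as the StreamRecord timestamp): the internal `Long` rowtime becomes an external SQL `TIMESTAMP` via Calcite's `SqlFunctions`.

        import org.apache.calcite.runtime.SqlFunctions

        val internalRowtime: Long = 1502539841000L // internal representation (epoch millis)
        val external = SqlFunctions.internalToTimestamp(internalRowtime)
        println(external.getClass.getName) // java.sql.Timestamp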


---

[GitHub] flink pull request #4532: [FLINK-7337] [table] Refactor internal handling of...

Posted by xccui <gi...@git.apache.org>.
Github user xccui commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4532#discussion_r132817252
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala ---
    @@ -667,30 +719,62 @@ abstract class StreamTableEnvironment(
         // get CRow plan
         val plan: DataStream[CRow] = translateToCRow(logicalPlan, queryConfig)
     
    +    val rowtimeFields = logicalType
    +      .getFieldList.asScala
    +      .filter(f => FlinkTypeFactory.isRowtimeIndicatorType(f.getType))
    +
    +    // convert the input type for the conversion mapper
    +    // the input will be changed in the OutputRowtimeProcessFunction later
    +    val convType = if (rowtimeFields.size > 1) {
    +      throw new TableException(
    --- End diff --
    
    Thanks for the PR, @fhueske and @twalthr. I tried to rebase my rowtime join code on this branch, but encountered this exception. The test SQL is `SELECT * FROM OrderA, OrderB WHERE OrderA.productA = OrderB.productB AND OrderB.rtB BETWEEN OrderA.rtA AND OrderA.rtA + INTERVAL '2' SECOND`. What should I do to *cast all other fields to TIMESTAMP*?


---

[GitHub] flink issue #4532: [FLINK-7337] [table] Refactor internal handling of time i...

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on the issue:

    https://github.com/apache/flink/pull/4532
  
    @wuchong @fhueske I hope I have addressed all code-related issues. Is it OK to merge this for now? I will create a follow-up issue for the Table to DataStream/TableSink conversion case.
    
    > whether we should change the rowtime type when it is an existing field
    
    I think this is a very special case, but it is a nice addition that makes the user's life easier. We could also remove the replacing feature as a whole to avoid confusion due to the data type conversion. In general, we should get rid of `TIMESTAMP` and work on longs as much as possible. In the near future, we might also extend the API to use the Java 8 `java.time` equivalents.
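    As a small illustration of the long-based representation and its `java.time` counterpart mentioned above (plain Java 8 API, nothing Flink-specific):

        import java.time.Instant

        val rowtimeMillis = 1502539841000L                // internal long representation
        val instant = Instant.ofEpochMilli(rowtimeMillis) // a possible java.time equivalent
        println(instant)                                  // 2017-08-12T12:10:41Z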


---

[GitHub] flink pull request #4532: [FLINK-7337] [table] Refactor internal handling of...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4532#discussion_r133037759
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala ---
    @@ -667,30 +719,62 @@ abstract class StreamTableEnvironment(
         // get CRow plan
         val plan: DataStream[CRow] = translateToCRow(logicalPlan, queryConfig)
     
    +    val rowtimeFields = logicalType
    +      .getFieldList.asScala
    +      .filter(f => FlinkTypeFactory.isRowtimeIndicatorType(f.getType))
    +
    +    // convert the input type for the conversion mapper
    +    // the input will be changed in the OutputRowtimeProcessFunction later
    +    val convType = if (rowtimeFields.size > 1) {
    +      throw new TableException(
    --- End diff --
    
    What would happen if the leftmost table does not have a time attribute (or if it is projected out)? I just think that the semantics of the `StreamRecord` timestamps are too important to have implicit behavior that is hard for users to explain and reason about. IMO, an exception that asks for explicit user input is the better choice compared to a behavior that depends on non-obvious query characteristics and is hard to predict.


---

[GitHub] flink pull request #4532: [FLINK-7337] [table] Refactor internal handling of...

Posted by wuchong <gi...@git.apache.org>.
Github user wuchong commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4532#discussion_r132834552
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala ---
    @@ -667,30 +719,62 @@ abstract class StreamTableEnvironment(
         // get CRow plan
         val plan: DataStream[CRow] = translateToCRow(logicalPlan, queryConfig)
     
    +    val rowtimeFields = logicalType
    +      .getFieldList.asScala
    +      .filter(f => FlinkTypeFactory.isRowtimeIndicatorType(f.getType))
    +
    +    // convert the input type for the conversion mapper
    +    // the input will be changed in the OutputRowtimeProcessFunction later
    +    val convType = if (rowtimeFields.size > 1) {
    +      throw new TableException(
    +        s"Found more than one rowtime field: [${rowtimeFields.map(_.getName).mkString(", ")}] in " +
    +          s"the table that should be converted to a DataStream.\n" +
    +          s"Please select the rowtime field that should be used as event-time timestamp for the " +
    +          s"DataStream by casting all other fields to TIMESTAMP.")
    +    } else if (rowtimeFields.size == 1) {
    +      val origRowType = plan.getType.asInstanceOf[CRowTypeInfo].rowType
    +      val convFieldTypes = origRowType.getFieldTypes.map { t =>
    +        if (FlinkTypeFactory.isRowtimeIndicatorType(t)) {
    --- End diff --
    
    .....


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #4532: [FLINK-7337] [table] Refactor internal handling of...

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4532#discussion_r132971979
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala ---
    @@ -667,30 +719,62 @@ abstract class StreamTableEnvironment(
         // get CRow plan
         val plan: DataStream[CRow] = translateToCRow(logicalPlan, queryConfig)
     
    +    val rowtimeFields = logicalType
    +      .getFieldList.asScala
    +      .filter(f => FlinkTypeFactory.isRowtimeIndicatorType(f.getType))
    +
    +    // convert the input type for the conversion mapper
    +    // the input will be changed in the OutputRowtimeProcessFunction later
    +    val convType = if (rowtimeFields.size > 1) {
    +      throw new TableException(
    --- End diff --
    
    @fhueske By "leftmost table" I mean the first time indicator in the select statement (from the left). I think even join reordering does not change the column ordering. I agree that at least `TableSink`s should deal with it implicitly.


---