Posted to issues@flink.apache.org by fhueske <gi...@git.apache.org> on 2017/08/07 12:28:45 UTC

[GitHub] flink pull request #4488: [FLINK-7337] [table] Refactor internal handling of...

GitHub user fhueske opened a pull request:

    https://github.com/apache/flink/pull/4488

    [FLINK-7337] [table] Refactor internal handling of time indicator attributes.

    ## What is the purpose of the change
    
    Handle time indicator attributes as physical fields in `Row`.
    - support for multiple event-time time attributes (required for proper join handling)
    - no distinction of logical and physical schema
    
    ## Brief change log
    
    - Expand physical `Row` schema for time indicators.
    - Refactor computation of logical schema of tables to import.
    - `StreamRecord` timestamps are moved into `Row` during initial conversion.
    - Refactor operators to use time attribute in `Row` instead of `StreamRecord` timestamp.
    - Timestamps are copied into `StreamRecord` when `Table` is converted into `DataStream`.
    - Drive-by fix for NPE in `generateInputFieldUnboxing`.
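    
    The timestamp handling in the change log can be pictured with a minimal sketch. It uses plain Scala only; `Record`, `importRecord`, and `exportRow` are hypothetical stand-ins, not Flink's `StreamRecord` or `Row` classes, and appending the timestamp as the last field is an assumption made for brevity.
    
    ```scala
    object TimestampInRowSketch {
      // Hypothetical simplified record: a row payload plus a record-level timestamp.
      final case class Record(row: Vector[Any], timestamp: Long)
    
      // Initial conversion: copy the record timestamp into the row as a regular field.
      def importRecord(in: Record): Vector[Any] = in.row :+ in.timestamp
    
      // Conversion back to a stream: copy the rowtime field into the record timestamp.
      def exportRow(row: Vector[Any], rowtimeIdx: Int): Record =
        Record(row, row(rowtimeIdx).asInstanceOf[Long])
    }
    ```
    
    Between these two conversions, operators would read the time attribute from the row itself, which is what makes more than one event-time attribute per row possible.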
    
    ## Verifying this change
    
    - Changes are mostly internal. 
    - No new features or public APIs have been added. 
    - All existing tests pass.
    - Tests have been added to verify that time indicators are copied when `Table` is converted into `DataStream`.
    
    ## Does this pull request potentially affect one of the following parts:
    
      - Dependencies (does it add or upgrade a dependency): **no**
      - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: **no**
      - The serializers: **no**
      - The runtime per-record code paths (performance sensitive): **yes**
      - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: **no**
    
    ## Documentation
    
      - Does this pull request introduce a new feature? **no**
      - If yes, how is the feature documented? **na**
    


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/fhueske/flink tableTimeIndi

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/4488.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #4488
    
----
commit accf80587b5a7455fb913b5860f973b3a738765a
Author: Fabian Hueske <fh...@apache.org>
Date:   2017-08-04T00:20:56Z

    [FLINK-7337] [table] Refactor internal handling of time indicator attributes.
    
    - Expand physical Row schema for time indicators.
    - Refactor computation of logical schema of tables to import.
    - Refactor operators to use time attribute in Row instead of StreamRecord timestamp.

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #4488: [FLINK-7337] [table] Refactor internal handling of...

Posted by wuchong <gi...@git.apache.org>.
Github user wuchong commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4488#discussion_r132819009
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowInputTupleOutputMapRunner.scala ---
    @@ -19,7 +19,9 @@
     package org.apache.flink.table.runtime
     
     import java.lang.{Boolean => JBool}
    +import java.sql.Timestamp
     
    +import org.apache.calcite.runtime.SqlFunctions
    --- End diff --
    
    Remove unused imports.


---

[GitHub] flink pull request #4488: [FLINK-7337] [table] Refactor internal handling of...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4488#discussion_r131761767
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowOutputProcessRunner.scala ---
    @@ -18,42 +18,54 @@
     
     package org.apache.flink.table.runtime
     
    -import org.apache.flink.api.common.functions.{MapFunction, RichMapFunction}
     import org.apache.flink.api.common.typeinfo.TypeInformation
     import org.apache.flink.api.java.typeutils.ResultTypeQueryable
     import org.apache.flink.configuration.Configuration
    +import org.apache.flink.streaming.api.functions.ProcessFunction
    +import org.apache.flink.streaming.api.operators.TimestampedCollector
     import org.apache.flink.table.codegen.Compiler
     import org.apache.flink.table.runtime.types.CRow
     import org.apache.flink.types.Row
    +import org.apache.flink.util.Collector
     import org.slf4j.LoggerFactory
     
     /**
    -  * MapRunner with [[CRow]] output.
    +  * ProcessRunner with [[CRow]] output.
       */
    -class CRowOutputMapRunner(
    +class CRowOutputProcessRunner(
         name: String,
         code: String,
         @transient var returnType: TypeInformation[CRow])
    -  extends RichMapFunction[Any, CRow]
    +  extends ProcessFunction[Any, CRow]
       with ResultTypeQueryable[CRow]
    -  with Compiler[MapFunction[Any, Row]] {
    +  with Compiler[ProcessFunction[Any, Row]] {
     
       val LOG = LoggerFactory.getLogger(this.getClass)
     
    -  private var function: MapFunction[Any, Row] = _
    -  private var outCRow: CRow = _
    +  private var function: ProcessFunction[Any, Row] = _
    +  private var cRowWrapper: CRowWrappingCollector = _
     
       override def open(parameters: Configuration): Unit = {
         LOG.debug(s"Compiling MapFunction: $name \n\n Code:\n$code")
         val clazz = compile(getRuntimeContext.getUserCodeClassLoader, name, code)
         LOG.debug("Instantiating MapFunction.")
         function = clazz.newInstance()
    -    outCRow = new CRow(null, true)
    +
    +    this.cRowWrapper = new CRowWrappingCollector()
    +    this.cRowWrapper.setChange(true)
       }
     
    -  override def map(in: Any): CRow = {
    -    outCRow.row = function.map(in)
    -    outCRow
    +  override def processElement(
    +      in: Any,
    +      ctx: ProcessFunction[Any, CRow]#Context,
    +      out: Collector[CRow]): Unit = {
    +
    +    // remove timestamp from stream record
    +    val tc = out.asInstanceOf[TimestampedCollector[_]]
    --- End diff --
    
    It is not strictly required, but it reduces the serialization overhead by one Long value.
    I added this to most functions that introduce a timestamp (ProcessFunction), but I would also be OK with removing it.


---

[GitHub] flink issue #4488: [FLINK-7337] [table] Refactor internal handling of time i...

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on the issue:

    https://github.com/apache/flink/pull/4488
  
    I'm fine with a follow-up issue. +1 to merge this.


---

[GitHub] flink pull request #4488: [FLINK-7337] [table] Refactor internal handling of...

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4488#discussion_r131704170
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala ---
    @@ -502,6 +500,68 @@ abstract class StreamTableEnvironment(
       }
     
       /**
    +    * Injects markers for time indicator fields into the field indexes.
    +    * A rowtime indicator is represented as -1, a proctime indicator as -2.
    +    *
    +    * @param fieldIndexes The field indexes into which the time indicators markers are injected.
    +    * @param rowtime An optional rowtime indicator
    +    * @param proctime An optional proctime indicator
    +    * @return An adjusted array of field indexes.
    +    */
    +  private def adjustFieldIndexes(
    +    fieldIndexes: Array[Int],
    +    rowtime: Option[(Int, String)],
    +    proctime: Option[(Int, String)]): Array[Int] = {
    +
    +    // inject rowtime field
    +    val withRowtime = if (rowtime.isDefined) {
    +      fieldIndexes.patch(rowtime.get._1, Seq(-1), 0) // -1 indicates rowtime
    +    } else {
    +      fieldIndexes
    +    }
    +
    +    // inject proctime field
    +    val withProctime = if (proctime.isDefined) {
    +      withRowtime.patch(proctime.get._1, Seq(-2), 0) // -2 indicates proctime
    +    } else {
    +      withRowtime
    +    }
    +
    +    withProctime
    +  }
    +
    +  /**
    +    * Injects names of time indicator fields into the list of field names.
    +    *
    +    * @param fieldNames The array of field names into which the time indicator field names are
    +    *                   injected.
    +    * @param rowtime An optional rowtime indicator
    +    * @param proctime An optional proctime indicator
    +    * @return An adjusted array of field names.
    +    */
    +  private def adjustFieldNames(
    +    fieldNames: Array[String],
    +    rowtime: Option[(Int, String)],
    +    proctime: Option[(Int, String)]): Array[String] = {
    +
    +    // inject rowtime field
    +    val withRowtime = if (rowtime.isDefined) {
    +      fieldNames.patch(rowtime.get._1, Seq(rowtime.get._2), 0)
    +    } else {
    +      fieldNames
    +    }
    +
    +    // inject proctime field
    +    val withProctime = if (proctime.isDefined) {
    --- End diff --
    
    We could use pattern matching here.
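    
    A minimal sketch of what this suggestion could look like for `adjustFieldNames`. It keeps the signature and the `patch` calls from the diff and only replaces the `isDefined`/`get` checks with a match on the `Option`; the wrapping object `AdjustFieldNamesSketch` is a hypothetical name, and this is not necessarily the code that was merged.
    
    ```scala
    object AdjustFieldNamesSketch {
      // Same parameters as in the diff: (position, name) pairs for the time indicators.
      def adjustFieldNames(
          fieldNames: Array[String],
          rowtime: Option[(Int, String)],
          proctime: Option[(Int, String)]): Array[String] = {
    
        // inject rowtime field
        val withRowtime = rowtime match {
          case Some((idx, name)) => fieldNames.patch(idx, Seq(name), 0)
          case None => fieldNames
        }
    
        // inject proctime field
        proctime match {
          case Some((idx, name)) => withRowtime.patch(idx, Seq(name), 0)
          case None => withRowtime
        }
      }
    }
    ```
    
    Destructuring the tuple in the pattern also avoids the `_1` and `_2` accessors.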


---

[GitHub] flink pull request #4488: [FLINK-7337] [table] Refactor internal handling of...

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4488#discussion_r131708789
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala ---
    @@ -245,14 +244,18 @@ abstract class CodeGenerator(
           returnType: TypeInformation[_ <: Any],
           resultFieldNames: Seq[String])
         : GeneratedExpression = {
    -    val input1AccessExprs = input1Mapping.map { idx =>
    -      generateInputAccess(input1, input1Term, idx)
    +    val input1AccessExprs = input1Mapping.map {
    +      case -1 => generateStreamRecordTimestampAcccess()
    +      case -2 => generateNullLiteral(TimeIndicatorTypeInfo.PROCTIME_INDICATOR)
    +      case idx => generateInputAccess(input1, input1Term, idx)
         }
     
         val input2AccessExprs = input2 match {
           case Some(ti) =>
    -        input2Mapping.map { idx =>
    -          generateInputAccess(ti, input2Term, idx)
    +        input2Mapping.map {
    --- End diff --
    
    Are there any use cases where these lines are needed?


---

[GitHub] flink pull request #4488: [FLINK-7337] [table] Refactor internal handling of...

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4488#discussion_r132974743
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala ---
    @@ -46,29 +47,29 @@ class StreamTableSourceScan(
         val fieldNames = TableEnvironment.getFieldNames(tableSource).toList
         val fieldTypes = TableEnvironment.getFieldTypes(tableSource.getReturnType).toList
     
    -    val fieldCnt = fieldNames.length
    +    val fields = fieldNames.zip(fieldTypes)
     
    -    val rowtime = tableSource match {
    +    val withRowtime = tableSource match {
           case timeSource: DefinedRowtimeAttribute if timeSource.getRowtimeAttribute != null =>
    --- End diff --
    
    Can you open an issue for this? We can discuss this after merging this PR.


---

[GitHub] flink issue #4488: [FLINK-7337] [table] Refactor internal handling of time i...

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on the issue:

    https://github.com/apache/flink/pull/4488
  
    @wuchong could you take a look at #4532 as well? It is based on this PR.


---

[GitHub] flink pull request #4488: [FLINK-7337] [table] Refactor internal handling of...

Posted by wuchong <gi...@git.apache.org>.
Github user wuchong commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4488#discussion_r132819122
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/WrappingTimestampSetterProcessFunction.scala ---
    @@ -0,0 +1,61 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.runtime
    +
    +import java.sql.Timestamp
    +
    +import org.apache.calcite.runtime.SqlFunctions
    +import org.apache.flink.api.common.functions.{MapFunction, RichMapFunction}
    +import org.apache.flink.configuration.Configuration
    +import org.apache.flink.streaming.api.functions.ProcessFunction
    +import org.apache.flink.streaming.api.operators.TimestampedCollector
    +import org.apache.flink.table.runtime.types.CRow
    +import org.apache.flink.util.Collector
    +
    +/**
    +  * Wraps a ProcessFunction and sets a Timestamp field of a CRow as
    +  * [[org.apache.flink.streaming.runtime.streamrecord.StreamRecord]] timestamp.
    +  */
    +class WrappingTimestampSetterProcessFunction[OUT](
    +    function: MapFunction[CRow, OUT],
    +    rowtimeIdx: Int)
    +  extends ProcessFunction[CRow, OUT] {
    +
    +  override def open(parameters: Configuration): Unit = {
    +    super.open(parameters)
    +    function match {
    --- End diff --
    
    We could use `FunctionUtils` instead of the match case, but I'm also fine with the match case.
    
    ```scala
    FunctionUtils.setFunctionRuntimeContext(function, getRuntimeContext)
    FunctionUtils.openFunction(function, parameters)
    ```


---

[GitHub] flink pull request #4488: [FLINK-7337] [table] Refactor internal handling of...

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4488#discussion_r131709842
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala ---
    @@ -1104,6 +1106,19 @@ abstract class CodeGenerator(
         }
       }
     
    +  private[flink] def generateStreamRecordTimestampAcccess(): GeneratedExpression = {
    --- End diff --
    
    I would call this `generateRowtimeAccess` and locate it next to `generateProctimeTimestamp`. In any case remove the third `c`.


---

[GitHub] flink pull request #4488: [FLINK-7337] [table] Refactor internal handling of...

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4488#discussion_r131724363
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowOutputProcessRunner.scala ---
    @@ -18,42 +18,54 @@
     
     package org.apache.flink.table.runtime
     
    -import org.apache.flink.api.common.functions.{MapFunction, RichMapFunction}
     import org.apache.flink.api.common.typeinfo.TypeInformation
     import org.apache.flink.api.java.typeutils.ResultTypeQueryable
     import org.apache.flink.configuration.Configuration
    +import org.apache.flink.streaming.api.functions.ProcessFunction
    +import org.apache.flink.streaming.api.operators.TimestampedCollector
     import org.apache.flink.table.codegen.Compiler
     import org.apache.flink.table.runtime.types.CRow
     import org.apache.flink.types.Row
    +import org.apache.flink.util.Collector
     import org.slf4j.LoggerFactory
     
     /**
    -  * MapRunner with [[CRow]] output.
    +  * ProcessRunner with [[CRow]] output.
       */
    -class CRowOutputMapRunner(
    +class CRowOutputProcessRunner(
         name: String,
         code: String,
         @transient var returnType: TypeInformation[CRow])
    -  extends RichMapFunction[Any, CRow]
    +  extends ProcessFunction[Any, CRow]
       with ResultTypeQueryable[CRow]
    -  with Compiler[MapFunction[Any, Row]] {
    +  with Compiler[ProcessFunction[Any, Row]] {
     
       val LOG = LoggerFactory.getLogger(this.getClass)
     
    -  private var function: MapFunction[Any, Row] = _
    -  private var outCRow: CRow = _
    +  private var function: ProcessFunction[Any, Row] = _
    +  private var cRowWrapper: CRowWrappingCollector = _
     
       override def open(parameters: Configuration): Unit = {
         LOG.debug(s"Compiling MapFunction: $name \n\n Code:\n$code")
         val clazz = compile(getRuntimeContext.getUserCodeClassLoader, name, code)
         LOG.debug("Instantiating MapFunction.")
    --- End diff --
    
    ProcessFunction


---

[GitHub] flink pull request #4488: [FLINK-7337] [table] Refactor internal handling of...

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4488#discussion_r131705390
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala ---
    @@ -677,12 +748,43 @@ abstract class StreamTableEnvironment(
     
         val rootParallelism = plan.getParallelism
     
    -    conversion match {
    -      case mapFunction: MapFunction[CRow, A] =>
    -        plan.map(mapFunction)
    -          .returns(tpe)
    -          .name(s"to: ${tpe.getTypeClass.getSimpleName}")
    -          .setParallelism(rootParallelism)
    +    val rowtimeFields = logicalType.getFieldList.asScala
    +      .filter(f => FlinkTypeFactory.isRowtimeIndicatorType(f.getType))
    +
    +    if (rowtimeFields.isEmpty) {
    +      // to rowtime field to set
    +
    +      conversion match {
    +        case mapFunction: MapFunction[CRow, A] =>
    +          plan.map(mapFunction)
    +            .returns(tpe)
    +            .name(s"to: ${tpe.getTypeClass.getSimpleName}")
    +            .setParallelism(rootParallelism)
    +      }
    +    } else if (rowtimeFields.size == 1) {
    +      // set the only rowtime field as event-time timestamp for DataStream
    +
    +      val mapFunction = conversion match {
    +        case mapFunction: MapFunction[CRow, A] => mapFunction
    +        case _ => new MapFunction[CRow, A] {
    +          override def map(cRow: CRow): A = cRow.asInstanceOf[A]
    +        }
    +      }
    +
    +      plan.process(
    +        new WrappingTimestampSetterProcessFunction[A](
    +          mapFunction,
    +          rowtimeFields.head.getIndex))
    +        .returns(tpe)
    +        .name(s"to: ${tpe.getTypeClass.getSimpleName}")
    +        .setParallelism(rootParallelism)
    +
    +    } else {
    +      throw new TableException(
    +        s"Found more than one rowtime field: [${rowtimeFields.map(_.getName).mkString(", ")}] in " +
    +          s"the table that should be converted to a DataStream.\n" +
    +          s"Please select the rowtime field that should be used as event-time timestamp for the " +
    +          s"DataStream by casting all other fields to TIMESTAMP or LONG.")
    --- End diff --
    
    I would just recommend TIMESTAMP. LONG is still not supported.


---

[GitHub] flink pull request #4488: [FLINK-7337] [table] Refactor internal handling of...

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4488#discussion_r131712146
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala ---
    @@ -245,14 +244,18 @@ abstract class CodeGenerator(
           returnType: TypeInformation[_ <: Any],
           resultFieldNames: Seq[String])
         : GeneratedExpression = {
    -    val input1AccessExprs = input1Mapping.map { idx =>
    -      generateInputAccess(input1, input1Term, idx)
    +    val input1AccessExprs = input1Mapping.map {
    +      case -1 => generateStreamRecordTimestampAcccess()
    +      case -2 => generateNullLiteral(TimeIndicatorTypeInfo.PROCTIME_INDICATOR)
    --- End diff --
    
    Can we use constants for these special indices?
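    
    A minimal sketch of the idea, in plain Scala. The object and constant names are hypothetical; only the values -1 and -2 come from the diff. The names start with an uppercase letter so that Scala treats them as stable identifiers in a `match` rather than as fresh variable bindings.
    
    ```scala
    object TimeIndicatorMarkers {
      // Hypothetical names for the marker values used in the diff.
      val RowtimeMarker: Int = -1
      val ProctimeMarker: Int = -2
    
      // Example dispatch on a field index, mirroring the shape of the match in the diff.
      def describe(idx: Int): String = idx match {
        case RowtimeMarker => "rowtime indicator"
        case ProctimeMarker => "proctime indicator"
        case i => s"physical field $i"
      }
    }
    ```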


---

[GitHub] flink pull request #4488: [FLINK-7337] [table] Refactor internal handling of...

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4488#discussion_r131709106
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala ---
    @@ -719,12 +722,11 @@ abstract class CodeGenerator(
     
       override def visitCall(call: RexCall): GeneratedExpression = {
         // special case: time materialization
    -    if (call.getOperator == TimeMaterializationSqlFunction) {
    -      return generateRecordTimestamp(
    -        FlinkTypeFactory.isRowtimeIndicatorType(call.getOperands.get(0).getType)
    -      )
    +    if (call.getOperator == ProcTimeMaterializationSqlFunction) {
    +      return generateProcTimestamp()
         }
     
    +
    --- End diff --
    
    remove empty line


---

[GitHub] flink pull request #4488: [FLINK-7337] [table] Refactor internal handling of...

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4488#discussion_r131723377
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowInputTupleOutputMapRunner.scala ---
    @@ -90,3 +95,35 @@ class CRowInputScalaTupleOutputMapRunner(
     
       override def getProducedType: TypeInformation[(Boolean, Any)] = returnType
     }
    +
    +/**
    +  * Wraps a ProcessFunction and sets a Timestamp field of a CRow as
    +  * [[org.apache.flink.streaming.runtime.streamrecord.StreamRecord]] timestamp.
    +  */
    +class WrappingTimestampSetterProcessFunction[OUT](
    --- End diff --
    
    Move this to separate file or rename file.


---

[GitHub] flink pull request #4488: [FLINK-7337] [table] Refactor internal handling of...

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4488#discussion_r131720018
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala ---
    @@ -148,6 +142,20 @@ class DataStreamGroupWindowAggregate(
             "state size. You may specify a retention time of 0 to not clean up the state.")
         }
     
    +    val timestampedInput = if (isRowtimeAttribute(window.timeAttribute)) {
    +      // copy the window rowtime attribute into the StreamRecord timestamp field
    +      val timeAttribute = window.timeAttribute.asInstanceOf[ResolvedFieldReference].name
    +      val timeIdx = inputSchema.logicalFieldNames.indexOf(timeAttribute)
    --- End diff --
    
    I thought we could get rid of the logical/physical distinction?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #4488: [FLINK-7337] [table] Refactor internal handling of...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/4488


---

[GitHub] flink pull request #4488: [FLINK-7337] [table] Refactor internal handling of...

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4488#discussion_r131717969
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarOperators.scala ---
    @@ -937,15 +935,40 @@ object ScalarOperators {
         }
       }
     
    -  def generateConcat(
    -      method: Method,
    -      operands: Seq[GeneratedExpression]): GeneratedExpression = {
    +  def generateConcat(operands: Seq[GeneratedExpression]): GeneratedExpression = {
     
    -    generateCallIfArgsNotNull(false, STRING_TYPE_INFO, operands) {
    -      (terms) =>s"${qualifyMethod(method)}(${terms.mkString(", ")})"
    +    generateCallIfArgsNotNull(true, STRING_TYPE_INFO, operands) {
    +      (terms) =>s"${qualifyMethod(BuiltInMethods.CONCAT)}(${terms.mkString(", ")})"
         }
       }
     
    +  def generateConcatWs(operands: Seq[GeneratedExpression]): GeneratedExpression = {
    +
    +    val resultTerm = newName("result")
    +    val nullTerm = newName("isNull")
    +    val defaultValue = primitiveDefaultValue(Types.STRING)
    +
    +    val operatorCode =
    +      s"""
    +        |${operands.map(_.code).mkString("\n")}
    +        |
    +        |String $resultTerm;
    +        |boolean $nullTerm;
    +        |if (${operands.head.nullTerm}) {
    +        |  $nullTerm = true;
    +        |  $resultTerm = $defaultValue;
    +        |} else {
    +        |
    +        |  ${operands.tail.map(o => s"if (${o.nullTerm}) ${o.resultTerm} = null;").mkString("\n")}
    --- End diff --
    
    I would not reassign a `resultTerm`. Actually they should be declared `final` for optimization. We should do this in the near future.
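    
    One way to avoid the reassignment, sketched under assumptions: instead of writing `null` into an operand's existing `resultTerm`, the generator could emit a fresh `final` local per operand and pass those to the call. `Operand` and `nullSafeArgs` are hypothetical stand-ins for the generator's own types and helpers, and the fixed `argN` names stand in for whatever unique-name helper the real generator would use.
    
    ```scala
    object ConcatWsArgsSketch {
      // Hypothetical stand-in for a generated expression: only its two term names.
      final case class Operand(resultTerm: String, nullTerm: String)
    
      // Emits one new final local per operand; returns (generated Java code, new term names).
      def nullSafeArgs(operands: Seq[Operand]): (String, Seq[String]) = {
        val argTerms = operands.indices.map(i => s"arg$i")
        val code = operands.zip(argTerms).map { case (o, arg) =>
          s"final String $arg = ${o.nullTerm} ? null : ${o.resultTerm};"
        }.mkString("\n")
        (code, argTerms)
      }
    }
    ```
    
    The original result terms stay untouched, so they could later be declared `final` as well.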


---

[GitHub] flink pull request #4488: [FLINK-7337] [table] Refactor internal handling of...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4488#discussion_r131743809
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala ---
    @@ -245,14 +244,18 @@ abstract class CodeGenerator(
           returnType: TypeInformation[_ <: Any],
           resultFieldNames: Seq[String])
         : GeneratedExpression = {
    -    val input1AccessExprs = input1Mapping.map { idx =>
    -      generateInputAccess(input1, input1Term, idx)
    +    val input1AccessExprs = input1Mapping.map {
    +      case -1 => generateStreamRecordTimestampAcccess()
    +      case -2 => generateNullLiteral(TimeIndicatorTypeInfo.PROCTIME_INDICATOR)
    +      case idx => generateInputAccess(input1, input1Term, idx)
         }
     
         val input2AccessExprs = input2 match {
           case Some(ti) =>
    -        input2Mapping.map { idx =>
    -          generateInputAccess(ti, input2Term, idx)
    +        input2Mapping.map {
    --- End diff --
    
    No, they should not be used. I'll remove them.



[GitHub] flink issue #4488: [FLINK-7337] [table] Refactor internal handling of time i...

Posted by wuchong <gi...@git.apache.org>.
Github user wuchong commented on the issue:

    https://github.com/apache/flink/pull/4488
  
    @twalthr I'm working on other issues until Thursday, so I would like to take a look at it on Thursday (Beijing time). But if you are in a hurry, I'm fine with merging this first.



[GitHub] flink pull request #4488: [FLINK-7337] [table] Refactor internal handling of...

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4488#discussion_r131706849
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverter.scala ---
    @@ -311,19 +323,19 @@ object RelTimeIndicatorConverter {
     
         var needsConversion = false
     
    -    // materialize all remaining time indicators
    +    // materialize remaining proc time indicators
    --- End diff --
    
    remove space



[GitHub] flink pull request #4488: [FLINK-7337] [table] Refactor internal handling of...

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4488#discussion_r131713099
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala ---
    @@ -1104,6 +1106,19 @@ abstract class CodeGenerator(
         }
       }
     
    +  private[flink] def generateStreamRecordTimestampAcccess(): GeneratedExpression = {
    +    val resultTerm = newName("result")
    +    val nullTerm = newName("isNull")
    +
    +    val accessCode =
    +      s"""
    +         |long $resultTerm = $contextTerm.timestamp();
    --- End diff --
    
    This could lead to an NPE. I would reintroduce the exception from `generateRecordTimestamp`.
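
    The NPE arises because `ProcessFunction.Context#timestamp()` returns a boxed `Long` that may be null, and assigning it to a primitive `long` unboxes it. A minimal sketch of a guarded access follows. The helper name, the `Boxed` suffix, and the exception message are illustrative assumptions, not the original code of `generateRecordTimestamp`.

```scala
object TimestampAccessSketch {
  // Emits Java source that first reads the timestamp into a boxed Long and
  // fails with a descriptive exception instead of an NPE during unboxing.
  def guardedTimestampAccess(contextTerm: String, resultTerm: String): String =
    s"""
       |Long ${resultTerm}Boxed = $contextTerm.timestamp();
       |if (${resultTerm}Boxed == null) {
       |  throw new RuntimeException("Rowtime timestamp is null. Check that timestamps are assigned.");
       |}
       |long $resultTerm = ${resultTerm}Boxed;
       |""".stripMargin
}
```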



[GitHub] flink pull request #4488: [FLINK-7337] [table] Refactor internal handling of...

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4488#discussion_r131718272
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/ProcTimeMaterializationSqlFunction.scala ---
    @@ -22,12 +22,12 @@ import org.apache.calcite.sql.`type`._
     import org.apache.calcite.sql.validate.SqlMonotonicity
     
     /**
    -  * Function that materializes a time attribute to the metadata timestamp. After materialization
    -  * the result can be used in regular arithmetical calculations.
    +  * Function that materializes a processing time attribute.
    +  * After materialization the result can be used in regular arithmetical calculations.
       */
    -object TimeMaterializationSqlFunction
    +object ProcTimeMaterializationSqlFunction
    --- End diff --
    
    Should we shorten this to `ProctimeSqlFunction`?



[GitHub] flink pull request #4488: [FLINK-7337] [table] Refactor internal handling of...

Posted by wuchong <gi...@git.apache.org>.
Github user wuchong commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4488#discussion_r133104586
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala ---
    @@ -46,29 +47,29 @@ class StreamTableSourceScan(
         val fieldNames = TableEnvironment.getFieldNames(tableSource).toList
         val fieldTypes = TableEnvironment.getFieldTypes(tableSource.getReturnType).toList
     
    -    val fieldCnt = fieldNames.length
    +    val fields = fieldNames.zip(fieldTypes)
     
    -    val rowtime = tableSource match {
    +    val withRowtime = tableSource match {
           case timeSource: DefinedRowtimeAttribute if timeSource.getRowtimeAttribute != null =>
    --- End diff --
    
    Sure, I have logged it as https://issues.apache.org/jira/browse/FLINK-7446



[GitHub] flink pull request #4488: [FLINK-7337] [table] Refactor internal handling of...

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4488#discussion_r131724317
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowOutputProcessRunner.scala ---
    @@ -18,42 +18,54 @@
     
     package org.apache.flink.table.runtime
     
    -import org.apache.flink.api.common.functions.{MapFunction, RichMapFunction}
     import org.apache.flink.api.common.typeinfo.TypeInformation
     import org.apache.flink.api.java.typeutils.ResultTypeQueryable
     import org.apache.flink.configuration.Configuration
    +import org.apache.flink.streaming.api.functions.ProcessFunction
    +import org.apache.flink.streaming.api.operators.TimestampedCollector
     import org.apache.flink.table.codegen.Compiler
     import org.apache.flink.table.runtime.types.CRow
     import org.apache.flink.types.Row
    +import org.apache.flink.util.Collector
     import org.slf4j.LoggerFactory
     
     /**
    -  * MapRunner with [[CRow]] output.
    +  * ProcessRunner with [[CRow]] output.
       */
    -class CRowOutputMapRunner(
    +class CRowOutputProcessRunner(
         name: String,
         code: String,
         @transient var returnType: TypeInformation[CRow])
    -  extends RichMapFunction[Any, CRow]
    +  extends ProcessFunction[Any, CRow]
       with ResultTypeQueryable[CRow]
    -  with Compiler[MapFunction[Any, Row]] {
    +  with Compiler[ProcessFunction[Any, Row]] {
     
       val LOG = LoggerFactory.getLogger(this.getClass)
     
    -  private var function: MapFunction[Any, Row] = _
    -  private var outCRow: CRow = _
    +  private var function: ProcessFunction[Any, Row] = _
    +  private var cRowWrapper: CRowWrappingCollector = _
     
       override def open(parameters: Configuration): Unit = {
         LOG.debug(s"Compiling MapFunction: $name \n\n Code:\n$code")
    --- End diff --
    
    ProcessFunction



[GitHub] flink pull request #4488: [FLINK-7337] [table] Refactor internal handling of...

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4488#discussion_r131720129
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala ---
    @@ -148,6 +142,20 @@ class DataStreamGroupWindowAggregate(
             "state size. You may specify a retention time of 0 to not clean up the state.")
         }
     
    +    val timestampedInput = if (isRowtimeAttribute(window.timeAttribute)) {
    +      // copy the window rowtime attribute into the StreamRecord timestamp field
    +      val timeAttribute = window.timeAttribute.asInstanceOf[ResolvedFieldReference].name
    +      val timeIdx = inputSchema.logicalFieldNames.indexOf(timeAttribute)
    +
    +      inputDS
    +        .process(
    +          new TimestampSetterProcessFunction(timeIdx,CRowTypeInfo(inputSchema.physicalTypeInfo)))
    --- End diff --
    
    missing space



[GitHub] flink pull request #4488: [FLINK-7337] [table] Refactor internal handling of...

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4488#discussion_r131707428
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverter.scala ---
    @@ -415,7 +406,13 @@ class RexTimeIndicatorMaterializer(
           case _ =>
             updatedCall.getOperands.map { o =>
               if (isTimeIndicatorType(o.getType)) {
    -            rexBuilder.makeCall(TimeMaterializationSqlFunction, o)
    +            if (isRowtimeIndicatorType(o.getType)) {
    --- End diff --
    
    maybe we could move this code to a method, because it appears 3 times



[GitHub] flink pull request #4488: [FLINK-7337] [table] Refactor internal handling of...

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4488#discussion_r131704681
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala ---
    @@ -502,6 +500,68 @@ abstract class StreamTableEnvironment(
       }
     
       /**
    +    * Injects markers for time indicator fields into the field indexes.
    +    * A rowtime indicator is represented as -1, a proctime indicator as -2.
    +    *
    +    * @param fieldIndexes The field indexes into which the time indicators markers are injected.
    +    * @param rowtime An optional rowtime indicator
    +    * @param proctime An optional proctime indicator
    +    * @return An adjusted array of field indexes.
    +    */
    +  private def adjustFieldIndexes(
    +    fieldIndexes: Array[Int],
    +    rowtime: Option[(Int, String)],
    +    proctime: Option[(Int, String)]): Array[Int] = {
    +
    +    // inject rowtime field
    +    val withRowtime = if (rowtime.isDefined) {
    --- End diff --
    
    we could use pattern matching here



[GitHub] flink pull request #4488: [FLINK-7337] [table] Refactor internal handling of...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4488#discussion_r131739402
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala ---
    @@ -172,45 +172,20 @@ class FlinkTypeFactory(typeSystem: RelDataTypeSystem) extends JavaTypeFactoryImp
         *
         * @param fieldNames field names
         * @param fieldTypes field types, every element is Flink's [[TypeInformation]]
    -    * @param rowtime optional system field to indicate event-time; the index determines the index
    -    *                in the final record. If the index is smaller than the number of specified
    -    *                fields, it shifts all following fields.
    -    * @param proctime optional system field to indicate processing-time; the index determines the
    -    *                 index in the final record. If the index is smaller than the number of
    -    *                 specified fields, it shifts all following fields.
         * @return a struct type with the input fieldNames, input fieldTypes, and system fields
         */
       def buildLogicalRowType(
    --- End diff --
    
    I think it's still fine. It creates a row type for the logical (Calcite) plan. 
    But I'm fine changing the name if you have a better one.



[GitHub] flink pull request #4488: [FLINK-7337] [table] Refactor internal handling of...

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4488#discussion_r131708260
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala ---
    @@ -245,14 +244,18 @@ abstract class CodeGenerator(
           returnType: TypeInformation[_ <: Any],
           resultFieldNames: Seq[String])
         : GeneratedExpression = {
    -    val input1AccessExprs = input1Mapping.map { idx =>
    -      generateInputAccess(input1, input1Term, idx)
    +    val input1AccessExprs = input1Mapping.map {
    --- End diff --
    
    Please add some inline comments here and also in the ScalaDoc of `input1Mapping` and `input2Mapping`.
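
    The kind of inline documentation meant here could look like the following self-contained sketch. The `Access` type and its cases are hypothetical placeholders for the generated access expressions. Only the marker values -1 (rowtime) and -2 (proctime) are taken from the diff.

```scala
object InputMappingSketch {
  // Hypothetical placeholders for the generated access expressions.
  sealed trait Access
  case object RowtimeFromStreamRecord extends Access
  case object ProctimePlaceholder extends Access
  final case class InputField(index: Int) extends Access

  // Resolves a field mapping that may contain time indicator markers.
  def resolve(mapping: Array[Int]): Seq[Access] = mapping.toSeq.map {
    // -1 marks a rowtime indicator: read the StreamRecord timestamp
    case -1 => RowtimeFromStreamRecord
    // -2 marks a proctime indicator: emit a placeholder, no input access
    case -2 => ProctimePlaceholder
    // any other value is a regular index into the input
    case idx => InputField(idx)
  }
}
```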



[GitHub] flink pull request #4488: [FLINK-7337] [table] Refactor internal handling of...

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4488#discussion_r131704708
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala ---
    @@ -502,6 +500,68 @@ abstract class StreamTableEnvironment(
       }
     
       /**
    +    * Injects markers for time indicator fields into the field indexes.
    +    * A rowtime indicator is represented as -1, a proctime indicator as -2.
    +    *
    +    * @param fieldIndexes The field indexes into which the time indicators markers are injected.
    +    * @param rowtime An optional rowtime indicator
    +    * @param proctime An optional proctime indicator
    +    * @return An adjusted array of field indexes.
    +    */
    +  private def adjustFieldIndexes(
    +    fieldIndexes: Array[Int],
    +    rowtime: Option[(Int, String)],
    +    proctime: Option[(Int, String)]): Array[Int] = {
    +
    +    // inject rowtime field
    +    val withRowtime = if (rowtime.isDefined) {
    +      fieldIndexes.patch(rowtime.get._1, Seq(-1), 0) // -1 indicates rowtime
    +    } else {
    +      fieldIndexes
    +    }
    +
    +    // inject proctime field
    +    val withProctime = if (proctime.isDefined) {
    --- End diff --
    
    we could use pattern matching here
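
    A sketch of what the pattern-matching variant might look like, wrapped in a hypothetical `FieldIndexSketch` object so that it stands alone. The signature and the -1/-2 markers are taken from the diff above.

```scala
object FieldIndexSketch {
  def adjustFieldIndexes(
      fieldIndexes: Array[Int],
      rowtime: Option[(Int, String)],
      proctime: Option[(Int, String)]): Array[Int] = {

    // inject rowtime field
    val withRowtime = rowtime match {
      case Some((pos, _)) => fieldIndexes.patch(pos, Seq(-1), 0) // -1 indicates rowtime
      case None => fieldIndexes
    }

    // inject proctime field
    proctime match {
      case Some((pos, _)) => withRowtime.patch(pos, Seq(-2), 0) // -2 indicates proctime
      case None => withRowtime
    }
  }
}
```

    Destructuring the tuple in the pattern removes the `isDefined` / `get._1` pair and the intermediate `withProctime` value.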



[GitHub] flink pull request #4488: [FLINK-7337] [table] Refactor internal handling of...

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4488#discussion_r131704138
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala ---
    @@ -502,6 +500,68 @@ abstract class StreamTableEnvironment(
       }
     
       /**
    +    * Injects markers for time indicator fields into the field indexes.
    +    * A rowtime indicator is represented as -1, a proctime indicator as -2.
    +    *
    +    * @param fieldIndexes The field indexes into which the time indicators markers are injected.
    +    * @param rowtime An optional rowtime indicator
    +    * @param proctime An optional proctime indicator
    +    * @return An adjusted array of field indexes.
    +    */
    +  private def adjustFieldIndexes(
    +    fieldIndexes: Array[Int],
    +    rowtime: Option[(Int, String)],
    +    proctime: Option[(Int, String)]): Array[Int] = {
    +
    +    // inject rowtime field
    +    val withRowtime = if (rowtime.isDefined) {
    +      fieldIndexes.patch(rowtime.get._1, Seq(-1), 0) // -1 indicates rowtime
    +    } else {
    +      fieldIndexes
    +    }
    +
    +    // inject proctime field
    +    val withProctime = if (proctime.isDefined) {
    +      withRowtime.patch(proctime.get._1, Seq(-2), 0) // -2 indicates proctime
    +    } else {
    +      withRowtime
    +    }
    +
    +    withProctime
    +  }
    +
    +  /**
    +    * Injects names of time indicator fields into the list of field names.
    +    *
    +    * @param fieldNames The array of field names into which the time indicator field names are
    +    *                   injected.
    +    * @param rowtime An optional rowtime indicator
    +    * @param proctime An optional proctime indicator
    +    * @return An adjusted array of field names.
    +    */
    +  private def adjustFieldNames(
    +    fieldNames: Array[String],
    +    rowtime: Option[(Int, String)],
    +    proctime: Option[(Int, String)]): Array[String] = {
    +
    +    // inject rowtime field
    +    val withRowtime = if (rowtime.isDefined) {
    --- End diff --
    
    we could use pattern matching here



[GitHub] flink pull request #4488: [FLINK-7337] [table] Refactor internal handling of...

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4488#discussion_r131724882
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowOutputProcessRunner.scala ---
    @@ -18,42 +18,54 @@
     
     package org.apache.flink.table.runtime
     
    -import org.apache.flink.api.common.functions.{MapFunction, RichMapFunction}
     import org.apache.flink.api.common.typeinfo.TypeInformation
     import org.apache.flink.api.java.typeutils.ResultTypeQueryable
     import org.apache.flink.configuration.Configuration
    +import org.apache.flink.streaming.api.functions.ProcessFunction
    +import org.apache.flink.streaming.api.operators.TimestampedCollector
     import org.apache.flink.table.codegen.Compiler
     import org.apache.flink.table.runtime.types.CRow
     import org.apache.flink.types.Row
    +import org.apache.flink.util.Collector
     import org.slf4j.LoggerFactory
     
     /**
    -  * MapRunner with [[CRow]] output.
    +  * ProcessRunner with [[CRow]] output.
       */
    -class CRowOutputMapRunner(
    +class CRowOutputProcessRunner(
         name: String,
         code: String,
         @transient var returnType: TypeInformation[CRow])
    -  extends RichMapFunction[Any, CRow]
    +  extends ProcessFunction[Any, CRow]
       with ResultTypeQueryable[CRow]
    -  with Compiler[MapFunction[Any, Row]] {
    +  with Compiler[ProcessFunction[Any, Row]] {
     
       val LOG = LoggerFactory.getLogger(this.getClass)
     
    -  private var function: MapFunction[Any, Row] = _
    -  private var outCRow: CRow = _
    +  private var function: ProcessFunction[Any, Row] = _
    +  private var cRowWrapper: CRowWrappingCollector = _
     
       override def open(parameters: Configuration): Unit = {
         LOG.debug(s"Compiling MapFunction: $name \n\n Code:\n$code")
         val clazz = compile(getRuntimeContext.getUserCodeClassLoader, name, code)
         LOG.debug("Instantiating MapFunction.")
         function = clazz.newInstance()
    -    outCRow = new CRow(null, true)
    +
    +    this.cRowWrapper = new CRowWrappingCollector()
    +    this.cRowWrapper.setChange(true)
       }
     
    -  override def map(in: Any): CRow = {
    -    outCRow.row = function.map(in)
    -    outCRow
    +  override def processElement(
    +      in: Any,
    +      ctx: ProcessFunction[Any, CRow]#Context,
    +      out: Collector[CRow]): Unit = {
    +
    +    // remove timestamp from stream record
    +    val tc = out.asInstanceOf[TimestampedCollector[_]]
    --- End diff --
    
    Do we need this change? It is not executed if `org.apache.flink.table.plan.nodes.CommonScan#needsConversion` returns false.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink issue #4488: [FLINK-7337] [table] Refactor internal handling of time i...

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on the issue:

    https://github.com/apache/flink/pull/4488
  
    @wuchong No problem. Thursday is fine.



[GitHub] flink pull request #4488: [FLINK-7337] [table] Refactor internal handling of...

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4488#discussion_r131703254
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala ---
    @@ -677,12 +748,43 @@ abstract class StreamTableEnvironment(
     
         val rootParallelism = plan.getParallelism
     
    -    conversion match {
    -      case mapFunction: MapFunction[CRow, A] =>
    -        plan.map(mapFunction)
    -          .returns(tpe)
    -          .name(s"to: ${tpe.getTypeClass.getSimpleName}")
    -          .setParallelism(rootParallelism)
    +    val rowtimeFields = logicalType.getFieldList.asScala
    +      .filter(f => FlinkTypeFactory.isRowtimeIndicatorType(f.getType))
    +
    +    if (rowtimeFields.isEmpty) {
    +      // to rowtime field to set
    --- End diff --
    
    typo?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink issue #4488: [FLINK-7337] [table] Refactor internal handling of time i...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on the issue:

    https://github.com/apache/flink/pull/4488
  
    Thanks for the review @twalthr. I addressed your comments and updated the PR.
    
    I think it would be very good to handle the timestamps internally as longs. The change seems to be a bit more involved because we need to touch the serialization logic and various type conversion and code generation issues. I'd rather do this as a follow-up to this PR. What do you think?
    
    Fabian



[GitHub] flink pull request #4488: [FLINK-7337] [table] Refactor internal handling of...

Posted by wuchong <gi...@git.apache.org>.
Github user wuchong commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4488#discussion_r132818559
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala ---
    @@ -46,29 +47,29 @@ class StreamTableSourceScan(
         val fieldNames = TableEnvironment.getFieldNames(tableSource).toList
         val fieldTypes = TableEnvironment.getFieldTypes(tableSource.getReturnType).toList
     
    -    val fieldCnt = fieldNames.length
    +    val fields = fieldNames.zip(fieldTypes)
     
    -    val rowtime = tableSource match {
    +    val withRowtime = tableSource match {
           case timeSource: DefinedRowtimeAttribute if timeSource.getRowtimeAttribute != null =>
    --- End diff --
    
    For `DefinedRowtimeAttribute`, we would like the rowtime field to be able to replace an existing field. As when registering a DataStream, the rowtime field could either be appended or replace an existing field.
    
    This is not related to this PR, but we can discuss it here and handle it in a separate issue.
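
    The proposed behavior could be sketched as follows. This is purely illustrative: the helper `withRowtime`, the `(name, type)` pairs, and the `"ROWTIME"` type label are assumptions and not part of the `DefinedRowtimeAttribute` interface.

```scala
object RowtimeFieldSketch {
  // Fields are modeled as (name, type) pairs with string type labels.
  // If a field named `rowtimeName` exists, it is replaced in place;
  // otherwise the rowtime field is appended at the end.
  def withRowtime(
      fields: Seq[(String, String)],
      rowtimeName: String): Seq[(String, String)] =
    fields.indexWhere(_._1 == rowtimeName) match {
      case -1 => fields :+ (rowtimeName -> "ROWTIME")
      case idx => fields.updated(idx, rowtimeName -> "ROWTIME")
    }
}
```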



[GitHub] flink pull request #4488: [FLINK-7337] [table] Refactor internal handling of...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4488#discussion_r131750828
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala ---
    @@ -148,6 +142,20 @@ class DataStreamGroupWindowAggregate(
             "state size. You may specify a retention time of 0 to not clean up the state.")
         }
     
    +    val timestampedInput = if (isRowtimeAttribute(window.timeAttribute)) {
    +      // copy the window rowtime attribute into the StreamRecord timestamp field
    +      val timeAttribute = window.timeAttribute.asInstanceOf[ResolvedFieldReference].name
    +      val timeIdx = inputSchema.logicalFieldNames.indexOf(timeAttribute)
    --- End diff --
    
    Yes, in fact we can only do it like this because the logical and physical types are the same. Otherwise, we would need to adjust the index.
    
    I'll clean up `RowSchema` a bit more to remove the distinction of logical and physical types.



[GitHub] flink pull request #4488: [FLINK-7337] [table] Refactor internal handling of...

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4488#discussion_r131714343
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarOperators.scala ---
    @@ -937,15 +935,40 @@ object ScalarOperators {
         }
       }
     
    -  def generateConcat(
    -      method: Method,
    -      operands: Seq[GeneratedExpression]): GeneratedExpression = {
    +  def generateConcat(operands: Seq[GeneratedExpression]): GeneratedExpression = {
     
    -    generateCallIfArgsNotNull(false, STRING_TYPE_INFO, operands) {
    -      (terms) =>s"${qualifyMethod(method)}(${terms.mkString(", ")})"
    +    generateCallIfArgsNotNull(true, STRING_TYPE_INFO, operands) {
    --- End diff --
    
    The nullability should depend on the code generator's nullability.
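
    A simplified sketch of the idea: the null-check flag is passed in by the caller rather than hard-coded to `true`. The `Expr` type, the `nullCheck` parameter, and the bare `concat` call name are stand-ins for illustration. The real `generateCallIfArgsNotNull` emits considerably more code.

```scala
object ConcatNullabilitySketch {
  // Hypothetical, simplified stand-in for GeneratedExpression.
  final case class Expr(resultTerm: String, nullTerm: String)

  // Emits a Java expression for a CONCAT call. Null handling is generated
  // only when the caller (the code generator) has null checks enabled.
  def generateConcat(nullCheck: Boolean, operands: Seq[Expr]): String = {
    val call = s"concat(${operands.map(_.resultTerm).mkString(", ")})"
    if (nullCheck && operands.nonEmpty) {
      val anyNull = operands.map(_.nullTerm).mkString(" || ")
      s"($anyNull) ? null : $call"
    } else {
      call
    }
  }
}
```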



[GitHub] flink pull request #4488: [FLINK-7337] [table] Refactor internal handling of...

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4488#discussion_r131706276
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala ---
    @@ -172,45 +172,20 @@ class FlinkTypeFactory(typeSystem: RelDataTypeSystem) extends JavaTypeFactoryImp
         *
         * @param fieldNames field names
         * @param fieldTypes field types, every element is Flink's [[TypeInformation]]
    -    * @param rowtime optional system field to indicate event-time; the index determines the index
    -    *                in the final record. If the index is smaller than the number of specified
    -    *                fields, it shifts all following fields.
    -    * @param proctime optional system field to indicate processing-time; the index determines the
    -    *                 index in the final record. If the index is smaller than the number of
    -    *                 specified fields, it shifts all following fields.
         * @return a struct type with the input fieldNames, input fieldTypes, and system fields
         */
       def buildLogicalRowType(
    --- End diff --
    
    Is the name of this method still correct?



[GitHub] flink issue #4488: [FLINK-7337] [table] Refactor internal handling of time i...

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on the issue:

    https://github.com/apache/flink/pull/4488
  
    @wuchong @sunjincheng121 @shaoxuan-wang Do you also want to take a look at it? Otherwise, I would merge this and work on the follow-up issue to improve efficiency.



[GitHub] flink pull request #4488: [FLINK-7337] [table] Refactor internal handling of...

Posted by wuchong <gi...@git.apache.org>.
Github user wuchong commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4488#discussion_r132817997
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala ---
    @@ -1247,27 +1258,32 @@ abstract class CodeGenerator(
         }
       }
     
    -  private[flink] def generateRecordTimestamp(isEventTime: Boolean): GeneratedExpression = {
    +  private[flink] def generateRowtimeAccess(): GeneratedExpression = {
         val resultTerm = newName("result")
    -    val resultTypeTerm = primitiveTypeTermForTypeInfo(SqlTimeTypeInfo.TIMESTAMP)
    +    val nullTerm = newName("isNull")
     
    -    val resultCode = if (isEventTime) {
    +    val accessCode =
           s"""
    -        |$resultTypeTerm $resultTerm;
    -        |if ($contextTerm.timestamp() == null) {
    +        |Long $resultTerm = $contextTerm.timestamp();
    +        |if ($resultTerm == null) {
             |  throw new RuntimeException("Rowtime timestamp is null. Please make sure that a proper " +
             |    "TimestampAssigner is defined and the stream environment uses the EventTime time " +
             |    "characteristic.");
             |}
    -        |else {
    -        |  $resultTerm = $contextTerm.timestamp();
    -        |}
    -        |""".stripMargin
    -    } else {
    +        |boolean $nullTerm = false;
    +       """.stripMargin
    +
    +    GeneratedExpression(resultTerm, nullTerm, accessCode, TimeIndicatorTypeInfo.ROWTIME_INDICATOR)
    +  }
    +
    +  private[flink] def generateProctimeTimestamp(): GeneratedExpression = {
    +    val resultTerm = newName("result")
    +    val resultTypeTerm = primitiveTypeTermForTypeInfo(SqlTimeTypeInfo.TIMESTAMP)
    +
    +    val resultCode =
           s"""
             |$resultTypeTerm $resultTerm = $contextTerm.timerService().currentProcessingTime();
    --- End diff --
    
    Why not hardcode `$resultTypeTerm` as `long`? `currentProcessingTime()` always returns a primitive `long`.
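A minimal sketch of what the suggestion could look like, with the result type written directly into the template instead of being looked up. `ProctimeCodeSketch` and `proctimeAccessCode` are hypothetical names, and the method is a simplified stand-in that returns the generated Java source as a plain string rather than a `GeneratedExpression`.

```scala
object ProctimeCodeSketch {
  // Hypothetical, simplified stand-in for the generator method.
  // The Java type is hardcoded as `long` because the timer service
  // returns the processing time as a primitive long.
  def proctimeAccessCode(resultTerm: String, contextTerm: String): String =
    s"""
       |long $resultTerm = $contextTerm.timerService().currentProcessingTime();
       |""".stripMargin
}
```

This removes one type lookup from the generator at the cost of coupling the template to the return type of `currentProcessingTime()`.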



[GitHub] flink pull request #4488: [FLINK-7337] [table] Refactor internal handling of...

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4488#discussion_r131728686
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedRangeOver.scala ---
    @@ -123,6 +124,8 @@ class ProcTimeBoundedRangeOver(
           return
         }
     
    +    out.asInstanceOf[TimestampedCollector[_]].eraseTimestamp()
    --- End diff --
    
    Do we really need these erasing steps in every function? Shouldn't a rowtime operator overwrite it anyway?



[GitHub] flink pull request #4488: [FLINK-7337] [table] Refactor internal handling of...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4488#discussion_r131764752
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverter.scala ---
    @@ -415,7 +406,13 @@ class RexTimeIndicatorMaterializer(
           case _ =>
             updatedCall.getOperands.map { o =>
               if (isTimeIndicatorType(o.getType)) {
    -            rexBuilder.makeCall(TimeMaterializationSqlFunction, o)
    +            if (isRowtimeIndicatorType(o.getType)) {
    --- End diff --
    
    Not sure. The duplicated code is spread over two classes and is parameterized differently. I tried adding a companion object to `RexTimeIndicatorMaterializer` and moving the method there, but this did not really improve the code, IMO.



[GitHub] flink pull request #4488: [FLINK-7337] [table] Refactor internal handling of...

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4488#discussion_r131726136
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala ---
    @@ -1136,25 +1142,31 @@ object AggregateUtil {
         }
       }
     
    -  private[flink] def computeWindowStartEndPropertyPos(
    -      properties: Seq[NamedWindowProperty]): (Option[Int], Option[Int]) = {
    +  private[flink] def computeWindowPropertyPos(
    +      properties: Seq[NamedWindowProperty]): (Option[Int], Option[Int], Option[Int]) = {
     
    -    val propPos = properties.foldRight((None: Option[Int], None: Option[Int], 0)) {
    +    val propPos = properties.foldRight(
    +      (None: Option[Int], None: Option[Int], None: Option[Int], 0)) {
           (p, x) => p match {
    --- End diff --
    
    Could you destructure `x` into named components such as `(a, b, c)`? Expressions like `(x._1, Some(x._4), x._3, x._4 - 1)` are hard to read.
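A minimal sketch of the suggested style, in which the accumulator is destructured in the pattern so that no `x._N` accessors are needed. It is not the code from `AggregateUtil`: `WindowProp`, its case objects, and `WindowPropertyPos.compute` are hypothetical stand-ins, and the sketch assumes absolute positions counted from the start of the sequence.

```scala
// Hypothetical stand-ins for the window properties handled in the diff.
sealed trait WindowProp
case object WindowStart extends WindowProp
case object WindowEnd extends WindowProp
case object RowtimeAttr extends WindowProp

object WindowPropertyPos {
  // Returns the positions of (start, end, rowtime) in `props`, if present.
  def compute(props: Seq[WindowProp]): (Option[Int], Option[Int], Option[Int]) = {
    val init = (None: Option[Int], None: Option[Int], None: Option[Int], props.length - 1)
    // foldRight visits the last element first, so `pos` counts down from the end.
    val (start, end, rowtime, _) = props.foldRight(init) {
      // The accumulator is destructured into (start, end, rowtime, pos).
      case (WindowStart, (_, e, r, pos)) => (Some(pos), e, r, pos - 1)
      case (WindowEnd, (s, _, r, pos))   => (s, Some(pos), r, pos - 1)
      case (RowtimeAttr, (s, e, _, pos)) => (s, e, Some(pos), pos - 1)
    }
    (start, end, rowtime)
  }
}
```

Naming the tuple components in the `case` pattern keeps each branch readable and makes it obvious which slot is being updated.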

