Posted to issues@flink.apache.org by hequn8128 <gi...@git.apache.org> on 2017/04/18 12:38:14 UTC

[GitHub] flink pull request #3733: [FLINK-6091] [table] Implement and turn on retract...

GitHub user hequn8128 opened a pull request:

    https://github.com/apache/flink/pull/3733

    [FLINK-6091] [table] Implement and turn on retraction for aggregates

    Implement functions for generating and consuming retract messages for different aggregates.
    
    1. Add a delete/add property to `Row`.
    2. Implement functions for generating retract messages for the unbounded groupBy.
    3. Implement functions for handling retract messages in the different aggregates.
    4. Handle retraction messages in `CommonCorrelate` and `CommonCalc` (retain the Delete property).
    
    Currently, only the unbounded groupBy generates retraction messages, and it works under the unbounded, processing-time mode. Hence, retraction is so far supported only for unbounded, processing-time aggregations; more support can be added later.
    
    Supported now: unbounded groupBy, unbounded processing-time OVER window.
    Unsupported now: group window, event-time or bounded OVER window.
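
    A minimal sketch of the intended retract/accumulate flow (the names `Change` and `emitUpdate` are illustrative only and not part of this PR):

        import org.apache.flink.types.Row

        // Illustrative only: on every update, the unbounded groupBy first
        // retracts the previously emitted result row, then emits the new one.
        case class Change(row: Row, isAccumulate: Boolean)

        def emitUpdate(previous: Row, updated: Row, out: Change => Unit): Unit = {
          if (previous != null) {
            out(Change(previous, isAccumulate = false)) // retract old result
          }
          out(Change(updated, isAccumulate = true))     // accumulate new result
        }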


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/hequn8128/flink FLINK-6091-PR

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/3733.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #3733
    
----
commit 911e4516d3d5a17354c75d67e73153aa9194212b
Author: Hequn Cheng <ch...@gmail.com>
Date:   2017-04-18T08:54:09Z

    [FLINK-6091] [table] Implement and turn on retraction for aggregates

----



[GitHub] flink pull request #3733: [FLINK-6091] [table] Implement and turn on retract...

Posted by hequn8128 <gi...@git.apache.org>.
Github user hequn8128 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3733#discussion_r112817984
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GroupAggProcessFunction.scala ---
    @@ -68,12 +70,35 @@ class GroupAggProcessFunction(
         var accumulators = state.value()
     
         if (null == accumulators) {
    +      previous = null
           accumulators = new Row(aggregates.length)
           i = 0
           while (i < aggregates.length) {
             accumulators.setField(i, aggregates(i).createAccumulator())
             i += 1
           }
    +    } else {
    +      // get previous row
    +      if (generateRetraction) {
    +        if (null == previous) {
    +          previous = new Row(groupings.length + aggregates.length)
    --- End diff --
    
    done



[GitHub] flink pull request #3733: [FLINK-6091] [table] Implement and turn on retract...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3733#discussion_r113050816
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedRangeOver.scala ---
    @@ -82,9 +83,9 @@ class ProcTimeBoundedRangeOver(
       }
     
       override def processElement(
    -    input: Row,
    -    ctx: ProcessFunction[Row, Row]#Context,
    -    out: Collector[Row]): Unit = {
    +    input: CRow,
    +    ctx: ProcessFunction[CRow, CRow]#Context,
    +    out: Collector[CRow]): Unit = {
     
         val currentTime = ctx.timerService.currentProcessingTime
    --- End diff --
    
    we can set `val row = input.row` and add `row` instead of `input` to the state.
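
    A fragment sketching this suggestion (it assumes the state handling of the surrounding function and is not the exact merged code):

        override def processElement(
            input: CRow,
            ctx: ProcessFunction[CRow, CRow]#Context,
            out: Collector[CRow]): Unit = {

          val row = input.row  // unwrap once, keep plain Rows in state
          val currentTime = ctx.timerService.currentProcessingTime
          var rowList = rowMapState.get(currentTime)
          if (rowList == null) {
            rowList = new java.util.ArrayList[Row]()
          }
          rowList.add(row)     // store the Row, not the CRow
          rowMapState.put(currentTime, rowList)
          // ... timer registration and aggregation as before ...
        }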



[GitHub] flink pull request #3733: [FLINK-6091] [table] Implement and turn on retract...

Posted by hequn8128 <gi...@git.apache.org>.
Github user hequn8128 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3733#discussion_r112817965
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala ---
    @@ -107,6 +107,14 @@ class DataStreamGroupWindowAggregate(
         val groupingKeys = grouping.indices.toArray
         val inputDS = input.asInstanceOf[DataStreamRel].translateToPlan(tableEnv)
     
    +    val consumeRetraction = DataStreamRetractionRules.isAccRetract(input)
    +
    +    if (consumeRetraction) {
    +      throw new TableException(
    +        "Retraction on group window is not supported yet. Note: Currently, group window should " +
    +          "not follow an unbounded groupby.")
    --- End diff --
    
    yes, non-windowed is clearer



[GitHub] flink pull request #3733: [FLINK-6091] [table] Implement and turn on retract...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3733#discussion_r113051486
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/types/CRow.scala ---
    @@ -0,0 +1,48 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.runtime.types
    +
    +import org.apache.flink.types.Row
    +
    +/**
    +  * Wrapper for a [[Row]] to add retraction information.
    +  *
    +  * If [[change]] is true, the [[CRow]] is an accumulate message, if it is false it is a
    +  * retraction message.
    +  *
    +  * @param row The wrapped [[Row]].
    +  * @param change true for an accumulate message, false for a retraction message.
    +  */
    +class CRow(var row: Row, var change: Boolean) {
    +
    +  override def toString: String = s"${if(change) "+" else "-"}$row"
    +
    +  override def equals(other: scala.Any): Boolean = {
    +    val otherCRow = other.asInstanceOf[CRow]
    +    row.equals(otherCRow.row) && change == otherCRow.change
    +  }
    +}
    +
    +class XRow(arity: Int, var change: Boolean) extends Row(arity) {
    --- End diff --
    
    Can this be removed?



[GitHub] flink pull request #3733: [FLINK-6091] [table] Implement and turn on retract...

Posted by hequn8128 <gi...@git.apache.org>.
Github user hequn8128 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3733#discussion_r112817910
  
    --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/RetractionITCase.scala ---
    @@ -0,0 +1,310 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.api.scala.stream
    +
    +import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
    +import org.apache.flink.table.api.{TableEnvironment, TableException}
    +import org.apache.flink.table.api.scala.stream.utils.{StreamITCase, StreamingWithStateTestBase}
    +import org.apache.flink.types.Row
    +import org.junit.Assert._
    +import org.junit.Test
    +import org.apache.flink.table.api.scala._
    +import org.apache.flink.api.scala._
    +import org.apache.flink.streaming.api.TimeCharacteristic
    +import org.apache.flink.table.utils.TableFunc0
    +
    +import scala.collection.mutable
    +
    +/**
    +  * tests for retraction
    +  */
    +class RetractionITCase extends StreamingWithStateTestBase {
    +  // input data
    +  val data = List(
    +    ("Hello", 1),
    +    ("word", 1),
    +    ("Hello", 1),
    +    ("bark", 1)
    +  )
    +
    +  // keyed groupby + keyed groupby
    +  @Test
    +  def testWordCount(): Unit = {
    +    val env = StreamExecutionEnvironment.getExecutionEnvironment
    +    val tEnv = TableEnvironment.getTableEnvironment(env)
    +    StreamITCase.testResults = mutable.MutableList()
    +    env.setParallelism(1)
    +    env.setStateBackend(getStateBackend)
    +
    +    val stream = env.fromCollection(data)
    +    val table = stream.toTable(tEnv, 'word, 'num)
    +    val resultTable = table
    +      .groupBy('word)
    +      .select('word as 'word, 'num.sum as 'count)
    +      .groupBy('count)
    +      .select('count, 'word.count as 'frequency)
    +
    +    val results = resultTable.toDataStream[Row]
    +    results.addSink(new StreamITCase.StringSink)
    +    env.execute()
    +
    +    val expected = Seq("1,1", "1,2", "1,1", "2,1", "1,2")
    +    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
    +  }
    +
    +  // keyed groupby + non-keyed groupby
    +  @Test
    +  def testGroupByAndNonKeyedGroupBy(): Unit = {
    +
    +    val env = StreamExecutionEnvironment.getExecutionEnvironment
    +    val tEnv = TableEnvironment.getTableEnvironment(env)
    +    StreamITCase.testResults = mutable.MutableList()
    +    env.setParallelism(1)
    +    env.setStateBackend(getStateBackend)
    +
    +    val stream = env.fromCollection(data)
    +    val table = stream.toTable(tEnv, 'word, 'num)
    +    val resultTable = table
    +      .groupBy('word)
    +      .select('word as 'word, 'num.sum as 'count)
    +      .select('count.sum)
    +
    +    val results = resultTable.toDataStream[Row]
    +    results.addSink(new StreamITCase.StringSink)
    +    env.execute()
    +
    +    val expected = Seq("1", "2", "1", "3", "4")
    +    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
    +  }
    +
    +  // non-keyed groupby + keyed groupby
    +  @Test
    +  def testNonKeyedGroupByAndGroupBy(): Unit = {
    +
    +    val env = StreamExecutionEnvironment.getExecutionEnvironment
    +    val tEnv = TableEnvironment.getTableEnvironment(env)
    +    StreamITCase.testResults = mutable.MutableList()
    +    env.setParallelism(1)
    +    env.setStateBackend(getStateBackend)
    +
    +    val stream = env.fromCollection(data)
    +    val table = stream.toTable(tEnv, 'word, 'num)
    +    val resultTable = table
    +      .select('num.sum as 'count)
    +      .groupBy('count)
    +      .select('count, 'count.count)
    +
    +    val results = resultTable.toDataStream[Row]
    +    results.addSink(new StreamITCase.StringSink)
    +    env.execute()
    +
    +    val expected = Seq("1,1", "1,0", "2,1", "2,0", "3,1", "3,0", "4,1")
    +    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
    +  }
    +
    +  // keyed groupby + over agg(unbounded, procTime, keyed)
    +  @Test
    +  def testGroupByAndUnboundPartitionedProcessingWindowWithRow(): Unit = {
    +
    +    val env = StreamExecutionEnvironment.getExecutionEnvironment
    +    env.setStateBackend(getStateBackend)
    +    env.setParallelism(1)
    +    val tEnv = TableEnvironment.getTableEnvironment(env)
    +
    +    StreamITCase.testResults = mutable.MutableList()
    +
    +    val t1 = env.fromCollection(data).toTable(tEnv).as('word, 'number)
    +
    +    tEnv.registerTable("T1", t1)
    +
    +    val sqlQuery = "SELECT word, cnt, count(word) " +
    +      "OVER (PARTITION BY cnt ORDER BY ProcTime() " +
    +      "ROWS BETWEEN UNBOUNDED preceding AND CURRENT ROW)" +
    +      "FROM " +
    +      "(SELECT word, count(number) as cnt from T1 group by word) "
    +
    +    val result = tEnv.sql(sqlQuery).toDataStream[Row]
    +    result.addSink(new StreamITCase.StringSink)
    +    env.execute()
    +
    +    val expected = mutable.MutableList("Hello,1,1", "word,1,2", "Hello,2,1", "bark,1,2")
    +    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
    +  }
    +
    +  // keyed groupby + over agg(unbounded, procTime, non-keyed)
    +  @Test
    +  def testGroupByAndUnboundNonPartitionedProcessingWindowWithRow(): Unit = {
    +
    +    val env = StreamExecutionEnvironment.getExecutionEnvironment
    +    env.setStateBackend(getStateBackend)
    +    env.setParallelism(1)
    +    val tEnv = TableEnvironment.getTableEnvironment(env)
    +
    +    StreamITCase.testResults = mutable.MutableList()
    +
    +    val t1 = env.fromCollection(data).toTable(tEnv).as('word, 'number)
    +
    +    tEnv.registerTable("T1", t1)
    +
    +    val sqlQuery = "SELECT word, cnt, count(word) " +
    +      "OVER (ORDER BY ProcTime() ROWS BETWEEN UNBOUNDED preceding AND CURRENT ROW)" +
    +      "FROM (SELECT word , count(number) as cnt from T1 group by word) "
    +
    +    val result = tEnv.sql(sqlQuery).toDataStream[Row]
    +    result.addSink(new StreamITCase.StringSink)
    +    env.execute()
    +
    +    val expected = mutable.MutableList("Hello,1,1", "word,1,2", "Hello,2,2", "bark,1,3")
    +    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
    +  }
    +
    +  // test unique process, if the current output message of unbounded groupby equals the
    +  // previous message, unbounded groupby will ignore the current one.
    +  @Test
    +  def testUniqueProcess(): Unit = {
    +    // data input
    +    val data = List(
    +      (1234, 2L),
    +      (1234, 0L)
    +    )
    +    val env = StreamExecutionEnvironment.getExecutionEnvironment
    +    val tEnv = TableEnvironment.getTableEnvironment(env)
    +    StreamITCase.testResults = mutable.MutableList()
    +    env.setParallelism(1)
    +    env.setStateBackend(getStateBackend)
    +
    +    val stream = env.fromCollection(data)
    +    val table = stream.toTable(tEnv, 'pk, 'value)
    +    val resultTable = table
    +      .groupBy('pk)
    +      .select('pk as 'pk, 'value.sum as 'sum)
    +      .groupBy('sum)
    +      .select('sum, 'pk.count as 'count)
    +
    +    val results = resultTable.toDataStream[Row]
    +    results.addSink(new StreamITCase.StringSink)
    +    env.execute()
    +
    +    val expected = Seq("2,1")
    +    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
    +  }
    +
    +  // correlate should handle retraction messages correctly
    +  @Test
    +  def testCorrelate(): Unit = {
    +    val env = StreamExecutionEnvironment.getExecutionEnvironment
    +    val tEnv = TableEnvironment.getTableEnvironment(env)
    +    StreamITCase.testResults = mutable.MutableList()
    +    env.setParallelism(1)
    +    env.setStateBackend(getStateBackend)
    +
    +    val func0 = new TableFunc0
    +
    +    val stream = env.fromCollection(data)
    +    val table = stream.toTable(tEnv, 'word, 'num)
    +    val resultTable = table
    +      .groupBy('word)
    +      .select('word as 'word, 'num.sum as 'count)
    +      .leftOuterJoin(func0('word))
    +      .groupBy('count)
    +      .select('count, 'word.count as 'frequency)
    +
    +    val results = resultTable.toDataStream[Row]
    +    results.addSink(new StreamITCase.StringSink)
    +    env.execute()
    +
    +    val expected = Seq("1,1", "1,2", "1,1", "2,1", "1,2")
    +    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
    +  }
    +
    +  // groupby + window agg
    +  @Test(expected = classOf[TableException])
    +  def testGroupByAndProcessingTimeSlidingGroupWindow(): Unit = {
    --- End diff --
    
    hi, these failures are thrown during `translateToPlan`, so `TableTestBase` cannot help, right?



[GitHub] flink pull request #3733: [FLINK-6091] [table] Implement and turn on retract...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3733#discussion_r113044668
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCalc.scala ---
    @@ -23,26 +23,28 @@ import org.apache.calcite.rel.`type`.RelDataType
     import org.apache.calcite.rex._
     import org.apache.flink.api.common.functions.{FlatMapFunction, RichFlatMapFunction}
     import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.api.java.typeutils.RowTypeInfo
     import org.apache.flink.table.api.TableConfig
     import org.apache.flink.table.calcite.FlinkTypeFactory
     import org.apache.flink.table.codegen.{CodeGenerator, GeneratedFunction}
     import org.apache.flink.table.runtime.FlatMapRunner
    +import org.apache.flink.table.runtime.types.CRowTypeInfo
     import org.apache.flink.types.Row
     
     import scala.collection.JavaConversions._
     import scala.collection.JavaConverters._
     
     trait CommonCalc {
     
    -  private[flink] def functionBody(
    +  private[flink] def functionBody[T](
    --- End diff --
    
    I would follow a similar strategy as with the `TableEnvironment` sink conversion for the `CommonX` classes.
    The `CommonX` class has a method to generate the code for a function that operates on `Row`. Each class that extends `CommonX` has a dedicated method to instantiate the wrapper class (e.g., `MapRunner`, `FlatMapRunner`). The runner classes use the generated function to operate on `Row`s. For streaming operators, the runner classes unwrap the `Row` from the `CRow` before calling the generated code.
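
    A rough sketch of such a streaming runner (the names `CRowFlatMapRunner` and `CRowWrappingCollector` are illustrative; a real runner would also compile the generated code in `open()`):

        import org.apache.flink.api.common.functions.{FlatMapFunction, RichFlatMapFunction}
        import org.apache.flink.table.runtime.types.CRow
        import org.apache.flink.types.Row
        import org.apache.flink.util.Collector

        // Wraps a FlatMapFunction[Row, Row] so that the streaming operator
        // can consume and emit CRow without touching the code generation.
        class CRowFlatMapRunner(function: FlatMapFunction[Row, Row])
          extends RichFlatMapFunction[CRow, CRow] {

          private val cRowCollector = new CRowWrappingCollector

          override def flatMap(in: CRow, out: Collector[CRow]): Unit = {
            cRowCollector.out = out
            cRowCollector.change = in.change          // forward the change flag
            function.flatMap(in.row, cRowCollector)   // generated code sees a Row
          }
        }

        // Re-wraps every emitted Row into a CRow with the forwarded flag.
        class CRowWrappingCollector extends Collector[Row] {
          var out: Collector[CRow] = _
          var change: Boolean = true
          override def collect(row: Row): Unit = out.collect(new CRow(row, change))
          override def close(): Unit = out.close()
        }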



[GitHub] flink issue #3733: [FLINK-6091] [table] Implement and turn on retraction for...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on the issue:

    https://github.com/apache/flink/pull/3733
  
    Thanks for the update @hequn8128. The PR looks really good, IMO.
    Will do a couple of minor refactorings and merge it to the `table-retraction` branch.
    
    Best, Fabian



[GitHub] flink pull request #3733: [FLINK-6091] [table] Implement and turn on retract...

Posted by lincoln-lil <gi...@git.apache.org>.
Github user lincoln-lil commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3733#discussion_r111959451
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GroupAggProcessFunction.scala ---
    @@ -68,12 +70,35 @@ class GroupAggProcessFunction(
         var accumulators = state.value()
     
         if (null == accumulators) {
    +      previous = null
           accumulators = new Row(aggregates.length)
           i = 0
           while (i < aggregates.length) {
             accumulators.setField(i, aggregates(i).createAccumulator())
             i += 1
           }
    +    } else {
    +      // get previous row
    +      if (generateRetraction) {
    +        if (null == previous) {
    +          previous = new Row(groupings.length + aggregates.length)
    +          // previous is used to output retract message, so command of previous will always be
    +          // Command.Delete
    +          previous.command = Command.Delete
    +        }
    +        i = 0
    +        while (i < groupings.length) {
    +          previous.setField(i, input.getField(groupings(i)))
    +          i += 1
    +        }
    +        i = 0
    +        while (i < aggregates.length) {
    +          val index = groupings.length + i
    --- End diff --
    
    this avoids a local variable:
    `previous.setField(groupings.length + i, aggregates(i).getValue(accumulator))`

    the same applies at L118 and L127



[GitHub] flink pull request #3733: [FLINK-6091] [table] Implement and turn on retract...

Posted by lincoln-lil <gi...@git.apache.org>.
Github user lincoln-lil commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3733#discussion_r111959829
  
    --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/RetractionITCase.scala ---
    @@ -0,0 +1,310 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.api.scala.stream
    +
    +import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
    +import org.apache.flink.table.api.{TableEnvironment, TableException}
    +import org.apache.flink.table.api.scala.stream.utils.{StreamITCase, StreamingWithStateTestBase}
    +import org.apache.flink.types.Row
    +import org.junit.Assert._
    +import org.junit.Test
    +import org.apache.flink.table.api.scala._
    +import org.apache.flink.api.scala._
    +import org.apache.flink.streaming.api.TimeCharacteristic
    +import org.apache.flink.table.utils.TableFunc0
    +
    +import scala.collection.mutable
    +
    +/**
    +  * tests for retraction
    +  */
    +class RetractionITCase extends StreamingWithStateTestBase {
    +  // input data
    +  val data = List(
    +    ("Hello", 1),
    +    ("word", 1),
    +    ("Hello", 1),
    +    ("bark", 1)
    +  )
    +
    +  // keyed groupby + keyed groupby
    +  @Test
    +  def testWordCount(): Unit = {
    +    val env = StreamExecutionEnvironment.getExecutionEnvironment
    +    val tEnv = TableEnvironment.getTableEnvironment(env)
    +    StreamITCase.testResults = mutable.MutableList()
    +    env.setParallelism(1)
    +    env.setStateBackend(getStateBackend)
    +
    +    val stream = env.fromCollection(data)
    +    val table = stream.toTable(tEnv, 'word, 'num)
    +    val resultTable = table
    +      .groupBy('word)
    +      .select('word as 'word, 'num.sum as 'count)
    +      .groupBy('count)
    +      .select('count, 'word.count as 'frequency)
    +
    +    val results = resultTable.toDataStream[Row]
    +    results.addSink(new StreamITCase.StringSink)
    +    env.execute()
    +
    +    val expected = Seq("1,1", "1,2", "1,1", "2,1", "1,2")
    +    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
    +  }
    +
    +  // keyed groupby + non-keyed groupby
    +  @Test
    +  def testGroupByAndNonKeyedGroupBy(): Unit = {
    +
    +    val env = StreamExecutionEnvironment.getExecutionEnvironment
    +    val tEnv = TableEnvironment.getTableEnvironment(env)
    +    StreamITCase.testResults = mutable.MutableList()
    +    env.setParallelism(1)
    +    env.setStateBackend(getStateBackend)
    +
    +    val stream = env.fromCollection(data)
    +    val table = stream.toTable(tEnv, 'word, 'num)
    +    val resultTable = table
    +      .groupBy('word)
    +      .select('word as 'word, 'num.sum as 'count)
    +      .select('count.sum)
    +
    +    val results = resultTable.toDataStream[Row]
    +    results.addSink(new StreamITCase.StringSink)
    +    env.execute()
    +
    +    val expected = Seq("1", "2", "1", "3", "4")
    --- End diff --
    
    Could you explain this example?
    `table.groupBy('word).select('word as 'word, 'num.sum as 'count)`
    I think this query does not produce retraction.



[GitHub] flink pull request #3733: [FLINK-6091] [table] Implement and turn on retract...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3733#discussion_r112901608
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala ---
    @@ -723,11 +738,11 @@ abstract class TableEnvironment(val config: TableConfig) {
     
         val genFunction = generator.generateFunction(
           functionName,
    -      classOf[MapFunction[Row, T]],
    +      classOf[MapFunction[P, T]],
    --- End diff --
    
    This means we need a dedicated `MapRunner` (and other runners) for streaming, but IMO, that's an easier change than touching the code-gen of the whole Table API.



[GitHub] flink pull request #3733: [FLINK-6091] [table] Implement and turn on retract...

Posted by lincoln-lil <gi...@git.apache.org>.
Github user lincoln-lil commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3733#discussion_r111957675
  
    --- Diff: flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RowSerializer.java ---
    @@ -40,6 +42,7 @@
     	private static final long serialVersionUID = 1L;
     	private final boolean[] nullMask;
     	private final TypeSerializer<Object>[] fieldSerializers;
    +	private final EnumSerializer commandSerializer = new EnumSerializer(Command.class);
    --- End diff --
    
    advice: non-static final members should be initialized in the constructor; otherwise, make the field `static final` and use an upper-case variable name.



[GitHub] flink pull request #3733: [FLINK-6091] [table] Implement and turn on retract...

Posted by lincoln-lil <gi...@git.apache.org>.
Github user lincoln-lil commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3733#discussion_r111959008
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala ---
    @@ -107,6 +107,14 @@ class DataStreamGroupWindowAggregate(
         val groupingKeys = grouping.indices.toArray
         val inputDS = input.asInstanceOf[DataStreamRel].translateToPlan(tableEnv)
     
    +    val consumeRetraction = DataStreamRetractionRules.isAccRetract(input)
    +
    +    if (consumeRetraction) {
    +      throw new TableException(
    +        "Retraction on group window is not supported yet. Note: Currently, group window should " +
    +          "not follow an unbounded groupby.")
    --- End diff --
    
    1. 'is not supported yet' -> 'unsupported yet' might be more concise.
    2. groupby -> groupBy



[GitHub] flink pull request #3733: [FLINK-6091] [table] Implement and turn on retract...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3733#discussion_r113047487
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateAggFunction.scala ---
    @@ -46,12 +47,12 @@ class AggregateAggFunction(
         accumulatorRow
       }
     
    -  override def add(value: Row, accumulatorRow: Row): Unit = {
    +  override def add(value: CRow, accumulatorRow: Row): Unit = {
     
    --- End diff --
    
    add here `val row = value.row` and use `row` to aggregate the values.
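
    A fragment sketching this suggestion (it assumes the `aggregates`, `aggFields`, and `Accumulator` members used elsewhere in this PR, following the pattern of `GroupAggProcessFunction`):

        override def add(value: CRow, accumulatorRow: Row): Unit = {
          val row = value.row  // unwrap the Row once
          var i = 0
          while (i < aggregates.length) {
            val acc = accumulatorRow.getField(i).asInstanceOf[Accumulator]
            aggregates(i).accumulate(acc, row.getField(aggFields(i)(0)))
            i += 1
          }
        }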



[GitHub] flink pull request #3733: [FLINK-6091] [table] Implement and turn on retract...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3733#discussion_r113043582
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala ---
    @@ -255,17 +256,23 @@ class CodeGenerator(
         *
         * @return A GeneratedAggregationsFunction
         */
    -  def generateAggregations(
    +  def generateAggregations[T](
    --- End diff --
    
    revert all changes in `CodeGenerator`. The wrapping functions (such as `MapRunner`) can extract the `Row` from a `CRow` and call the generated function with that object.



[GitHub] flink pull request #3733: [FLINK-6091] [table] Implement and turn on retract...

Posted by lincoln-lil <gi...@git.apache.org>.
Github user lincoln-lil commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3733#discussion_r111959594
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GroupAggProcessFunction.scala ---
    @@ -84,16 +109,38 @@ class GroupAggProcessFunction(
         }
     
         // Set aggregate result to the final output
    -    i = 0
    -    while (i < aggregates.length) {
    -      val index = groupings.length + i
    -      val accumulator = accumulators.getField(i).asInstanceOf[Accumulator]
    -      aggregates(i).accumulate(accumulator, input.getField(aggFields(i)(0)))
    -      output.setField(index, aggregates(i).getValue(accumulator))
    -      i += 1
    +    if (input.command == Command.Delete) {
    +      i = 0
    +      while (i < aggregates.length) {
    +        val index = groupings.length + i
    +        val accumulator = accumulators.getField(i).asInstanceOf[Accumulator]
    +        aggregates(i).retract(accumulator, input.getField(aggFields(i)(0)))
    +        output.setField(index, aggregates(i).getValue(accumulator))
    +        i += 1
    +      }
    +    } else {
    +      i = 0
    +      while (i < aggregates.length) {
    +        val index = groupings.length + i
    +        val accumulator = accumulators.getField(i).asInstanceOf[Accumulator]
    +        aggregates(i).accumulate(accumulator, input.getField(aggFields(i)(0)))
    +        output.setField(index, aggregates(i).getValue(accumulator))
    +        i += 1
    +      }
         }
    -    state.update(accumulators)
     
    +    // if previous is not null, do retraction process
    +    if (null != previous) {
    +      if (previous.equals(output)) {
    +        // ignore same output
    +        return
    --- End diff --
    
    Is there a case in which the 'same' row actually needs to be output?




[GitHub] flink pull request #3733: [FLINK-6091] [table] Implement and turn on retract...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3733#discussion_r112078731
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GroupAggProcessFunction.scala ---
    @@ -41,14 +41,16 @@ class GroupAggProcessFunction(
         private val aggregates: Array[AggregateFunction[_]],
         private val aggFields: Array[Array[Int]],
         private val groupings: Array[Int],
    -    private val aggregationStateType: RowTypeInfo)
    +    private val aggregationStateType: RowTypeInfo,
    +    private val generateRetraction: Boolean)
       extends ProcessFunction[Row, Row] {
     
       Preconditions.checkNotNull(aggregates)
       Preconditions.checkNotNull(aggFields)
       Preconditions.checkArgument(aggregates.length == aggFields.length)
     
       private var output: Row = _
    +  private var previous: Row = _
    --- End diff --
    
    rename to `previousRow`.



[GitHub] flink pull request #3733: [FLINK-6091] [table] Implement and turn on retract...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3733#discussion_r112901336
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala ---
    @@ -723,11 +738,11 @@ abstract class TableEnvironment(val config: TableConfig) {
     
         val genFunction = generator.generateFunction(
           functionName,
    -      classOf[MapFunction[Row, T]],
    +      classOf[MapFunction[P, T]],
    --- End diff --
    
    I think we should not touch the code generator. Instead, we unwrap the `Row` from the `CRow` before handing it to the code-gen'd function. 
    I'll add more comments on this below.



[GitHub] flink pull request #3733: [FLINK-6091] [table] Implement and turn on retract...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3733#discussion_r112065725
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala ---
    @@ -129,11 +131,17 @@ class DataStreamOverAggregate(
                 generator,
                 inputDS,
                 isRowTimeType = false,
    -            isRowsClause = overWindow.isRows)
    +            isRowsClause = overWindow.isRows,
    +            consumeRetraction)
    --- End diff --
    
    I think it would be better to enable retraction for all types of OVER aggregates at the same time. 
    Just supporting one specific type adds more confusion than it helps, in my opinion.



[GitHub] flink pull request #3733: [FLINK-6091] [table] Implement and turn on retract...

Posted by lincoln-lil <gi...@git.apache.org>.
Github user lincoln-lil commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3733#discussion_r111958631
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCorrelate.scala ---
    @@ -182,9 +192,17 @@ trait CommonCorrelate {
           returnType,
           rowType.getFieldNames.asScala)
     
    +    val retractionProcess =
    --- End diff --
    
    see CommonCalc.scala: L56



[GitHub] flink pull request #3733: [FLINK-6091] [table] Implement and turn on retract...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3733#discussion_r113043191
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenUtils.scala ---
    @@ -233,6 +234,9 @@ object CodeGenUtils {
           case ri: RowTypeInfo =>
             ProductAccessor(index)
     
    +      case cri: CRowTypeInfo =>
    --- End diff --
    
    Revert these changes



[GitHub] flink pull request #3733: [FLINK-6091] [table] Implement and turn on retract...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3733#discussion_r112060500
  
    --- Diff: flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RowSerializer.java ---
    @@ -152,6 +161,7 @@ public void serialize(Row record, DataOutputView target) throws IOException {
     				fieldSerializers[i].serialize(o, target);
     			}
     		}
    +		commandSerializer.serialize(record.command, target);
    --- End diff --
    
    I'm afraid we cannot change the serialization of `Row`. `Row` is a public class in `flink-core` and not an internal `flink-table` class. Hence, it is used at other places and might also be part of user applications. If we change the serialization, users might not be able to restore a job on 1.3 from a savepoint taken with 1.2. This restriction rules out simply adding a field to `Row`, which would otherwise have avoided major refactorings.
    
    I see two options to add the command field to the data streams in `flink-table`:
    
    1. Use a regular field in `Row`. This would mean that the physical layout of the `Row` is no longer the same as the logical layout, i.e., the one expected by Calcite. However, we will probably change this anyway for the upcoming changes related to the time indicators. For those, the physical layout will have fewer fields than the logical layout (we will remove time fields which are in the metadata of Flink's records or taken as processing time). By adding the command field, we would add a field which is not in the logical layout. The problem with this approach is that the command field would be at different positions in the `Row` (probably the last one). We could leverage the changes introduced by the time indicator work (or the other way round). @twalthr is working on this. You can have a look at the current status here: https://github.com/twalthr/flink/tree/FLINK-5884
    2. The other option is to wrap the rows in a custom data type similar to a `Tuple2[Row, Command]`. The data type could be named `Change` or `CRow` and would have its own `TypeInformation`, `TypeSerializer`, and `TypeComparator`, which forward most calls to the type info, serializer, and comparator of `Row`. The problem with this approach is that we need to change the return types of all functions. For some functions this might not be a big issue if we can take the `Row` object before passing it to the code-gen'd functions. The command field could be set when the result `Row` is returned, or in a wrapping `Collector`.
    
    My gut feeling is that the second approach is easier to implement because we (hopefully) do not need to touch the generated code and "just" need to wrap all `Row` objects in `CRow` objects.
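
    A sketch of how option 2's serializer could forward to the existing `Row` serializer (illustrative only; a real `CRowSerializer` would have to extend `TypeSerializer[CRow]` and implement its full contract):

        import org.apache.flink.api.common.typeutils.TypeSerializer
        import org.apache.flink.core.memory.{DataInputView, DataOutputView}
        import org.apache.flink.table.runtime.types.CRow
        import org.apache.flink.types.Row

        // Illustrative core of a CRow serializer: delegate the wrapped Row
        // to the existing Row serializer and append the change flag.
        class CRowSerializerSketch(rowSerializer: TypeSerializer[Row]) {

          def serialize(record: CRow, target: DataOutputView): Unit = {
            rowSerializer.serialize(record.row, target)
            target.writeBoolean(record.change)
          }

          def deserialize(source: DataInputView): CRow =
            new CRow(rowSerializer.deserialize(source), source.readBoolean())
        }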



[GitHub] flink pull request #3733: [FLINK-6091] [table] Implement and turn on retract...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3733#discussion_r113051216
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/TimeWindowPropertyCollector.scala ---
    @@ -19,34 +19,44 @@
     package org.apache.flink.table.runtime.aggregate
     
     import org.apache.calcite.runtime.SqlFunctions
    +import org.apache.flink.table.runtime.types.CRow
     import org.apache.flink.types.Row
     import org.apache.flink.util.Collector
     
     /**
       * Adds TimeWindow properties to specified fields of a row before it emits the row to a wrapped
       * collector.
       */
    -class TimeWindowPropertyCollector(windowStartOffset: Option[Int], windowEndOffset: Option[Int])
    -    extends Collector[Row] {
    +class TimeWindowPropertyCollector[T](windowStartOffset: Option[Int], windowEndOffset: Option[Int])
    --- End diff --
    
    we can make this an `abstract` class and add two subclasses `RowTimeWindowPropertyCollector` and `CRowTimeWindowPropertyCollector`.
    That would avoid the `if` condition to identify the input type.
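
    A sketch of that refactoring (illustrative; the window start/end logic would stay in the base class as in the diff above):

        import org.apache.flink.table.runtime.types.CRow
        import org.apache.flink.types.Row
        import org.apache.flink.util.Collector

        // The abstract base works on the Row that carries the window
        // properties; subclasses only differ in how they obtain that Row.
        abstract class AbstractTimeWindowPropertyCollector[T] extends Collector[T] {
          var wrappedCollector: Collector[T] = _

          // subclasses extract the Row from the concrete record type
          def getRow(record: T): Row

          override def collect(record: T): Unit = {
            val row = getRow(record)
            // ... set the window start/end fields on `row` here ...
            wrappedCollector.collect(record)
          }

          override def close(): Unit = wrappedCollector.close()
        }

        class RowTimeWindowPropertyCollector
          extends AbstractTimeWindowPropertyCollector[Row] {
          override def getRow(record: Row): Row = record
        }

        class CRowTimeWindowPropertyCollector
          extends AbstractTimeWindowPropertyCollector[CRow] {
          override def getRow(record: CRow): Row = record.row
        }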



[GitHub] flink pull request #3733: [FLINK-6091] [table] Implement and turn on retract...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3733#discussion_r112064885
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala ---
    @@ -107,6 +107,14 @@ class DataStreamGroupWindowAggregate(
         val groupingKeys = grouping.indices.toArray
         val inputDS = input.asInstanceOf[DataStreamRel].translateToPlan(tableEnv)
     
    +    val consumeRetraction = DataStreamRetractionRules.isAccRetract(input)
    +
    +    if (consumeRetraction) {
    +      throw new TableException(
    +        "Retraction on group window is not supported yet. Note: Currently, group window should " +
    +          "not follow an unbounded groupby.")
    --- End diff --
    
    `unbounded groupBy` -> `non-windowed GroupBy`?



[GitHub] flink pull request #3733: [FLINK-6091] [table] Implement and turn on retract...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3733#discussion_r113046533
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCalc.scala ---
    @@ -92,11 +92,13 @@ class DataStreamCalc(
     
         val genFunction = generator.generateFunction(
    --- End diff --
    
    If we generate a `FlatMapFunction[Row, Row]` we can use a `RetractFlatMapRunner` class to unwrap the `Row` from the `CRow` before calling the generated code.



[GitHub] flink pull request #3733: [FLINK-6091] [table] Implement and turn on retract...

Posted by lincoln-lil <gi...@git.apache.org>.
Github user lincoln-lil commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3733#discussion_r111957812
  
    --- Diff: flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RowSerializer.java ---
    @@ -171,6 +181,7 @@ public Row deserialize(DataInputView source) throws IOException {
     				result.setField(i, fieldSerializers[i].deserialize(source));
     			}
     		}
    +		result.command = (Command) commandSerializer.deserialize(source);
    --- End diff --
    
    should we use the method with the reuse variable,
    `T deserialize(T reuse, DataInputView source)`,
    even though it doesn't actually perform any reuse?



[GitHub] flink issue #3733: [FLINK-6091] [table] Implement and turn on retraction for...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on the issue:

    https://github.com/apache/flink/pull/3733
  
    Hi @hequn8128, I merged the PR to `table-retraction`. Can you close it?
    
    Thanks, Fabian



[GitHub] flink pull request #3733: [FLINK-6091] [table] Implement and turn on retract...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3733#discussion_r113050047
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedRangeOver.scala ---
    @@ -93,7 +94,7 @@ class ProcTimeBoundedRangeOver(
         var rowList = rowMapState.get(currentTime)
         // null value means that this is the first event received for this timestamp
         if (rowList == null) {
    -      rowList = new ArrayList[Row]()
    +      rowList = new ArrayList[CRow]()
    --- End diff --
    
    Keep `Row`



[GitHub] flink pull request #3733: [FLINK-6091] [table] Implement and turn on retract...

Posted by hequn8128 <gi...@git.apache.org>.
Github user hequn8128 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3733#discussion_r112817968
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala ---
    @@ -129,11 +131,17 @@ class DataStreamOverAggregate(
                 generator,
                 inputDS,
                 isRowTimeType = false,
    -            isRowsClause = overWindow.isRows)
    +            isRowsClause = overWindow.isRows,
    +            consumeRetraction)
    --- End diff --
    
    hi, you are right, maybe it's better for us not to support OVER for now.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #3733: [FLINK-6091] [table] Implement and turn on retract...

Posted by hequn8128 <gi...@git.apache.org>.
Github user hequn8128 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3733#discussion_r112818005
  
    --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/RetractionITCase.scala ---
    @@ -0,0 +1,310 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.api.scala.stream
    +
    +import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
    +import org.apache.flink.table.api.{TableEnvironment, TableException}
    +import org.apache.flink.table.api.scala.stream.utils.{StreamITCase, StreamingWithStateTestBase}
    +import org.apache.flink.types.Row
    +import org.junit.Assert._
    +import org.junit.Test
    +import org.apache.flink.table.api.scala._
    +import org.apache.flink.api.scala._
    +import org.apache.flink.streaming.api.TimeCharacteristic
    +import org.apache.flink.table.utils.TableFunc0
    +
    +import scala.collection.mutable
    +
    +/**
    +  * tests for retraction
    +  */
    +class RetractionITCase extends StreamingWithStateTestBase {
    +  // input data
    +  val data = List(
    +    ("Hello", 1),
    +    ("word", 1),
    +    ("Hello", 1),
    +    ("bark", 1)
    +  )
    +
    +  // keyed groupby + keyed groupby
    +  @Test
    +  def testWordCount(): Unit = {
    +    val env = StreamExecutionEnvironment.getExecutionEnvironment
    +    val tEnv = TableEnvironment.getTableEnvironment(env)
    +    StreamITCase.testResults = mutable.MutableList()
    +    env.setParallelism(1)
    +    env.setStateBackend(getStateBackend)
    +
    +    val stream = env.fromCollection(data)
    +    val table = stream.toTable(tEnv, 'word, 'num)
    +    val resultTable = table
    +      .groupBy('word)
    +      .select('word as 'word, 'num.sum as 'count)
    +      .groupBy('count)
    +      .select('count, 'word.count as 'frequency)
    +
    +    val results = resultTable.toDataStream[Row]
    +    results.addSink(new StreamITCase.StringSink)
    +    env.execute()
    +
    +    val expected = Seq("1,1", "1,2", "1,1", "2,1", "1,2")
    +    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
    +  }
    +
    +  // keyed groupby + non-keyed groupby
    +  @Test
    +  def testGroupByAndNonKeyedGroupBy(): Unit = {
    +
    +    val env = StreamExecutionEnvironment.getExecutionEnvironment
    +    val tEnv = TableEnvironment.getTableEnvironment(env)
    +    StreamITCase.testResults = mutable.MutableList()
    +    env.setParallelism(1)
    +    env.setStateBackend(getStateBackend)
    +
    +    val stream = env.fromCollection(data)
    +    val table = stream.toTable(tEnv, 'word, 'num)
    +    val resultTable = table
    +      .groupBy('word)
    +      .select('word as 'word, 'num.sum as 'count)
    +      .select('count.sum)
    +
    +    val results = resultTable.toDataStream[Row]
    +    results.addSink(new StreamITCase.StringSink)
    +    env.execute()
    +
    +    val expected = Seq("1", "2", "1", "3", "4")
    --- End diff --
    
    Hi, there is a `.select('count.sum)` after `groupBy('word).select('word as 'word, 'num.sum as 'count)`, so every update of a per-word count is first retracted from and then re-added to the global sum, which is why the expected output contains the intermediate values.
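
    Roughly, the retraction flow produces that sequence like this (a sketch of the trace, assuming the four rows arrive in order with parallelism 1):

        input        keyed groupBy emits       running 'count.sum
        (Hello,1) -> Add(Hello,1)              1
        (word,1)  -> Add(word,1)               2
        (Hello,1) -> Delete(Hello,1)           1
                     Add(Hello,2)              3
        (bark,1)  -> Add(bark,1)               4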



[GitHub] flink pull request #3733: [FLINK-6091] [table] Implement and turn on retract...

Posted by hequn8128 <gi...@git.apache.org>.
Github user hequn8128 closed the pull request at:

    https://github.com/apache/flink/pull/3733



[GitHub] flink pull request #3733: [FLINK-6091] [table] Implement and turn on retract...

Posted by lincoln-lil <gi...@git.apache.org>.
Github user lincoln-lil commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3733#discussion_r111958512
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCorrelate.scala ---
    @@ -135,11 +136,20 @@ trait CommonCorrelate {
           }
           val outerResultExpr = functionGenerator.generateResultExpression(
             input1AccessExprs ++ input2NullExprs, returnType, rowType.getFieldNames.asScala)
    +
    +      val retractionProcess =
    --- End diff --
    
    see CommonCalc.scala: L56



[GitHub] flink issue #3733: [FLINK-6091] [table] Implement and turn on retraction for...

Posted by hequn8128 <gi...@git.apache.org>.
Github user hequn8128 commented on the issue:

    https://github.com/apache/flink/pull/3733
  
    Hi @fhueske , thanks a lot for your review and help. I have addressed all your comments and updated the PR. All changes were checked before my latest update.
    Thanks, Hequn



[GitHub] flink pull request #3733: [FLINK-6091] [table] Implement and turn on retract...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3733#discussion_r113050568
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedRowsOver.scala ---
    @@ -48,15 +49,15 @@ class ProcTimeBoundedRowsOver(
         genAggregations: GeneratedAggregationsFunction,
         precedingOffset: Long,
         aggregatesTypeInfo: RowTypeInfo,
    -    inputType: TypeInformation[Row])
    -  extends ProcessFunction[Row, Row]
    +    inputType: TypeInformation[CRow])
    +  extends ProcessFunction[CRow, CRow]
         with Compiler[GeneratedAggregations] {
     
       Preconditions.checkArgument(precedingOffset > 0)
     
       private var accumulatorState: ValueState[Row] = _
    -  private var rowMapState: MapState[Long, JList[Row]] = _
    -  private var output: Row = _
    +  private var rowMapState: MapState[Long, JList[CRow]] = _
    --- End diff --
    
    Same as for the other over window. Let's use `Row` for the state and extract the `Row` from the `CRow`.
    `output` should be a `CRow`.
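
    In code, the intended shape is roughly this (just a sketch, not the actual PR code; surrounding class members are elided):

        // state keeps plain Rows, only the wire type is CRow
        private var rowMapState: MapState[Long, JList[Row]] = _
        private var output: CRow = _

        override def processElement(
            inputC: CRow,
            ctx: ProcessFunction[CRow, CRow]#Context,
            out: Collector[CRow]): Unit = {
          val input: Row = inputC.row  // unwrap before touching state
          // ... buffer `input` in rowMapState, aggregate into output.row ...
          out.collect(output)
        }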



[GitHub] flink pull request #3733: [FLINK-6091] [table] Implement and turn on retract...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3733#discussion_r112079294
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GroupAggProcessFunction.scala ---
    @@ -84,16 +109,38 @@ class GroupAggProcessFunction(
         }
     
         // Set aggregate result to the final output
    -    i = 0
    -    while (i < aggregates.length) {
    -      val index = groupings.length + i
    -      val accumulator = accumulators.getField(i).asInstanceOf[Accumulator]
    -      aggregates(i).accumulate(accumulator, input.getField(aggFields(i)(0)))
    -      output.setField(index, aggregates(i).getValue(accumulator))
    -      i += 1
    +    if (input.command == Command.Delete) {
    +      i = 0
    +      while (i < aggregates.length) {
    +        val index = groupings.length + i
    +        val accumulator = accumulators.getField(i).asInstanceOf[Accumulator]
    +        aggregates(i).retract(accumulator, input.getField(aggFields(i)(0)))
    +        output.setField(index, aggregates(i).getValue(accumulator))
    +        i += 1
    +      }
    +    } else {
    +      i = 0
    +      while (i < aggregates.length) {
    +        val index = groupings.length + i
    +        val accumulator = accumulators.getField(i).asInstanceOf[Accumulator]
    +        aggregates(i).accumulate(accumulator, input.getField(aggFields(i)(0)))
    +        output.setField(index, aggregates(i).getValue(accumulator))
    +        i += 1
    +      }
         }
    -    state.update(accumulators)
     
    +    // if previous is not null, do retraction process
    +    if (null != previous) {
    --- End diff --
    
    check against `generateRetraction`. The check can be optimized because `generateRetraction` is a `val` and hence `final`.
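
    i.e. roughly (sketch):

        // `generateRetraction` is a constructor `val`, so the JIT can treat the
        // branch as constant and eliminate it for non-retracting plans
        if (generateRetraction && null != previous) {
          // emit `previous` as a Delete message before emitting the new row
        }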



[GitHub] flink pull request #3733: [FLINK-6091] [table] Implement and turn on retract...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3733#discussion_r112901968
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala ---
    @@ -175,14 +176,19 @@ object FlinkTypeFactory {
       /**
         * Converts a Calcite logical record into a Flink type information.
         */
    -  def toInternalRowTypeInfo(logicalRowType: RelDataType): TypeInformation[Row] = {
    +  def toInternalRowTypeInfo[T](logicalRowType: RelDataType, resultClass: Class[T])
    --- End diff --
    
    I would keep this method as it is. 
    Whenever we need a `CRow` we can easily create it as `CRowTypeInfo(rowType)`.
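
    For example, a call site could then look like this (sketch):

        val rowType: TypeInformation[Row] = FlinkTypeFactory.toInternalRowTypeInfo(logicalRowType)
        val cRowType: CRowTypeInfo = CRowTypeInfo(rowType)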



[GitHub] flink pull request #3733: [FLINK-6091] [table] Implement and turn on retract...

Posted by hequn8128 <gi...@git.apache.org>.
Github user hequn8128 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3733#discussion_r112817995
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GroupAggProcessFunction.scala ---
    @@ -84,16 +109,38 @@ class GroupAggProcessFunction(
         }
     
         // Set aggregate result to the final output
    -    i = 0
    -    while (i < aggregates.length) {
    -      val index = groupings.length + i
    -      val accumulator = accumulators.getField(i).asInstanceOf[Accumulator]
    -      aggregates(i).accumulate(accumulator, input.getField(aggFields(i)(0)))
    -      output.setField(index, aggregates(i).getValue(accumulator))
    -      i += 1
    +    if (input.command == Command.Delete) {
    +      i = 0
    +      while (i < aggregates.length) {
    +        val index = groupings.length + i
    +        val accumulator = accumulators.getField(i).asInstanceOf[Accumulator]
    +        aggregates(i).retract(accumulator, input.getField(aggFields(i)(0)))
    +        output.setField(index, aggregates(i).getValue(accumulator))
    +        i += 1
    +      }
    +    } else {
    +      i = 0
    +      while (i < aggregates.length) {
    +        val index = groupings.length + i
    +        val accumulator = accumulators.getField(i).asInstanceOf[Accumulator]
    +        aggregates(i).accumulate(accumulator, input.getField(aggFields(i)(0)))
    +        output.setField(index, aggregates(i).getValue(accumulator))
    +        i += 1
    +      }
         }
    -    state.update(accumulators)
     
    +    // if previous is not null, do retraction process
    +    if (null != previous) {
    +      if (previous.equals(output)) {
    +        // ignore same output
    +        return
    --- End diff --
    
    Hi, we do not need to output the same row again. Think about batch: there, only one result would be output, right?



[GitHub] flink pull request #3733: [FLINK-6091] [table] Implement and turn on retract...

Posted by lincoln-lil <gi...@git.apache.org>.
Github user lincoln-lil commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3733#discussion_r111958339
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCalc.scala ---
    @@ -52,10 +53,18 @@ trait CommonCalc {
           rowType.getFieldNames,
           expandedExpressions)
     
    +    val retractionProcess =
    --- End diff --
    
    Rename `retractionProcess` to `retractionProcessCode`, or move this part to `CodeGenerator` for consistency of code style.
    I prefer the latter.



[GitHub] flink pull request #3733: [FLINK-6091] [table] Implement and turn on retract...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3733#discussion_r112078707
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GroupAggProcessFunction.scala ---
    @@ -41,14 +41,16 @@ class GroupAggProcessFunction(
         private val aggregates: Array[AggregateFunction[_]],
         private val aggFields: Array[Array[Int]],
         private val groupings: Array[Int],
    -    private val aggregationStateType: RowTypeInfo)
    +    private val aggregationStateType: RowTypeInfo,
    +    private val generateRetraction: Boolean)
       extends ProcessFunction[Row, Row] {
     
       Preconditions.checkNotNull(aggregates)
       Preconditions.checkNotNull(aggFields)
       Preconditions.checkArgument(aggregates.length == aggFields.length)
     
       private var output: Row = _
    --- End diff --
    
    rename to `newRow`.



[GitHub] flink pull request #3733: [FLINK-6091] [table] Implement and turn on retract...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3733#discussion_r113046948
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamRel.scala ---
    @@ -22,17 +22,16 @@ import org.apache.calcite.rel.RelNode
     import org.apache.flink.streaming.api.datastream.DataStream
     import org.apache.flink.table.api.StreamTableEnvironment
     import org.apache.flink.table.plan.nodes.FlinkRel
    -import org.apache.flink.types.Row
     
    -trait DataStreamRel extends RelNode with FlinkRel {
    +trait DataStreamRel[T] extends RelNode with FlinkRel {
    --- End diff --
    
    I don't think we need to parameterize `DataStreamRel`. All `DataStreamRel` nodes should return `CRow` in my opinion.
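
    That is, the trait would stay unparameterized, roughly (sketch):

        trait DataStreamRel extends RelNode with FlinkRel {
          // every streaming node emits CRow, so no type parameter is needed
          def translateToPlan(tableEnv: StreamTableEnvironment): DataStream[CRow]
        }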



[GitHub] flink pull request #3733: [FLINK-6091] [table] Implement and turn on retract...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3733#discussion_r113048507
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala ---
    @@ -159,18 +174,22 @@ object AggregateUtil {
         * @param generator       code generator instance
         * @param namedAggregates List of calls to aggregate functions and their output field names
         * @param inputType       Input row type
    +    * @param inputTypeInfo   Input DataStream row type
    +    * @param returnTypeInfo  Return DataStream row type
         * @param precedingOffset the preceding offset
    -    * @param isRowsClause   It is a tag that indicates whether the OVER clause is ROWS clause
    +    * @param isRowsClause    It is a tag that indicates whether the OVER clause is ROWS clause
         * @param isRowTimeType   It is a tag that indicates whether the time type is rowTimeType
         * @return [[org.apache.flink.streaming.api.functions.ProcessFunction]]
         */
    -  private[flink] def createBoundedOverProcessFunction(
    +  private[flink] def createBoundedOverProcessFunction[T](
         generator: CodeGenerator,
         namedAggregates: Seq[CalcitePair[AggregateCall, String]],
         inputType: RelDataType,
    +    inputTypeInfo: TypeInformation[T],
    --- End diff --
    
    remove new type info parameters.



[GitHub] flink pull request #3733: [FLINK-6091] [table] Implement and turn on retract...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3733#discussion_r113048128
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala ---
    @@ -102,13 +110,17 @@ object AggregateUtil {
             new RowTimeUnboundedRowsOver(
               genFunction,
               aggregationStateType,
    -          FlinkTypeFactory.toInternalRowTypeInfo(inputType))
    +          FlinkTypeFactory
    --- End diff --
    
    change to `CRowTypeInfo(FlinkTypeFactory.toInternalRowType(inputType)))`



[GitHub] flink pull request #3733: [FLINK-6091] [table] Implement and turn on retract...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3733#discussion_r113050330
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedRangeOver.scala ---
    @@ -69,10 +70,10 @@ class ProcTimeBoundedRangeOver(
         output = function.createOutputRow()
    --- End diff --
    
    The code-gen'd functions need a `Row` to set the result. We can simply pass `output.row` to them and later emit `output`.
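
    Roughly (sketch, assuming `CRow` exposes the wrapped row as a mutable `row` field):

        // in open(): wrap the reused Row once
        output = new CRow(function.createOutputRow(), true)

        // in processElement(): generated code only ever sees the inner Row
        function.setAggregationResults(accumulators, output.row)
        out.collect(output)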



[GitHub] flink pull request #3733: [FLINK-6091] [table] Implement and turn on retract...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3733#discussion_r113049009
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggReduceGroupFunction.scala ---
    @@ -66,7 +66,7 @@ class DataSetSessionWindowAggReduceGroupFunction(
     
       private var aggregateBuffer: Row = _
       private var output: Row = _
    -  private var collector: TimeWindowPropertyCollector = _
    +  private var collector: TimeWindowPropertyCollector[Row] = _
    --- End diff --
    
    We can make `TimeWindowPropertyCollector` abstract and implement two versions, one for `Row` and one for `CRow`.
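
    One possible shape (a sketch; the constructor parameters are guesses):

        abstract class TimeWindowPropertyCollector[T](
            windowStartOffset: Option[Int],
            windowEndOffset: Option[Int])
          extends Collector[T] {
          // shared logic: copy window start/end properties into the output row
        }

        class RowTimeWindowPropertyCollector(start: Option[Int], end: Option[Int])
          extends TimeWindowPropertyCollector[Row](start, end)

        class CRowTimeWindowPropertyCollector(start: Option[Int], end: Option[Int])
          extends TimeWindowPropertyCollector[CRow](start, end)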



[GitHub] flink pull request #3733: [FLINK-6091] [table] Implement and turn on retract...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3733#discussion_r113045326
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCalc.scala ---
    @@ -45,7 +45,7 @@ class DataStreamCalc(
         ruleDescription: String)
       extends Calc(cluster, traitSet, input, calcProgram)
       with CommonCalc
    -  with DataStreamRel {
    +  with DataStreamRel[CRow] {
    --- End diff --
    
    `DataStreamRel` does not need a type parameter, IMO. All `DataStreamRel` nodes should produce `CRow`.



[GitHub] flink pull request #3733: [FLINK-6091] [table] Implement and turn on retract...

Posted by lincoln-lil <gi...@git.apache.org>.
Github user lincoln-lil commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3733#discussion_r111959203
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala ---
    @@ -129,11 +131,17 @@ class DataStreamOverAggregate(
                 generator,
                 inputDS,
                 isRowTimeType = false,
    -            isRowsClause = overWindow.isRows)
    +            isRowsClause = overWindow.isRows,
    +            consumeRetraction)
             } else if (
               overWindow.lowerBound.isPreceding && !overWindow.lowerBound.isUnbounded &&
                 overWindow.upperBound.isCurrentRow) {
               // bounded OVER window
    +          if (consumeRetraction) {
    +            throw new TableException(
    +              "Retraction for bounded over window is not supported yet. Note: Currently, bounded " +
    --- End diff --
    
    1. Retraction for -> on
    2. groupby -> groupBy
    3. Keep the term consistent by choosing either 'OVER' or 'over' within the same context.



[GitHub] flink pull request #3733: [FLINK-6091] [table] Implement and turn on retract...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3733#discussion_r113047960
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala ---
    @@ -61,23 +62,28 @@ object AggregateUtil {
         * @param generator       code generator instance
         * @param namedAggregates List of calls to aggregate functions and their output field names
         * @param inputType Input row type
    +    * @param inputTypeInfo Input DataStream row type
    +    * @param returnTypeInfo Return DataStream row type
         * @param isRowTimeType It is a tag that indicates whether the time type is rowTimeType
         * @param isPartitioned It is a tag that indicate whether the input is partitioned
         * @param isRowsClause It is a tag that indicates whether the OVER clause is ROWS clause
         */
    -  private[flink] def createUnboundedOverProcessFunction(
    +  private[flink] def createUnboundedOverProcessFunction[T](
         generator: CodeGenerator,
         namedAggregates: Seq[CalcitePair[AggregateCall, String]],
         inputType: RelDataType,
    +    inputTypeInfo: TypeInformation[T],
    --- End diff --
    
    We don't need these types if we keep the code generation as it is.



[GitHub] flink pull request #3733: [FLINK-6091] [table] Implement and turn on retract...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3733#discussion_r113048214
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala ---
    @@ -102,13 +110,17 @@ object AggregateUtil {
             new RowTimeUnboundedRowsOver(
               genFunction,
               aggregationStateType,
    -          FlinkTypeFactory.toInternalRowTypeInfo(inputType))
    +          FlinkTypeFactory
    +            .toInternalRowTypeInfo(inputType, classOf[CRow])
    +            .asInstanceOf[CRowTypeInfo])
           } else {
             // RANGE unbounded over process function
             new RowTimeUnboundedRangeOver(
               genFunction,
               aggregationStateType,
    -          FlinkTypeFactory.toInternalRowTypeInfo(inputType))
    +          FlinkTypeFactory
    --- End diff --
    
    change to `CRowTypeInfo(FlinkTypeFactory.toInternalRowType(inputType)))`



[GitHub] flink pull request #3733: [FLINK-6091] [table] Implement and turn on retract...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3733#discussion_r112065233
  
    --- Diff: flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RowSerializer.java ---
    @@ -152,6 +161,7 @@ public void serialize(Row record, DataOutputView target) throws IOException {
     				fieldSerializers[i].serialize(o, target);
     			}
     		}
    +		commandSerializer.serialize(record.command, target);
    --- End diff --
    
    Also, by adding the command to `Row`, we add serialization overhead to all jobs that use `Row` (including the batch Table API / SQL).
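
    This is the main argument for a dedicated wrapper type: a `CRowSerializer` can delegate to the existing `RowSerializer`, so only streaming jobs pay for the extra flag. A sketch (assuming a boolean change flag on `CRow`; most `TypeSerializer` methods are omitted):

        class CRowSerializer(rowSerializer: TypeSerializer[Row]) extends TypeSerializer[CRow] {

          override def serialize(record: CRow, target: DataOutputView): Unit = {
            rowSerializer.serialize(record.row, target)  // plain Row bytes, unchanged
            target.writeBoolean(record.change)           // one extra byte, streaming only
          }

          override def deserialize(source: DataInputView): CRow =
            new CRow(rowSerializer.deserialize(source), source.readBoolean())

          // duplicate(), copy(), snapshotConfiguration(), etc. omitted for brevity
        }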



[GitHub] flink issue #3733: [FLINK-6091] [table] Implement and turn on retraction for...

Posted by hequn8128 <gi...@git.apache.org>.
Github user hequn8128 commented on the issue:

    https://github.com/apache/flink/pull/3733
  
    Hi @fhueske , the latest commit is a bugfix. It mainly includes the following changes:

    1. add `CRow` support to the requested types in `TableEnvironment`, so we can sink with `CRow`
    2. add `CRow` support to `getFieldInfo` in `TableEnvironment`, so we can create a `Table` from a `DataStream` of `CRow`
    3. fix a type-erasure problem in `TableEnvironment` and `DataStreamRetractionRules`

    Thanks, Hequn



[GitHub] flink pull request #3733: [FLINK-6091] [table] Implement and turn on retract...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3733#discussion_r112078358
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GroupAggProcessFunction.scala ---
    @@ -68,12 +70,35 @@ class GroupAggProcessFunction(
         var accumulators = state.value()
     
         if (null == accumulators) {
    +      previous = null
           accumulators = new Row(aggregates.length)
           i = 0
           while (i < aggregates.length) {
             accumulators.setField(i, aggregates(i).createAccumulator())
             i += 1
           }
    +    } else {
    +      // get previous row
    +      if (generateRetraction) {
    +        if (null == previous) {
    +          previous = new Row(groupings.length + aggregates.length)
    --- End diff --
    
    can we initialize `previous` in `open()`?
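
    e.g. (sketch):

        override def open(config: Configuration): Unit = {
          // allocate the reused row once instead of lazily per element
          previous = new Row(groupings.length + aggregates.length)
        }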



[GitHub] flink pull request #3733: [FLINK-6091] [table] Implement and turn on retract...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3733#discussion_r113044998
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/BatchScan.scala ---
    @@ -38,7 +39,9 @@ trait BatchScan extends CommonScan with DataSetRel {
     
         val inputType = input.getType
     
    -    val internalType = FlinkTypeFactory.toInternalRowTypeInfo(getRowType)
    --- End diff --
    
    These changes can be reverted if we leave the `FlinkTypeFactory.toInternalRowTypeInfo()` method as it is.



[GitHub] flink pull request #3733: [FLINK-6091] [table] Implement and turn on retract...

Posted by lincoln-lil <gi...@git.apache.org>.
Github user lincoln-lil commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3733#discussion_r111958057
  
    --- Diff: flink-core/src/main/java/org/apache/flink/types/Row.java ---
    @@ -46,6 +46,9 @@
     	/** The array to store actual values. */
     	private final Object[] fields;
     
    +	/** Indicate to add or delete this row */
    --- End diff --
    
    I prefer "Indicates that the line is add or delete type"



[GitHub] flink pull request #3733: [FLINK-6091] [table] Implement and turn on retract...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3733#discussion_r113049237
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GeneratedAggregations.scala ---
    @@ -31,35 +32,35 @@ abstract class GeneratedAggregations extends Function {
         *
         * @param accumulators the accumulators (saved in a row) which contains the current
         *                     aggregated results
    -    * @param output       output results collected in a row
    +    * @param output       output results collected in a command row
         */
    -  def setAggregationResults(accumulators: Row, output: Row)
    +  def setAggregationResults(accumulators: Row, output: CRow)
    --- End diff --
    
    I would change all `CRow` types back to `Row`. Let's leave the code generation as it is and extract the rows in the wrapper classes.
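
    One way to do that extraction is a small collector adapter, roughly (sketch; the class name and the mutability of `CRow.row` are assumptions):

        // lets generated code emit plain Rows while the operator emits CRows
        class CRowWrappingCollector(out: Collector[CRow], change: Boolean)
          extends Collector[Row] {

          private val cRow = new CRow(null, change)

          override def collect(record: Row): Unit = {
            cRow.row = record
            out.collect(cRow)
          }

          override def close(): Unit = out.close()
        }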



[GitHub] flink pull request #3733: [FLINK-6091] [table] Implement and turn on retract...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3733#discussion_r112081457
  
    --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/RetractionITCase.scala ---
    @@ -0,0 +1,310 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.api.scala.stream
    +
    +import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
    +import org.apache.flink.table.api.{TableEnvironment, TableException}
    +import org.apache.flink.table.api.scala.stream.utils.{StreamITCase, StreamingWithStateTestBase}
    +import org.apache.flink.types.Row
    +import org.junit.Assert._
    +import org.junit.Test
    +import org.apache.flink.table.api.scala._
    +import org.apache.flink.api.scala._
    +import org.apache.flink.streaming.api.TimeCharacteristic
    +import org.apache.flink.table.utils.TableFunc0
    +
    +import scala.collection.mutable
    +
    +/**
    +  * tests for retraction
    +  */
    +class RetractionITCase extends StreamingWithStateTestBase {
    +  // input data
    +  val data = List(
    +    ("Hello", 1),
    +    ("word", 1),
    +    ("Hello", 1),
    +    ("bark", 1)
    +  )
    +
    +  // keyed groupby + keyed groupby
    +  @Test
    +  def testWordCount(): Unit = {
    +    val env = StreamExecutionEnvironment.getExecutionEnvironment
    +    val tEnv = TableEnvironment.getTableEnvironment(env)
    +    StreamITCase.testResults = mutable.MutableList()
    +    env.setParallelism(1)
    +    env.setStateBackend(getStateBackend)
    +
    +    val stream = env.fromCollection(data)
    +    val table = stream.toTable(tEnv, 'word, 'num)
    +    val resultTable = table
    +      .groupBy('word)
    +      .select('word as 'word, 'num.sum as 'count)
    +      .groupBy('count)
    +      .select('count, 'word.count as 'frequency)
    +
    +    val results = resultTable.toDataStream[Row]
    +    results.addSink(new StreamITCase.StringSink)
    +    env.execute()
    +
    +    val expected = Seq("1,1", "1,2", "1,1", "2,1", "1,2")
    +    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
    +  }
    +
    +  // keyed groupby + non-keyed groupby
    +  @Test
    +  def testGroupByAndNonKeyedGroupBy(): Unit = {
    +
    +    val env = StreamExecutionEnvironment.getExecutionEnvironment
    +    val tEnv = TableEnvironment.getTableEnvironment(env)
    +    StreamITCase.testResults = mutable.MutableList()
    +    env.setParallelism(1)
    +    env.setStateBackend(getStateBackend)
    +
    +    val stream = env.fromCollection(data)
    +    val table = stream.toTable(tEnv, 'word, 'num)
    +    val resultTable = table
    +      .groupBy('word)
    +      .select('word as 'word, 'num.sum as 'count)
    +      .select('count.sum)
    +
    +    val results = resultTable.toDataStream[Row]
    +    results.addSink(new StreamITCase.StringSink)
    +    env.execute()
    +
    +    val expected = Seq("1", "2", "1", "3", "4")
    +    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
    +  }
    +
    +  // non-keyed groupby + keyed groupby
    +  @Test
    +  def testNonKeyedGroupByAndGroupBy(): Unit = {
    +
    +    val env = StreamExecutionEnvironment.getExecutionEnvironment
    +    val tEnv = TableEnvironment.getTableEnvironment(env)
    +    StreamITCase.testResults = mutable.MutableList()
    +    env.setParallelism(1)
    +    env.setStateBackend(getStateBackend)
    +
    +    val stream = env.fromCollection(data)
    +    val table = stream.toTable(tEnv, 'word, 'num)
    +    val resultTable = table
    +      .select('num.sum as 'count)
    +      .groupBy('count)
    +      .select('count, 'count.count)
    +
    +    val results = resultTable.toDataStream[Row]
    +    results.addSink(new StreamITCase.StringSink)
    +    env.execute()
    +
    +    val expected = Seq("1,1", "1,0", "2,1", "2,0", "3,1", "3,0", "4,1")
    +    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
    +  }
    +
    +  // keyed groupby + over agg(unbounded, procTime, keyed)
    +  @Test
    +  def testGroupByAndUnboundPartitionedProcessingWindowWithRow(): Unit = {
    +
    +    val env = StreamExecutionEnvironment.getExecutionEnvironment
    +    env.setStateBackend(getStateBackend)
    +    env.setParallelism(1)
    +    val tEnv = TableEnvironment.getTableEnvironment(env)
    +
    +    StreamITCase.testResults = mutable.MutableList()
    +
    +    val t1 = env.fromCollection(data).toTable(tEnv).as('word, 'number)
    +
    +    tEnv.registerTable("T1", t1)
    +
    +    val sqlQuery = "SELECT word, cnt, count(word) " +
    +      "OVER (PARTITION BY cnt ORDER BY ProcTime() " +
    +      "ROWS BETWEEN UNBOUNDED preceding AND CURRENT ROW)" +
    +      "FROM " +
    +      "(SELECT word, count(number) as cnt from T1 group by word) "
    +
    +    val result = tEnv.sql(sqlQuery).toDataStream[Row]
    +    result.addSink(new StreamITCase.StringSink)
    +    env.execute()
    +
    +    val expected = mutable.MutableList("Hello,1,1", "word,1,2", "Hello,2,1", "bark,1,2")
    +    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
    +  }
    +
    +  // keyed groupby + over agg(unbounded, procTime, non-keyed)
    +  @Test
    +  def testGroupByAndUnboundNonPartitionedProcessingWindowWithRow(): Unit = {
    +
    +    val env = StreamExecutionEnvironment.getExecutionEnvironment
    +    env.setStateBackend(getStateBackend)
    +    env.setParallelism(1)
    +    val tEnv = TableEnvironment.getTableEnvironment(env)
    +
    +    StreamITCase.testResults = mutable.MutableList()
    +
    +    val t1 = env.fromCollection(data).toTable(tEnv).as('word, 'number)
    +
    +    tEnv.registerTable("T1", t1)
    +
    +    val sqlQuery = "SELECT word, cnt, count(word) " +
    +      "OVER (ORDER BY ProcTime() ROWS BETWEEN UNBOUNDED preceding AND CURRENT ROW)" +
    +      "FROM (SELECT word , count(number) as cnt from T1 group by word) "
    +
    +    val result = tEnv.sql(sqlQuery).toDataStream[Row]
    +    result.addSink(new StreamITCase.StringSink)
    +    env.execute()
    +
    +    val expected = mutable.MutableList("Hello,1,1", "word,1,2", "Hello,2,2", "bark,1,3")
    +    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
    +  }
    +
    +  // test unique process, if the current output message of unbounded groupby equals the
    +  // previous message, unbounded groupby will ignore the current one.
    +  @Test
    +  def testUniqueProcess(): Unit = {
    +    // data input
    +    val data = List(
    +      (1234, 2L),
    --- End diff --
    
    Please use more test data



[GitHub] flink pull request #3733: [FLINK-6091] [table] Implement and turn on retract...

Posted by lincoln-lil <gi...@git.apache.org>.
Github user lincoln-lil commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3733#discussion_r111958225
  
    --- Diff: flink-core/src/main/java/org/apache/flink/types/Row.java ---
    @@ -46,6 +46,9 @@
     	/** The array to store actual values. */
     	private final Object[] fields;
     
    +	/** Indicate to add or delete this row */
    +	public Command command = Command.Add;
    --- End diff --
    
    I think changing it to private and offering a getter/setter would be better.



[GitHub] flink pull request #3733: [FLINK-6091] [table] Implement and turn on retract...

Posted by lincoln-lil <gi...@git.apache.org>.
Github user lincoln-lil commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3733#discussion_r111957949
  
    --- Diff: flink-core/src/main/java/org/apache/flink/types/Command.java ---
    @@ -0,0 +1,29 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.types;
    +
    +import java.io.Serializable;
    +
    +/**
    + * A Command is used in a {@link Row} to distinguish delete or add.
    + * Delete indicate a retracion row and Add means a nomal row.
    --- End diff --
    
    retracion -> retraction



[GitHub] flink pull request #3733: [FLINK-6091] [table] Implement and turn on retract...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3733#discussion_r112080186
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GroupAggProcessFunction.scala ---
    @@ -84,16 +109,38 @@ class GroupAggProcessFunction(
         }
     
         // Set aggregate result to the final output
    -    i = 0
    -    while (i < aggregates.length) {
    -      val index = groupings.length + i
    -      val accumulator = accumulators.getField(i).asInstanceOf[Accumulator]
    -      aggregates(i).accumulate(accumulator, input.getField(aggFields(i)(0)))
    -      output.setField(index, aggregates(i).getValue(accumulator))
    -      i += 1
    +    if (input.command == Command.Delete) {
    +      i = 0
    +      while (i < aggregates.length) {
    +        val index = groupings.length + i
    +        val accumulator = accumulators.getField(i).asInstanceOf[Accumulator]
    +        aggregates(i).retract(accumulator, input.getField(aggFields(i)(0)))
    +        output.setField(index, aggregates(i).getValue(accumulator))
    +        i += 1
    +      }
    +    } else {
    +      i = 0
    +      while (i < aggregates.length) {
    +        val index = groupings.length + i
    +        val accumulator = accumulators.getField(i).asInstanceOf[Accumulator]
    +        aggregates(i).accumulate(accumulator, input.getField(aggFields(i)(0)))
    +        output.setField(index, aggregates(i).getValue(accumulator))
    +        i += 1
    +      }
         }
    -    state.update(accumulators)
     
    +    // if previous is not null, do retraction process
    +    if (null != previous) {
    +      if (previous.equals(output)) {
    +        // ignore same output
    +        return
    --- End diff --
    
    We still need to update the state even if we do not emit new values. Say we have a max aggregation with an accumulator that holds a map of `10->1, 5->2` and we add `8`: the new accumulator will be `10->1, 8->1, 5->2`, but the aggregation result will still be `10`. If we skipped the state update and later retracted `10`, the new max would be computed as `5` instead of the correct `8`.
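
    So the fix is essentially a reordering (sketch):

        // persist the new accumulators unconditionally ...
        state.update(accumulators)
        // ... and only skip the emission when nothing visible changed
        if (!output.equals(previous)) {
          // emit Delete(previous), then Add(output)
        }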



[GitHub] flink pull request #3733: [FLINK-6091] [table] Implement and turn on retract...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3733#discussion_r112081928
  
    --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/RetractionITCase.scala ---
    @@ -0,0 +1,310 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.api.scala.stream
    +
    +import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
    +import org.apache.flink.table.api.{TableEnvironment, TableException}
    +import org.apache.flink.table.api.scala.stream.utils.{StreamITCase, StreamingWithStateTestBase}
    +import org.apache.flink.types.Row
    +import org.junit.Assert._
    +import org.junit.Test
    +import org.apache.flink.table.api.scala._
    +import org.apache.flink.api.scala._
    +import org.apache.flink.streaming.api.TimeCharacteristic
    +import org.apache.flink.table.utils.TableFunc0
    +
    +import scala.collection.mutable
    +
    +/**
    +  * tests for retraction
    +  */
    +class RetractionITCase extends StreamingWithStateTestBase {
    +  // input data
    +  val data = List(
    +    ("Hello", 1),
    +    ("word", 1),
    +    ("Hello", 1),
    +    ("bark", 1)
    +  )
    +
    +  // keyed groupby + keyed groupby
    +  @Test
    +  def testWordCount(): Unit = {
    +    val env = StreamExecutionEnvironment.getExecutionEnvironment
    +    val tEnv = TableEnvironment.getTableEnvironment(env)
    +    StreamITCase.testResults = mutable.MutableList()
    +    env.setParallelism(1)
    +    env.setStateBackend(getStateBackend)
    +
    +    val stream = env.fromCollection(data)
    +    val table = stream.toTable(tEnv, 'word, 'num)
    +    val resultTable = table
    +      .groupBy('word)
    +      .select('word as 'word, 'num.sum as 'count)
    +      .groupBy('count)
    +      .select('count, 'word.count as 'frequency)
    +
    +    val results = resultTable.toDataStream[Row]
    +    results.addSink(new StreamITCase.StringSink)
    +    env.execute()
    +
    +    val expected = Seq("1,1", "1,2", "1,1", "2,1", "1,2")
    +    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
    +  }
    +
    +  // keyed groupby + non-keyed groupby
    +  @Test
    +  def testGroupByAndNonKeyedGroupBy(): Unit = {
    +
    +    val env = StreamExecutionEnvironment.getExecutionEnvironment
    +    val tEnv = TableEnvironment.getTableEnvironment(env)
    +    StreamITCase.testResults = mutable.MutableList()
    +    env.setParallelism(1)
    +    env.setStateBackend(getStateBackend)
    +
    +    val stream = env.fromCollection(data)
    +    val table = stream.toTable(tEnv, 'word, 'num)
    +    val resultTable = table
    +      .groupBy('word)
    +      .select('word as 'word, 'num.sum as 'count)
    +      .select('count.sum)
    +
    +    val results = resultTable.toDataStream[Row]
    +    results.addSink(new StreamITCase.StringSink)
    +    env.execute()
    +
    +    val expected = Seq("1", "2", "1", "3", "4")
    +    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
    +  }
    +
    +  // non-keyed groupby + keyed groupby
    +  @Test
    +  def testNonKeyedGroupByAndGroupBy(): Unit = {
    +
    +    val env = StreamExecutionEnvironment.getExecutionEnvironment
    +    val tEnv = TableEnvironment.getTableEnvironment(env)
    +    StreamITCase.testResults = mutable.MutableList()
    +    env.setParallelism(1)
    +    env.setStateBackend(getStateBackend)
    +
    +    val stream = env.fromCollection(data)
    +    val table = stream.toTable(tEnv, 'word, 'num)
    +    val resultTable = table
    +      .select('num.sum as 'count)
    +      .groupBy('count)
    +      .select('count, 'count.count)
    +
    +    val results = resultTable.toDataStream[Row]
    +    results.addSink(new StreamITCase.StringSink)
    +    env.execute()
    +
    +    val expected = Seq("1,1", "1,0", "2,1", "2,0", "3,1", "3,0", "4,1")
    +    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
    +  }
    +
    +  // keyed groupby + over agg(unbounded, procTime, keyed)
    +  @Test
    +  def testGroupByAndUnboundPartitionedProcessingWindowWithRow(): Unit = {
    +
    +    val env = StreamExecutionEnvironment.getExecutionEnvironment
    +    env.setStateBackend(getStateBackend)
    +    env.setParallelism(1)
    +    val tEnv = TableEnvironment.getTableEnvironment(env)
    +
    +    StreamITCase.testResults = mutable.MutableList()
    +
    +    val t1 = env.fromCollection(data).toTable(tEnv).as('word, 'number)
    +
    +    tEnv.registerTable("T1", t1)
    +
    +    val sqlQuery = "SELECT word, cnt, count(word) " +
    +      "OVER (PARTITION BY cnt ORDER BY ProcTime() " +
    +      "ROWS BETWEEN UNBOUNDED preceding AND CURRENT ROW)" +
    +      "FROM " +
    +      "(SELECT word, count(number) as cnt from T1 group by word) "
    +
    +    val result = tEnv.sql(sqlQuery).toDataStream[Row]
    +    result.addSink(new StreamITCase.StringSink)
    +    env.execute()
    +
    +    val expected = mutable.MutableList("Hello,1,1", "word,1,2", "Hello,2,1", "bark,1,2")
    +    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
    +  }
    +
    +  // keyed groupby + over agg(unbounded, procTime, non-keyed)
    +  @Test
    +  def testGroupByAndUnboundNonPartitionedProcessingWindowWithRow(): Unit = {
    +
    +    val env = StreamExecutionEnvironment.getExecutionEnvironment
    +    env.setStateBackend(getStateBackend)
    +    env.setParallelism(1)
    +    val tEnv = TableEnvironment.getTableEnvironment(env)
    +
    +    StreamITCase.testResults = mutable.MutableList()
    +
    +    val t1 = env.fromCollection(data).toTable(tEnv).as('word, 'number)
    +
    +    tEnv.registerTable("T1", t1)
    +
    +    val sqlQuery = "SELECT word, cnt, count(word) " +
    +      "OVER (ORDER BY ProcTime() ROWS BETWEEN UNBOUNDED preceding AND CURRENT ROW)" +
    +      "FROM (SELECT word , count(number) as cnt from T1 group by word) "
    +
    +    val result = tEnv.sql(sqlQuery).toDataStream[Row]
    +    result.addSink(new StreamITCase.StringSink)
    +    env.execute()
    +
    +    val expected = mutable.MutableList("Hello,1,1", "word,1,2", "Hello,2,2", "bark,1,3")
    +    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
    +  }
    +
    +  // test unique process, if the current output message of unbounded groupby equals the
    +  // previous message, unbounded groupby will ignore the current one.
    +  @Test
    +  def testUniqueProcess(): Unit = {
    +    // data input
    +    val data = List(
    +      (1234, 2L),
    +      (1234, 0L)
    +    )
    +    val env = StreamExecutionEnvironment.getExecutionEnvironment
    +    val tEnv = TableEnvironment.getTableEnvironment(env)
    +    StreamITCase.testResults = mutable.MutableList()
    +    env.setParallelism(1)
    +    env.setStateBackend(getStateBackend)
    +
    +    val stream = env.fromCollection(data)
    +    val table = stream.toTable(tEnv, 'pk, 'value)
    +    val resultTable = table
    +      .groupBy('pk)
    +      .select('pk as 'pk, 'value.sum as 'sum)
    +      .groupBy('sum)
    +      .select('sum, 'pk.count as 'count)
    +
    +    val results = resultTable.toDataStream[Row]
    +    results.addSink(new StreamITCase.StringSink)
    +    env.execute()
    +
    +    val expected = Seq("2,1")
    +    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
    +  }
    +
    +  // correlate should handle retraction messages correctly
    +  @Test
    +  def testCorrelate(): Unit = {
    +    val env = StreamExecutionEnvironment.getExecutionEnvironment
    +    val tEnv = TableEnvironment.getTableEnvironment(env)
    +    StreamITCase.testResults = mutable.MutableList()
    +    env.setParallelism(1)
    +    env.setStateBackend(getStateBackend)
    +
    +    val func0 = new TableFunc0
    +
    +    val stream = env.fromCollection(data)
    +    val table = stream.toTable(tEnv, 'word, 'num)
    +    val resultTable = table
    +      .groupBy('word)
    +      .select('word as 'word, 'num.sum as 'count)
    +      .leftOuterJoin(func0('word))
    +      .groupBy('count)
    +      .select('count, 'word.count as 'frequency)
    +
    +    val results = resultTable.toDataStream[Row]
    +    results.addSink(new StreamITCase.StringSink)
    +    env.execute()
    +
    +    val expected = Seq("1,1", "1,2", "1,1", "2,1", "1,2")
    +    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
    +  }
    +
    +  // groupby + window agg
    +  @Test(expected = classOf[TableException])
    +  def testGroupByAndProcessingTimeSlidingGroupWindow(): Unit = {
    --- End diff --
    
    Please move the tests with expected failures to a test that extends `TableTestBase`, such as `GroupAggregationsTest`.
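
    A plan test of that style could look roughly like this (a sketch based on the utilities other plan tests use; treat the method names as assumptions):

        class GroupAggregationsTest extends TableTestBase {
          private val util = streamTestUtil()

          @Test(expected = classOf[TableException])
          def testUnsupportedRetractionInput(): Unit = {
            val table = util.addTable[(String, Int)]("MyTable", 'word, 'num)
            val result = table
              .groupBy('word)
              .select('word, 'num.sum as 'count)
              // ... unsupported downstream operator here ...
            // translating the plan should already throw, no cluster needed
            util.verifyTable(result, "")
          }
        }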



[GitHub] flink pull request #3733: [FLINK-6091] [table] Implement and turn on retract...

Posted by hequn8128 <gi...@git.apache.org>.
Github user hequn8128 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3733#discussion_r112818000
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GroupAggProcessFunction.scala ---
    @@ -84,16 +109,38 @@ class GroupAggProcessFunction(
         }
     
         // Set aggregate result to the final output
    -    i = 0
    -    while (i < aggregates.length) {
    -      val index = groupings.length + i
    -      val accumulator = accumulators.getField(i).asInstanceOf[Accumulator]
    -      aggregates(i).accumulate(accumulator, input.getField(aggFields(i)(0)))
    -      output.setField(index, aggregates(i).getValue(accumulator))
    -      i += 1
    +    if (input.command == Command.Delete) {
    +      i = 0
    +      while (i < aggregates.length) {
    +        val index = groupings.length + i
    +        val accumulator = accumulators.getField(i).asInstanceOf[Accumulator]
    +        aggregates(i).retract(accumulator, input.getField(aggFields(i)(0)))
    +        output.setField(index, aggregates(i).getValue(accumulator))
    +        i += 1
    +      }
    +    } else {
    +      i = 0
    +      while (i < aggregates.length) {
    +        val index = groupings.length + i
    +        val accumulator = accumulators.getField(i).asInstanceOf[Accumulator]
    +        aggregates(i).accumulate(accumulator, input.getField(aggFields(i)(0)))
    +        output.setField(index, aggregates(i).getValue(accumulator))
    +        i += 1
    +      }
         }
    -    state.update(accumulators)
     
    +    // if previous is not null, do retraction process
    +    if (null != previous) {
    +      if (previous.equals(output)) {
    +        // ignore same output
    +        return
    --- End diff --
    
    Yes, you are right, we should update the state even if we do not emit new values. Thanks!
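    
    A minimal sketch of the corrected tail of `processElement` (names mirror the diff above; `out` is the function's `Collector`, and `previous.command` assumes the settable `command` field this PR adds to `Row`):
    
    ```
    // always update the accumulator state, even if nothing is emitted
    state.update(accumulators)
    
    if (null != previous) {
      if (previous.equals(output)) {
        // unchanged result: state is updated, but no new message is emitted
        return
      }
      // retract the previously emitted result first
      previous.command = Command.Delete
      out.collect(previous)
    }
    out.collect(output)
    ```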


---

[GitHub] flink pull request #3733: [FLINK-6091] [table] Implement and turn on retract...

Posted by lincoln-lil <gi...@git.apache.org>.
Github user lincoln-lil commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3733#discussion_r111957849
  
    --- Diff: flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RowSerializer.java ---
    @@ -198,6 +209,7 @@ public Row deserialize(Row reuse, DataInputView source) throws IOException {
     				}
     			}
     		}
    +		reuse.command = (Command) commandSerializer.deserialize(source);
    --- End diff --
    
    Same issue as at L184.


---

[GitHub] flink pull request #3733: [FLINK-6091] [table] Implement and turn on retract...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3733#discussion_r113048361
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala ---
    @@ -118,7 +130,7 @@ object AggregateUtil {
           } else {
             new ProcTimeUnboundedNonPartitionedOver(
               genFunction,
    -          aggregationStateType)
    +          new CRowTypeInfo(aggregationStateType))
    --- End diff --
    
    We can keep `aggregationStateType` if we don't touch the code generation.
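    
    That is, the constructor call can simply stay as it is in the current code (taken from the diff context above):
    
    ```
    new ProcTimeUnboundedNonPartitionedOver(
      genFunction,
      aggregationStateType)
    ```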


---

[GitHub] flink pull request #3733: [FLINK-6091] [table] Implement and turn on retract...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3733#discussion_r112080959
  
    --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/RetractionITCase.scala ---
    @@ -0,0 +1,310 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.api.scala.stream
    +
    +import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
    +import org.apache.flink.table.api.{TableEnvironment, TableException}
    +import org.apache.flink.table.api.scala.stream.utils.{StreamITCase, StreamingWithStateTestBase}
    +import org.apache.flink.types.Row
    +import org.junit.Assert._
    +import org.junit.Test
    +import org.apache.flink.table.api.scala._
    +import org.apache.flink.api.scala._
    +import org.apache.flink.streaming.api.TimeCharacteristic
    +import org.apache.flink.table.utils.TableFunc0
    +
    +import scala.collection.mutable
    +
    +/**
    +  * tests for retraction
    +  */
    +class RetractionITCase extends StreamingWithStateTestBase {
    +  // input data
    +  val data = List(
    --- End diff --
    
    Please use more test data.
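    
    For example, a few more rows with repeated keys would exercise the update/retract path more than once; the values below are illustrative only:
    
    ```
    val data = List(
      ("Hello", 1),
      ("word", 1),
      ("Hello", 1),
      ("bark", 1),
      ("bark", 1),
      ("bark", 1),
      ("Hello", 2),
      ("flink", 1)
    )
    ```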


---

[GitHub] flink pull request #3733: [FLINK-6091] [table] Implement and turn on retract...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3733#discussion_r113049726
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedRangeOver.scala ---
    @@ -47,12 +48,12 @@ class ProcTimeBoundedRangeOver(
         genAggregations: GeneratedAggregationsFunction,
         precedingTimeBoundary: Long,
         aggregatesTypeInfo: RowTypeInfo,
    -    inputType: TypeInformation[Row])
    -  extends ProcessFunction[Row, Row]
    +    inputType: TypeInformation[CRow])
    +  extends ProcessFunction[CRow, CRow]
         with Compiler[GeneratedAggregations] {
    -  private var output: Row = _
    +  private var output: CRow = _
       private var accumulatorState: ValueState[Row] = _
    -  private var rowMapState: MapState[Long, JList[Row]] = _
    --- End diff --
    
    I think we should keep the state as `Row`. At the moment we do not need the `command` field.
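    
    A sketch of that shape: the state stays `Row`-typed and the `CRow` wrapper only lives at the operator boundary. Names mirror the diff; `inputC.row` assumes `CRow` exposes its wrapped `Row`:
    
    ```
    private var accumulatorState: ValueState[Row] = _
    private var rowMapState: MapState[Long, JList[Row]] = _
    
    override def processElement(
        inputC: CRow,
        ctx: ProcessFunction[CRow, CRow]#Context,
        out: Collector[CRow]): Unit = {
    
      // unwrap once; only the plain Row is buffered in state
      val input: Row = inputC.row
      val currentTime = ctx.timerService().currentProcessingTime()
    
      var rowList = rowMapState.get(currentTime)
      if (rowList == null) {
        rowList = new java.util.ArrayList[Row]()
      }
      rowList.add(input)
      rowMapState.put(currentTime, rowList)
      // timer registration and emission stay as they are today
    }
    ```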


---

[GitHub] flink pull request #3733: [FLINK-6091] [table] Implement and turn on retract...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3733#discussion_r113048689
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala ---
    @@ -179,7 +198,9 @@ object AggregateUtil {
             needRetraction = true)
     
         val aggregationStateType: RowTypeInfo = createAccumulatorRowType(aggregates)
    -    val inputRowType = FlinkTypeFactory.toInternalRowTypeInfo(inputType).asInstanceOf[RowTypeInfo]
    +    val inputRowType = FlinkTypeFactory
    --- End diff --
    
    Change to `CRowTypeInfo(FlinkTypeFactory.toInternalRowType(inputType))`.


---

[GitHub] flink pull request #3733: [FLINK-6091] [table] Implement and turn on retract...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3733#discussion_r112902344
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/types/CRowTypeInfo.scala ---
    @@ -0,0 +1,86 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.runtime.types
    +
    +import java.util
    +
    +import org.apache.flink.api.common.ExecutionConfig
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.api.common.typeutils.{CompositeType, TypeComparator, TypeSerializer}
    +import org.apache.flink.api.common.typeutils.CompositeType.{FlatFieldDescriptor, TypeComparatorBuilder}
    +import org.apache.flink.api.java.typeutils.RowTypeInfo
    +
    +class CRowTypeInfo(val rowType: RowTypeInfo) extends CompositeType[CRow](classOf[CRow]) {
    +
    +  override def getFieldNames: Array[String] = rowType.getFieldNames
    +
    +  override def getFieldIndex(fieldName: String): Int = rowType.getFieldIndex(fieldName)
    +
    +  override def getTypeAt[X](fieldExpression: String): TypeInformation[X] =
    +    rowType.getTypeAt(fieldExpression)
    +
    +  override def getTypeAt[X](pos: Int): TypeInformation[X] =
    +    rowType.getTypeAt(pos)
    +
    +  override def getFlatFields(
    +      fieldExpression: String,
    +      offset: Int,
    +      result: util.List[FlatFieldDescriptor]): Unit =
    +    rowType.getFlatFields(fieldExpression, offset, result)
    +
    +  override def isBasicType: Boolean = rowType.isBasicType
    +
    +  override def isTupleType: Boolean = rowType.isTupleType
    +
    +  override def getArity: Int = rowType.getArity
    +
    +  override def getTotalFields: Int = rowType.getTotalFields
    +
    +  override def createSerializer(config: ExecutionConfig): TypeSerializer[CRow] =
    +    new CRowSerializer(rowType.createSerializer(config))
    +
    +  // not implemented because we override createComparator
    +  override protected def createTypeComparatorBuilder(): TypeComparatorBuilder[CRow] = null
    +
    +  override def createComparator(
    +      logicalKeyFields: Array[Int],
    +      orders: Array[Boolean],
    +      logicalFieldOffset: Int,
    +      config: ExecutionConfig): TypeComparator[CRow] = {
    +
    +    val rowComparator = rowType.createComparator(
    +      logicalKeyFields,
    +      orders,
    +      logicalFieldOffset,
    +      config)
    +
    +    new CRowComparator(rowComparator)
    +  }
    +
    +  override def equals(obj: scala.Any): Boolean = {
    +    if (this.canEqual(obj)) {
    +      rowType.equals(obj.asInstanceOf[CRowTypeInfo].rowType)
    +    } else {
    +      false
    +    }
    +  }
    +
    +  override def canEqual(obj: scala.Any): Boolean = obj.isInstanceOf[CRowTypeInfo]
    +}
    +
    --- End diff --
    
    Add 
    
    ```
    object CRowTypeInfo {
      
      def apply(rowType: TypeInformation[Row]): CRowTypeInfo = {
        rowType match {
          case r: RowTypeInfo => new CRowTypeInfo(r)
        }
      }
      
    }
    ```
    
    for easy creation of `CRowTypeInfo`.
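    
    Usage then becomes a one-liner, e.g. (sketch, using the `toInternalRowTypeInfo` call that appears elsewhere in this PR):
    
    ```
    val inputRowType = CRowTypeInfo(FlinkTypeFactory.toInternalRowTypeInfo(inputType))
    ```
    
    Since the match is otherwise non-exhaustive, a `case _` branch with a descriptive error message might also be worth adding.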


---

[GitHub] flink pull request #3733: [FLINK-6091] [table] Implement and turn on retract...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3733#discussion_r113043078
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala ---
    @@ -627,24 +640,27 @@ abstract class TableEnvironment(val config: TableConfig) {
         * @param functionName name of the map function. Must not be unique but has to be a
         *                     valid Java class identifier.
         */
    -  protected def sinkConversion[T](
    -      physicalRowTypeInfo: TypeInformation[Row],
    +  protected def sinkConversion[T, P](
    --- End diff --
    
    I would refactor this as follows:
    - make `sinkConversion()` an abstract method that is implemented by `BatchTableEnvironment` and `StreamTableEnvironment`
    - extract the code-generation part that converts a `Row` into the requested data type and keep it in a method `TableEnvironment.generateRowConverterFunction()`
    - the implementations of `sinkConversion()` can then use `generateRowConverterFunction()` (see the sketch below)
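    
    A rough skeleton of that split; all signatures here are illustrative assumptions, not the actual API, and `c.row` assumes `CRow` exposes its wrapped `Row`:
    
    ```
    abstract class TableEnvironment(val config: TableConfig) {
    
      // shared: code-generates a converter from Row to the requested type T
      protected def generateRowConverterFunction[T](
          inputTypeInfo: TypeInformation[Row],
          requestedTypeInfo: TypeInformation[T],
          functionName: String): MapFunction[Row, T] = ??? // existing code-gen logic
    
      // each environment implements its own sink conversion
      protected def sinkConversion[T](
          physicalTypeInfo: TypeInformation[_],
          requestedTypeInfo: TypeInformation[T],
          functionName: String): Option[MapFunction[_, T]]
    }
    
    // stream side: unwrap the CRow, then apply the generated Row converter
    class StreamTableEnvironmentSketch(config: TableConfig)
      extends TableEnvironment(config) {
    
      override protected def sinkConversion[T](
          physicalTypeInfo: TypeInformation[_],
          requestedTypeInfo: TypeInformation[T],
          functionName: String): Option[MapFunction[_, T]] = {
    
        val crowType = physicalTypeInfo.asInstanceOf[CRowTypeInfo]
        val rowConverter = generateRowConverterFunction(
          crowType.rowType, requestedTypeInfo, functionName)
    
        Some(new MapFunction[CRow, T] {
          override def map(c: CRow): T = rowConverter.map(c.row)
        })
      }
    }
    ```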


---

[GitHub] flink pull request #3733: [FLINK-6091] [table] Implement and turn on retract...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3733#discussion_r113049930
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedRangeOver.scala ---
    @@ -69,10 +70,10 @@ class ProcTimeBoundedRangeOver(
         output = function.createOutputRow()
    --- End diff --
    
    We can init `output` as `output = new CRow(function.createOutputRow(), true)`.
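    
    For example (sketch; assumes the `CRow(row: Row, change: Boolean)` constructor, with `true` marking an accumulate message):
    
    ```
    override def open(config: Configuration): Unit = {
      // compile the generated aggregations function as before, then
      // create the reusable output wrapper once
      output = new CRow(function.createOutputRow(), true)
    }
    ```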


---