Posted to issues@flink.apache.org by twalthr <gi...@git.apache.org> on 2017/02/20 15:41:55 UTC

[GitHub] flink pull request #3364: [FLINK-5047] [table] Add sliding group-windows for...

GitHub user twalthr opened a pull request:

    https://github.com/apache/flink/pull/3364

    [FLINK-5047] [table] Add sliding group-windows for batch tables

    This PR implements sliding group-windows. It covers the following cases:
    
    - Grouped sliding count-windows with incremental aggregates
    - Grouped and all (non-grouped) sliding time-windows with incremental and non-incremental aggregates
    
    All windows support both the overlapping and the non-overlapping case. Sliding windows are pre-tumbled where possible. This PR also fixes some bugs.
    
    If the general design is OK, I will also implement the missing non-incremental aggregates for count-windows and add a bit of documentation.
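
    For reference, a sliding group-window on a batch table would be used roughly as follows. This is only a minimal sketch: the environment setup, the example table, and its fields ('long as time attribute, 'int, 'string) are made up to illustrate the API shape.

        import org.apache.flink.api.scala._
        import org.apache.flink.table.api.TableEnvironment
        import org.apache.flink.table.api.scala._

        val env = ExecutionEnvironment.getExecutionEnvironment
        val tEnv = TableEnvironment.getTableEnvironment(env)

        // hypothetical batch table with fields 'long (time), 'int, 'string
        val table = env
          .fromElements((1L, 1, "Hi"), (2L, 2, "Hello"))
          .toTable(tEnv, 'long, 'int, 'string)

        val windowed = table
          .window(Slide over 10.milli every 5.milli on 'long as 'w) // sliding time-window
          .groupBy('string, 'w)                                     // grouped variant
          .select('string, 'int.count, 'w.start, 'w.end)

    For count-windows the size and slide would be row intervals instead (e.g. `Slide over 10.rows every 5.rows`).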

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/twalthr/flink FLINK-5047

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/3364.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #3364
    
----
commit adcedd91e76e73457740816f691ccf64f2e2e38b
Author: twalthr <tw...@apache.org>
Date:   2017-01-18T15:56:02Z

    [FLINK-5047] [table] Add sliding group-windows for batch tables

----


---

[GitHub] flink issue #3364: [FLINK-5047] [table] Add sliding group-windows for batch ...

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on the issue:

    https://github.com/apache/flink/pull/3364
  
    @fhueske thanks for your comments. I will create a time window PR.


---

[GitHub] flink issue #3364: [FLINK-5047] [table] Add sliding group-windows for batch ...

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on the issue:

    https://github.com/apache/flink/pull/3364
  
    Thanks for the feedback @fhueske. I updated the PR.


---

[GitHub] flink pull request #3364: [FLINK-5047] [table] Add sliding group-windows for...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3364#discussion_r104515406
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetWindowAggregate.scala ---
    @@ -280,6 +285,138 @@ class DataSetWindowAggregate(
         }
       }
     
    +  private def createEventTimeSlidingWindowDataSet(
    +      inputDS: DataSet[Row],
    +      isTimeWindow: Boolean,
    +      isParserCaseSensitive: Boolean)
    +    : DataSet[Row] = {
    +
    +    // create MapFunction for initializing the aggregations
    +    // it aligns the rowtime for pre-tumbling in case of a time-window for incremental aggregates
    +    val mapFunction = createDataSetWindowPrepareMapFunction(
    +      window,
    +      namedAggregates,
    +      grouping,
    +      inputType,
    +      isParserCaseSensitive)
    +
    +    val mappedDataSet = inputDS
    +      .map(mapFunction)
    +      .name(prepareOperatorName)
    +
    +    val mapReturnType = mappedDataSet.getType
    +
    +    val rowTypeInfo = FlinkTypeFactory.toInternalRowTypeInfo(getRowType)
    +    val groupingKeys = grouping.indices.toArray
    +
    +    // do incremental aggregation if possible
    +    val isIncremental = doAllSupportPartialAggregation(
    +      namedAggregates.map(_.getKey),
    +      inputType,
    +      grouping.length)
    +
    +    val preparedDataSet = if (isTimeWindow) {
    +      // time window
    +
    +      if (isIncremental) {
    +        // incremental aggregates
    +
    +        val groupingKeysAndAlignedRowtime = groupingKeys :+ mapReturnType.getArity - 1
    +
    +        // create GroupReduceFunction
    +        // for pre-tumbling and replicating/omitting the content for each pane
    +        val prepareReduceFunction = createDataSetSlideWindowPrepareGroupReduceFunction(
    +          window,
    +          namedAggregates,
    +          grouping,
    +          inputType,
    +          isParserCaseSensitive)
    +
    +        mappedDataSet.asInstanceOf[DataSet[Row]]
    +          .groupBy(groupingKeysAndAlignedRowtime: _*)
    +          .reduceGroup(prepareReduceFunction) // pre-tumbles and replicates/omits
    +          .name(prepareOperatorName)
    +      } else {
    +        // non-incremental aggregates
    +
    +        // create FlatMapFunction
    +        // for replicating/omitting the content for each pane
    +        val prepareFlatMapFunction = createDataSetSlideWindowPrepareFlatMapFunction(
    +          window,
    +          namedAggregates,
    +          grouping,
    +          inputType,
    +          isParserCaseSensitive)
    +
    +        mappedDataSet
    +          .flatMap(prepareFlatMapFunction) // replicates/omits
    +      }
    +    } else {
    +      // count window
    --- End diff --
    
    Can we break this PR into time and count windows? There are so many cases to consider...


---

[GitHub] flink pull request #3364: [FLINK-5047] [table] Add sliding group-windows for...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3364#discussion_r104522529
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideTimeWindowAggFlatMapFunction.scala ---
    @@ -0,0 +1,93 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.table.runtime.aggregate
    +
    +import org.apache.flink.api.common.functions.RichFlatMapFunction
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.api.java.typeutils.ResultTypeQueryable
    +import org.apache.flink.configuration.Configuration
    +import org.apache.flink.streaming.api.windowing.windows.TimeWindow
    +import org.apache.flink.types.Row
    +import org.apache.flink.util.{Collector, Preconditions}
    +
    +
    +/**
    +  * It is used for sliding windows on batch for time-windows. It takes a prepared input row,
    +  * aligns the window start, and replicates or omits records for different panes of a sliding
    +  * window. It is used for non-incremental aggregations.
    +  *
    +  * @param aggregates aggregate functions
    +  * @param groupingKeysLength number of grouping keys
    +  * @param windowSize window size of the sliding window
    +  * @param windowSlide window slide of the sliding window
    +  * @param returnType return type of this function
    +  */
    +class DataSetSlideTimeWindowAggFlatMapFunction(
    +    private val aggregates: Array[Aggregate[_]],
    +    private val groupingKeysLength: Int,
    +    private val timeFieldPos: Int,
    +    private val windowSize: Long,
    +    private val windowSlide: Long,
    +    @transient private val returnType: TypeInformation[Row])
    +  extends RichFlatMapFunction[Row, Row]
    +  with ResultTypeQueryable[Row] {
    +
    +  private var aggregateBuffer: Row = _
    +  private var outWindowStartIndex: Int = _
    +
    +  override def open(config: Configuration) {
    +    Preconditions.checkNotNull(aggregates)
    --- End diff --
    
    Move everything to the constructor and remove `open()`?
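
    As a sketch of that suggestion (only the initialization part is spelled out; `flatMap()` keeps the body shown in the diff above), the checks and derived indices only depend on constructor arguments and could be computed eagerly:

        package org.apache.flink.table.runtime.aggregate

        import org.apache.flink.api.common.functions.RichFlatMapFunction
        import org.apache.flink.api.common.typeinfo.TypeInformation
        import org.apache.flink.api.java.typeutils.ResultTypeQueryable
        import org.apache.flink.types.Row
        import org.apache.flink.util.{Collector, Preconditions}

        class DataSetSlideTimeWindowAggFlatMapFunction(
            private val aggregates: Array[Aggregate[_]],
            private val groupingKeysLength: Int,
            private val timeFieldPos: Int,
            private val windowSize: Long,
            private val windowSlide: Long,
            @transient private val returnType: TypeInformation[Row])
          extends RichFlatMapFunction[Row, Row]
          with ResultTypeQueryable[Row] {

          Preconditions.checkNotNull(aggregates)

          // derived once at construction time instead of in open();
          // one extra field stores the aligned window start
          private val partialRowLength = groupingKeysLength +
            aggregates.map(_.intermediateDataType.length).sum + 1
          private val outWindowStartIndex = partialRowLength - 1
          private val aggregateBuffer = new Row(partialRowLength)

          override def getProducedType: TypeInformation[Row] = returnType

          override def flatMap(record: Row, out: Collector[Row]): Unit = {
            // body unchanged from the diff above
          }
        }

    The `Configuration` import and the `open()` override would then no longer be needed (whether the `Row` buffer itself should rather stay in `open()` is discussed further down in this thread).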


---

[GitHub] flink pull request #3364: [FLINK-5047] [table] Add sliding group-windows for...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3364#discussion_r104507420
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala ---
    @@ -153,6 +170,156 @@ object AggregateUtil {
       }
     
       /**
    +    * Create a [[org.apache.flink.api.common.functions.GroupReduceFunction]] that prepares for
    +    * incremental aggregates of sliding windows (time and count-windows).
    +    * It requires a prepared input (with intermediate aggregate fields and aligned rowtime for
    +    * pre-tumbling in case of time-windows), pre-aggregates (pre-tumbles) rows, aligns the
    +    * window-start, and replicates or omits records for different panes of a sliding window.
    +    *
    +    * The output of the function contains the grouping keys, the intermediate aggregate values of
    +    * all aggregate functions and the aligned window start. The window start need not be a
    +    * timestamp; it can also be a count value for count-windows.
    +    *
    +    * The output is stored in Row by the following format:
    +    *
    +    * {{{
    +    *                      avg(x) aggOffsetInRow = 2      count(z) aggOffsetInRow = 5
    +    *                            |                          |
    +    *                            v                          v
    +    *        +---------+---------+--------+--------+--------+--------+-------------+
    +    *        |groupKey1|groupKey2|  sum1  | count1 |  sum2  | count2 | windowStart |
    +    *        +---------+---------+--------+--------+--------+--------+-------------+
    +    *                                              ^                 ^
    +    *                                              |                 |
    +    *                                 sum(y) aggOffsetInRow = 4    window start for pane mapping
    +    * }}}
    +    *
    +    * NOTE: this function is only used for sliding windows on batch tables.
    +    */
    +  def createDataSetSlideWindowPrepareGroupReduceFunction(
    +      window: LogicalWindow,
    +      namedAggregates: Seq[CalcitePair[AggregateCall, String]],
    +      groupings: Array[Int],
    +      inputType: RelDataType,
    +      isParserCaseSensitive: Boolean)
    +    : RichGroupReduceFunction[Row, Row] = {
    +
    +    val aggregates = transformToAggregateFunctions(
    +      namedAggregates.map(_.getKey),
    +      inputType,
    +      groupings.length)._2
    +
    +    val returnType: RowTypeInfo = createAggregateBufferDataType(
    +      groupings,
    +      aggregates,
    +      inputType,
    +      Some(Array(BasicTypeInfo.LONG_TYPE_INFO)))
    +
    +    window match {
    +      case EventTimeSlidingGroupWindow(_, _, size, slide) if isTimeInterval(size.resultType) =>
    +        // sliding time-window
    +        if (aggregates.forall(_.supportPartial)) {
    +          // for incremental aggregations
    +          new DataSetSlideTimeWindowAggReduceCombineFunction(
    +            aggregates,
    +            groupings.length,
    +            returnType.getArity - 1,
    +            asLong(size),
    +            asLong(slide),
    +            returnType)
    +        } else {
    +          // for non-incremental aggregations
    --- End diff --
    
    Do we need this case? Without partial aggregation, we cannot use tumbling windows to compute sliding windows. So this method is only called if all aggregates support partial aggregation, right? 


---

[GitHub] flink pull request #3364: [FLINK-5047] [table] Add sliding group-windows for...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3364#discussion_r104525345
  
    --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/GroupWindowTest.scala ---
    @@ -360,6 +360,19 @@ class GroupWindowTest extends TableTestBase {
           .window(Session withGap 7.milli as 'w) // require on a time attribute
           .groupBy('string, 'w)
           .select('string, 'int.count)
    +
    +    val expected = unaryNode(
    --- End diff --
    
    Why do you add the expected result to a test which is expected to fail?


---

[GitHub] flink pull request #3364: [FLINK-5047] [table] Add sliding group-windows for...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/3364


---

[GitHub] flink pull request #3364: [FLINK-5047] [table] Add sliding group-windows for...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3364#discussion_r104514985
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideCountWindowAggReduceGroupFunction.scala ---
    @@ -0,0 +1,113 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.table.runtime.aggregate
    +
    +import java.lang.Iterable
    +import java.sql.Timestamp
    +
    +import org.apache.calcite.runtime.SqlFunctions
    +import org.apache.flink.api.common.functions.RichGroupReduceFunction
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.api.java.typeutils.ResultTypeQueryable
    +import org.apache.flink.configuration.Configuration
    +import org.apache.flink.streaming.api.windowing.windows.TimeWindow
    +import org.apache.flink.types.Row
    +import org.apache.flink.util.{Collector, Preconditions}
    +
    +/**
    +  * It is used for sliding windows on batch for count-windows. It takes a prepared input row,
    +  * pre-aggregates (pre-tumbles) rows, aligns the window start, and replicates or omits records
    +  * for different panes of a sliding window.
    +  *
    +  * @param aggregates aggregate functions
    +  * @param groupingKeysLength number of grouping keys
    +  * @param preTumblingSize number of records to be aggregated (tumbled) before emission
    +  * @param windowSize window size of the sliding window
    +  * @param windowSlide window slide of the sliding window
    +  * @param returnType return type of this function
    +  */
    +class DataSetSlideCountWindowAggReduceGroupFunction(
    --- End diff --
    
    I think it makes sense to add forwarded-fields annotations for the internal operators.
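
    For example, something along these lines (only a sketch with a made-up class name; the concrete field expressions depend on the final row layouts, here assuming the grouping keys sit unchanged in fields 0 and 1 of both input and output rows):

        import java.lang.Iterable

        import org.apache.flink.api.common.functions.RichGroupReduceFunction
        import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields
        import org.apache.flink.types.Row
        import org.apache.flink.util.Collector

        // declares that the grouping key fields are copied unchanged from input to
        // output, so the optimizer can reuse existing partitioning and sort orders
        @ForwardedFields(Array("f0", "f1"))
        class SlidePrepareReduceSketch extends RichGroupReduceFunction[Row, Row] {

          override def reduce(records: Iterable[Row], out: Collector[Row]): Unit = {
            // pre-tumbling logic as in the PR
          }
        }

    Alternatively, the same information can be attached at the call site via `withForwardedFields("f0; f1")` on the operator.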


---

[GitHub] flink pull request #3364: [FLINK-5047] [table] Add sliding group-windows for...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3364#discussion_r104511932
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideTimeWindowAggFlatMapFunction.scala ---
    @@ -0,0 +1,93 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.table.runtime.aggregate
    +
    +import org.apache.flink.api.common.functions.RichFlatMapFunction
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.api.java.typeutils.ResultTypeQueryable
    +import org.apache.flink.configuration.Configuration
    +import org.apache.flink.streaming.api.windowing.windows.TimeWindow
    +import org.apache.flink.types.Row
    +import org.apache.flink.util.{Collector, Preconditions}
    +
    +
    +/**
    +  * It is used for sliding windows on batch for time-windows. It takes a prepared input row,
    +  * aligns the window start, and replicates or omits records for different panes of a sliding
    +  * window. It is used for non-incremental aggregations.
    +  *
    +  * @param aggregates aggregate functions
    +  * @param groupingKeysLength number of grouping keys
    +  * @param windowSize window size of the sliding window
    +  * @param windowSlide window slide of the sliding window
    +  * @param returnType return type of this function
    +  */
    +class DataSetSlideTimeWindowAggFlatMapFunction(
    +    private val aggregates: Array[Aggregate[_]],
    +    private val groupingKeysLength: Int,
    +    private val timeFieldPos: Int,
    +    private val windowSize: Long,
    +    private val windowSlide: Long,
    +    @transient private val returnType: TypeInformation[Row])
    +  extends RichFlatMapFunction[Row, Row]
    +  with ResultTypeQueryable[Row] {
    +
    +  private var aggregateBuffer: Row = _
    +  private var outWindowStartIndex: Int = _
    +
    +  override def open(config: Configuration) {
    +    Preconditions.checkNotNull(aggregates)
    +    // add one field to store window start
    +    val partialRowLength = groupingKeysLength +
    +      aggregates.map(_.intermediateDataType.length).sum + 1
    +    aggregateBuffer = new Row(partialRowLength)
    +    outWindowStartIndex = partialRowLength - 1
    +  }
    +
    +  override def flatMap(record: Row, out: Collector[Row]): Unit = {
    +    val windowStart = record.getField(timeFieldPos).asInstanceOf[Long]
    +
    +    // adopted from SlidingEventTimeWindows.assignWindows
    +    var start: Long = TimeWindow.getWindowStartWithOffset(windowStart, 0, windowSlide)
    +
    +    // skip preparing output if it is not necessary
    +    if (start > windowStart - windowSize) {
    +
    +      // prepare output
    +      for (i <- aggregates.indices) {
    --- End diff --
    
    Isn't this just copying data from the record to the aggregateBuffer?
    Doesn't the input record have the same schema as the output record? Isn't it sufficient to emit the input record multiple times with an adapted `outWindowStartIndex`?
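
    A sketch of that simplification (assuming, as suggested above, that the prepared input row already has the output layout with the aligned window start as its last field at `outWindowStartIndex`):

        override def flatMap(record: Row, out: Collector[Row]): Unit = {
          val rowtime = record.getField(timeFieldPos).asInstanceOf[Long]

          // latest pane whose window still contains this rowtime
          // (adopted from SlidingEventTimeWindows.assignWindows)
          var start: Long = TimeWindow.getWindowStartWithOffset(rowtime, 0, windowSlide)

          while (start > rowtime - windowSize) {
            // only adapt the window-start field and re-emit the same record
            record.setField(outWindowStartIndex, start)
            out.collect(record)
            start -= windowSlide
          }
        }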


---

[GitHub] flink pull request #3364: [FLINK-5047] [table] Add sliding group-windows for...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3364#discussion_r104926612
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideTimeWindowAggReduceCombineFunction.scala ---
    @@ -0,0 +1,110 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.table.runtime.aggregate
    +
    +import java.lang.Iterable
    +
    +import org.apache.flink.api.common.functions.CombineFunction
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.table.functions.{Accumulator, AggregateFunction}
    +import org.apache.flink.types.Row
    +
    +/**
    +  * Wraps the aggregate logic inside of
    +  * [[org.apache.flink.api.java.operators.GroupReduceOperator]] and
    +  * [[org.apache.flink.api.java.operators.GroupCombineOperator]].
    +  *
    +  * It is used for sliding on batch for time-windows.
    +  *
    +  * @param aggregates aggregate functions
    +  * @param groupingKeysLength number of grouping keys
    +  * @param timeFieldPos position of aligned time field
    +  * @param windowSize window size of the sliding window
    +  * @param windowSlide window slide of the sliding window
    +  * @param returnType return type of this function
    +  */
    +class DataSetSlideTimeWindowAggReduceCombineFunction(
    --- End diff --
    
    Can be merged with `DataSetSlideTimeWindowAggReduceGroupFunction` because both can only be used if input is combinable.


---

[GitHub] flink pull request #3364: [FLINK-5047] [table] Add sliding group-windows for...

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3364#discussion_r104722597
  
    --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/AggregationsITCase.scala ---
    @@ -146,42 +146,9 @@ class AggregationsITCase extends StreamingMultipleProgramsTestBase {
           "Hi,1,1,1970-01-01 00:00:00.0,1970-01-01 00:00:00.005")
         assertEquals(expected.sorted, StreamITCase.testResults.sorted)
       }
    -
    -  @Test
    -  def testEventTimeSlidingWindow(): Unit = {
    --- End diff --
    
    Yes.


---

[GitHub] flink pull request #3364: [FLINK-5047] [table] Add sliding group-windows for...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3364#discussion_r104919295
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetWindowAggregate.scala ---
    @@ -312,6 +320,108 @@ class DataSetWindowAggregate(
         }
       }
     
    +  private def createEventTimeSlidingWindowDataSet(
    +      inputDS: DataSet[Row],
    +      isTimeWindow: Boolean,
    +      size: Long,
    +      slide: Long,
    +      isParserCaseSensitive: Boolean)
    +    : DataSet[Row] = {
    +
    +    // create MapFunction for initializing the aggregations
    +    // it aligns the rowtime for pre-tumbling in case of a time-window for partial aggregates
    +    val mapFunction = createDataSetWindowPrepareMapFunction(
    +      window,
    +      namedAggregates,
    +      grouping,
    +      inputType,
    +      isParserCaseSensitive)
    +
    +    val mappedDataSet = inputDS
    +      .map(mapFunction)
    +      .name(prepareOperatorName)
    +
    +    val mapReturnType = mappedDataSet.getType
    +
    +    val rowTypeInfo = FlinkTypeFactory.toInternalRowTypeInfo(getRowType)
    +    val groupingKeys = grouping.indices.toArray
    +
    +    // do partial aggregation if possible
    +    val isPartial = doAllSupportPartialMerge(
    +      namedAggregates.map(_.getKey),
    +      inputType,
    +      grouping.length)
    +
    +    // only pre-tumble if it is worth it
    +    val littleTumblingSize = determineLargestTumblingSize(size, slide) <= 1
    --- End diff --
    
    `isLittleTumblingSize`


---

[GitHub] flink pull request #3364: [FLINK-5047] [table] Add sliding group-windows for...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3364#discussion_r104523932
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideTimeWindowAggReduceGroupFunction.scala ---
    @@ -0,0 +1,110 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.table.runtime.aggregate
    +
    +import java.lang.Iterable
    +
    +import org.apache.flink.api.common.functions.RichGroupReduceFunction
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.api.java.typeutils.ResultTypeQueryable
    +import org.apache.flink.configuration.Configuration
    +import org.apache.flink.streaming.api.windowing.windows.TimeWindow
    +import org.apache.flink.types.Row
    +import org.apache.flink.util.{Collector, Preconditions}
    +
    +/**
    +  * It is used for sliding windows on batch for time-windows. It takes a prepared input row (with
    +  * aligned rowtime for pre-tumbling), pre-aggregates (pre-tumbles) rows, aligns the window start,
    +  * and replicates or omits records for different panes of a sliding window.
    +  *
    +  * This function is similar to [[DataSetTumbleCountWindowAggReduceGroupFunction]], however,
    +  * it does no final aggregate evaluation. It also includes the logic of
    +  * [[DataSetSlideTimeWindowAggFlatMapFunction]].
    +  *
    +  * @param aggregates aggregate functions
    +  * @param groupingKeysLength number of grouping keys
    +  * @param timeFieldPos position of aligned time field
    +  * @param windowSize window size of the sliding window
    +  * @param windowSlide window slide of the sliding window
    +  * @param returnType return type of this function
    +  */
    +class DataSetSlideTimeWindowAggReduceGroupFunction(
    +    private val aggregates: Array[Aggregate[_]],
    +    private val groupingKeysLength: Int,
    +    private val timeFieldPos: Int,
    +    private val windowSize: Long,
    +    private val windowSlide: Long,
    +    @transient private val returnType: TypeInformation[Row])
    +  extends RichGroupReduceFunction[Row, Row]
    +  with ResultTypeQueryable[Row] {
    +
    +  protected var aggregateBuffer: Row = _
    +  private var outWindowStartIndex: Int = _
    +
    +  override def open(config: Configuration) {
    +    Preconditions.checkNotNull(aggregates)
    --- End diff --
    
    Move checks and initialization to constructor and remove `open()`?


---

[GitHub] flink pull request #3364: [FLINK-5047] [table] Add sliding group-windows for...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3364#discussion_r104522343
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetWindowAggregate.scala ---
    @@ -280,6 +285,138 @@ class DataSetWindowAggregate(
         }
       }
     
    +  private def createEventTimeSlidingWindowDataSet(
    +      inputDS: DataSet[Row],
    +      isTimeWindow: Boolean,
    +      isParserCaseSensitive: Boolean)
    +    : DataSet[Row] = {
    +
    +    // create MapFunction for initializing the aggregations
    +    // it aligns the rowtime for pre-tumbling in case of a time-window for incremental aggregates
    +    val mapFunction = createDataSetWindowPrepareMapFunction(
    +      window,
    +      namedAggregates,
    +      grouping,
    +      inputType,
    +      isParserCaseSensitive)
    +
    +    val mappedDataSet = inputDS
    +      .map(mapFunction)
    +      .name(prepareOperatorName)
    +
    +    val mapReturnType = mappedDataSet.getType
    +
    +    val rowTypeInfo = FlinkTypeFactory.toInternalRowTypeInfo(getRowType)
    +    val groupingKeys = grouping.indices.toArray
    +
    +    // do incremental aggregation if possible
    +    val isIncremental = doAllSupportPartialAggregation(
    +      namedAggregates.map(_.getKey),
    +      inputType,
    +      grouping.length)
    +
    +    val preparedDataSet = if (isTimeWindow) {
    +      // time window
    +
    +      if (isIncremental) {
    +        // incremental aggregates
    +
    +        val groupingKeysAndAlignedRowtime = groupingKeys :+ mapReturnType.getArity - 1
    +
    +        // create GroupReduceFunction
    +        // for pre-tumbling and replicating/omitting the content for each pane
    +        val prepareReduceFunction = createDataSetSlideWindowPrepareGroupReduceFunction(
    +          window,
    +          namedAggregates,
    +          grouping,
    +          inputType,
    +          isParserCaseSensitive)
    +
    +        mappedDataSet.asInstanceOf[DataSet[Row]]
    +          .groupBy(groupingKeysAndAlignedRowtime: _*)
    +          .reduceGroup(prepareReduceFunction) // pre-tumbles and replicates/omits
    +          .name(prepareOperatorName)
    +      } else {
    +        // non-incremental aggregates
    +
    +        // create FlatMapFunction
    +        // for replicating/omitting the content for each pane
    +        val prepareFlatMapFunction = createDataSetSlideWindowPrepareFlatMapFunction(
    +          window,
    +          namedAggregates,
    +          grouping,
    +          inputType,
    +          isParserCaseSensitive)
    +
    +        mappedDataSet
    +          .flatMap(prepareFlatMapFunction) // replicates/omits
    +      }
    +    } else {
    +      // count window
    +
    +      // grouped window
    +      if (groupingKeys.length > 0) {
    +
    +        if (isIncremental) {
    +          // incremental aggregates
    +
    +          // create GroupReduceFunction
    +          // for pre-tumbling and replicating/omitting the content for each pane
    +          val prepareReduceFunction = createDataSetSlideWindowPrepareGroupReduceFunction(
    +            window,
    +            namedAggregates,
    +            grouping,
    +            inputType,
    +            isParserCaseSensitive)
    +
    +          mappedDataSet.asInstanceOf[DataSet[Row]]
    +            .groupBy(groupingKeys: _*)
    +            // sort on time field, it's the last element in the row
    +            .sortGroup(mapReturnType.getArity - 1, Order.ASCENDING)
    +            .reduceGroup(prepareReduceFunction) // pre-tumbles and replicates/omits
    --- End diff --
    
    Only do this if the tumble size is > 1?
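
    A sketch of how that guard could look (assuming `size`, `slide`, and the `determineLargestTumblingSize` helper from the newer version of this method quoted earlier in this thread are in scope; the `prepareCountFlatMapFunction` in the else-branch is hypothetical and would need a count-window variant of the prepare FlatMapFunction):

        // only pre-tumble when a pane can actually combine more than one record
        val prepared = if (determineLargestTumblingSize(size, slide) > 1) {
          mappedDataSet.asInstanceOf[DataSet[Row]]
            .groupBy(groupingKeys: _*)
            // sort on the count field, the last element in the row
            .sortGroup(mapReturnType.getArity - 1, Order.ASCENDING)
            .reduceGroup(prepareReduceFunction) // pre-tumbles and replicates/omits
            .name(prepareOperatorName)
        } else {
          // hypothetical: only replicate/omit, without pre-aggregation
          mappedDataSet.flatMap(prepareCountFlatMapFunction)
        }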


---

[GitHub] flink pull request #3364: [FLINK-5047] [table] Add sliding group-windows for...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3364#discussion_r104925916
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala ---
    @@ -186,6 +200,130 @@ object AggregateUtil {
       }
     
       /**
    +    * Create a [[org.apache.flink.api.common.functions.GroupReduceFunction]] that prepares for
    +    * partial aggregates of sliding windows (time and count-windows).
    +    * It requires a prepared input (with intermediate aggregate fields and aligned rowtime for
    +    * pre-tumbling in case of time-windows), pre-aggregates (pre-tumbles) rows, aligns the
    +    * window-start, and replicates or omits records for different panes of a sliding window.
    +    *
    +    * The output of the function contains the grouping keys, the intermediate aggregate values of
    +    * all aggregate functions and the aligned window start. The window start need not be a
    +    * timestamp; it can also be a count value for count-windows.
    +    *
    +    * The output is stored in Row by the following format:
    +    *
    +    * {{{
    +    *                      avg(x) aggOffsetInRow = 2      count(z) aggOffsetInRow = 5
    +    *                            |                          |
    +    *                            v                          v
    +    *        +---------+---------+--------+--------+--------+--------+-------------+
    +    *        |groupKey1|groupKey2|  sum1  | count1 |  sum2  | count2 | windowStart |
    +    *        +---------+---------+--------+--------+--------+--------+-------------+
    +    *                                              ^                 ^
    +    *                                              |                 |
    +    *                                 sum(y) aggOffsetInRow = 4    window start for pane mapping
    +    * }}}
    +    *
    +    * NOTE: this function is only used for sliding windows with partial aggregates on batch tables.
    +    */
    +  def createDataSetSlideWindowPrepareGroupReduceFunction(
    +      window: LogicalWindow,
    +      namedAggregates: Seq[CalcitePair[AggregateCall, String]],
    +      groupings: Array[Int],
    +      inputType: RelDataType,
    +      isParserCaseSensitive: Boolean)
    +    : RichGroupReduceFunction[Row, Row] = {
    +
    +    val aggregates = transformToAggregateFunctions(
    +      namedAggregates.map(_.getKey),
    +      inputType,
    +      needRetraction = false)._2
    +
    +    val returnType: RowTypeInfo = createDataSetAggregateBufferDataType(
    +      groupings,
    +      aggregates,
    +      inputType,
    +      Some(Array(BasicTypeInfo.LONG_TYPE_INFO)))
    +
    +    window match {
    +      case EventTimeSlidingGroupWindow(_, _, size, slide) if isTimeInterval(size.resultType) =>
    +        // sliding time-window
    +        // for partial aggregations
    +        new DataSetSlideTimeWindowAggReduceCombineFunction(
    +          aggregates,
    +          groupings.length,
    +          returnType.getArity - 1,
    +          asLong(size),
    +          asLong(slide),
    +          returnType)
    +
    +      case _ =>
    +        throw new UnsupportedOperationException(s"$window is currently not supported on batch.")
    +    }
    +  }
    +
    +  /**
    +    * Create a [[org.apache.flink.api.common.functions.FlatMapFunction]] that prepares for
    +    * non-incremental aggregates of sliding windows (time-windows).
    +    *
    +    * It requires a prepared input (with intermediate aggregate fields), aligns the
    +    * window-start, and replicates or omits records for different panes of a sliding window.
    +    *
    +    * The output of the function contains the grouping keys, the intermediate aggregate values of
    +    * all aggregate functions and the aligned window start.
    +    *
    +    * The output is stored in Row by the following format:
    +    *
    +    * {{{
    +    *                      avg(x) aggOffsetInRow = 2      count(z) aggOffsetInRow = 5
    +    *                            |                          |
    +    *                            v                          v
    +    *        +---------+---------+--------+--------+--------+--------+-------------+
    +    *        |groupKey1|groupKey2|  sum1  | count1 |  sum2  | count2 | windowStart |
    +    *        +---------+---------+--------+--------+--------+--------+-------------+
    +    *                                              ^                 ^
    +    *                                              |                 |
    +    *                                 sum(y) aggOffsetInRow = 4      window start for pane mapping
    +    * }}}
    +    *
    +    * NOTE: this function is only used for time-based sliding windows on batch tables.
    +    */
    +  def createDataSetSlideWindowPrepareFlatMapFunction(
    +      window: LogicalWindow,
    +      namedAggregates: Seq[CalcitePair[AggregateCall, String]],
    --- End diff --
    
    Remove the unnecessary parameters `namedAggregates` and `groupings`.


---

[GitHub] flink pull request #3364: [FLINK-5047] [table] Add sliding group-windows for...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3364#discussion_r104720345
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideTimeWindowAggFlatMapFunction.scala ---
    @@ -0,0 +1,93 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.table.runtime.aggregate
    +
    +import org.apache.flink.api.common.functions.RichFlatMapFunction
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.api.java.typeutils.ResultTypeQueryable
    +import org.apache.flink.configuration.Configuration
    +import org.apache.flink.streaming.api.windowing.windows.TimeWindow
    +import org.apache.flink.types.Row
    +import org.apache.flink.util.{Collector, Preconditions}
    +
    +
    +/**
    +  * It is used for sliding windows on batch for time-windows. It takes a prepared input row,
    +  * aligns the window start, and replicates or omits records for different panes of a sliding
    +  * window. It is used for non-incremental aggregations.
    +  *
    +  * @param aggregates aggregate functions
    +  * @param groupingKeysLength number of grouping keys
    +  * @param windowSize window size of the sliding window
    +  * @param windowSlide window slide of the sliding window
    +  * @param returnType return type of this function
    +  */
    +class DataSetSlideTimeWindowAggFlatMapFunction(
    +    private val aggregates: Array[Aggregate[_]],
    +    private val groupingKeysLength: Int,
    +    private val timeFieldPos: Int,
    +    private val windowSize: Long,
    +    private val windowSlide: Long,
    +    @transient private val returnType: TypeInformation[Row])
    +  extends RichFlatMapFunction[Row, Row]
    +  with ResultTypeQueryable[Row] {
    +
    +  private var aggregateBuffer: Row = _
    +  private var outWindowStartIndex: Int = _
    +
    +  override def open(config: Configuration) {
    +    Preconditions.checkNotNull(aggregates)
    --- End diff --
    
    An empty `Row` can be serialized because all fields are null and it implements `Serializable`. However, once we have a non-serializable value in the row, it will fail. 
    But I think you are right, let's keep the `Row` initialization in `open()`.


---

[GitHub] flink pull request #3364: [FLINK-5047] [table] Add sliding group-windows for...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3364#discussion_r104726234
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideWindowAggReduceCombineFunction.scala ---
    @@ -0,0 +1,88 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.table.runtime.aggregate
    +
    +import java.lang.Iterable
    +
    +import org.apache.flink.api.common.functions.CombineFunction
    +import org.apache.flink.types.Row
    +
    +/**
    +  * Wraps the aggregate logic inside of
    +  * [[org.apache.flink.api.java.operators.GroupReduceOperator]] and
    +  * [[org.apache.flink.api.java.operators.GroupCombineOperator]].
    +  *
    +  * It is used for sliding on batch for both time and count-windows.
    +  *
    +  * @param aggregates aggregate functions.
    +  * @param groupKeysMapping index mapping of group keys between intermediate aggregate Row
    +  *                         and output Row.
    +  * @param aggregateMapping index mapping between aggregate function list and aggregated value
    +  *                         index in output Row.
    +  * @param intermediateRowArity intermediate row field count
    +  * @param finalRowArity output row field count
    +  * @param finalRowWindowStartPos relative window-start position to last field of output row
    +  * @param finalRowWindowEndPos relative window-end position to last field of output row
    +  * @param windowSize size of the window, used to determine window-end for output row
    +  */
    +class DataSetSlideWindowAggReduceCombineFunction(
    +    aggregates: Array[Aggregate[_ <: Any]],
    +    groupKeysMapping: Array[(Int, Int)],
    +    aggregateMapping: Array[(Int, Int)],
    +    intermediateRowArity: Int,
    +    finalRowArity: Int,
    +    finalRowWindowStartPos: Option[Int],
    +    finalRowWindowEndPos: Option[Int],
    +    windowSize: Long)
    +  extends DataSetSlideWindowAggReduceGroupFunction(
    +    aggregates,
    +    groupKeysMapping,
    +    aggregateMapping,
    +    intermediateRowArity,
    +    finalRowArity,
    +    finalRowWindowStartPos,
    +    finalRowWindowEndPos,
    +    windowSize)
    +  with CombineFunction[Row, Row] {
    +
    +  override def combine(records: Iterable[Row]): Row = {
    +    // initiate intermediate aggregate value
    +    aggregates.foreach(_.initiate(aggregateBuffer))
    +
    +    val iterator = records.iterator()
    +    while (iterator.hasNext) {
    +      val record = iterator.next()
    +      aggregates.foreach(_.merge(record, aggregateBuffer))
    +
    +      // check if this record is the last record
    +      if (!iterator.hasNext) {
    +        // set group keys to aggregateBuffer
    +        for (i <- groupKeysMapping.indices) {
    +          aggregateBuffer.setField(i, record.getField(i))
    +        }
    +
    +        aggregateBuffer.setField(windowStartFieldPos, record.getField(windowStartFieldPos))
    +
    +        return aggregateBuffer
    +      }
    +    }
    +
    +    // this code path should never be reached as we return before the loop finishes
    +    throw new IllegalArgumentException("Group is empty. This should never happen.")
    --- End diff --
    
    OK, but how about we make that more clear with a comment like:
    
    `// This will never happen because the iterator is never null but we have to satisfy the compiler.`


---

[GitHub] flink pull request #3364: [FLINK-5047] [table] Add sliding group-windows for...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3364#discussion_r104518276
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideWindowAggReduceCombineFunction.scala ---
    @@ -0,0 +1,88 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.table.runtime.aggregate
    +
    +import java.lang.Iterable
    +
    +import org.apache.flink.api.common.functions.CombineFunction
    +import org.apache.flink.types.Row
    +
    +/**
    +  * Wraps the aggregate logic inside of
    +  * [[org.apache.flink.api.java.operators.GroupReduceOperator]] and
    +  * [[org.apache.flink.api.java.operators.GroupCombineOperator]].
    +  *
    +  * It is used for sliding on batch for both time and count-windows.
    +  *
    +  * @param aggregates aggregate functions.
    +  * @param groupKeysMapping index mapping of group keys between intermediate aggregate Row
    +  *                         and output Row.
    +  * @param aggregateMapping index mapping between aggregate function list and aggregated value
    +  *                         index in output Row.
    +  * @param intermediateRowArity intermediate row field count
    +  * @param finalRowArity output row field count
    +  * @param finalRowWindowStartPos relative window-start position to last field of output row
    +  * @param finalRowWindowEndPos relative window-end position to last field of output row
    +  * @param windowSize size of the window, used to determine window-end for output row
    +  */
    +class DataSetSlideWindowAggReduceCombineFunction(
    +    aggregates: Array[Aggregate[_ <: Any]],
    +    groupKeysMapping: Array[(Int, Int)],
    +    aggregateMapping: Array[(Int, Int)],
    +    intermediateRowArity: Int,
    +    finalRowArity: Int,
    +    finalRowWindowStartPos: Option[Int],
    +    finalRowWindowEndPos: Option[Int],
    +    windowSize: Long)
    +  extends DataSetSlideWindowAggReduceGroupFunction(
    +    aggregates,
    +    groupKeysMapping,
    +    aggregateMapping,
    +    intermediateRowArity,
    +    finalRowArity,
    +    finalRowWindowStartPos,
    +    finalRowWindowEndPos,
    +    windowSize)
    +  with CombineFunction[Row, Row] {
    +
    +  override def combine(records: Iterable[Row]): Row = {
    +    // initiate intermediate aggregate value
    +    aggregates.foreach(_.initiate(aggregateBuffer))
    +
    +    val iterator = records.iterator()
    +    while (iterator.hasNext) {
    +      val record = iterator.next()
    +      aggregates.foreach(_.merge(record, aggregateBuffer))
    +
    +      // check if this record is the last record
    +      if (!iterator.hasNext) {
    +        // set group keys to aggregateBuffer
    +        for (i <- groupKeysMapping.indices) {
    +          aggregateBuffer.setField(i, record.getField(i))
    +        }
    +
    +        aggregateBuffer.setField(windowStartFieldPos, record.getField(windowStartFieldPos))
    +
    +        return aggregateBuffer
    +      }
    +    }
    +
    +    // this code path should never be reached as we return before the loop finishes
    +    throw new IllegalArgumentException("Group is empty. This should never happen.")
    --- End diff --
    
    Can be removed. Combine is only called if there is at least one record. 


---

[GitHub] flink pull request #3364: [FLINK-5047] [table] Add sliding group-windows for...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3364#discussion_r104925837
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala ---
    @@ -186,6 +200,130 @@ object AggregateUtil {
       }
     
       /**
    +    * Create a [[org.apache.flink.api.common.functions.GroupReduceFunction]] that prepares for
    +    * partial aggregates of sliding windows (time and count-windows).
    +    * It requires a prepared input (with intermediate aggregate fields and aligned rowtime for
    +    * pre-tumbling in case of time-windows), pre-aggregates (pre-tumbles) rows, aligns the
    +    * window-start, and replicates or omits records for different panes of a sliding window.
    +    *
    +    * The output of the function contains the grouping keys, the intermediate aggregate values of
    +    * all aggregate function and the aligned window start. Window start need not be a timestamp,
    +    * but can also be a count value for count-windows.
    +    *
    +    * The output is stored in Row by the following format:
    +    *
    +    * {{{
    +    *                      avg(x) aggOffsetInRow = 2      count(z) aggOffsetInRow = 5
    +    *                            |                          |
    +    *                            v                          v
    +    *        +---------+---------+--------+--------+--------+--------+-------------+
    +    *        |groupKey1|groupKey2|  sum1  | count1 |  sum2  | count2 | windowStart |
    +    *        +---------+---------+--------+--------+--------+--------+-------------+
    +    *                                              ^                 ^
    +    *                                              |                 |
    +    *                                 sum(y) aggOffsetInRow = 4    window start for pane mapping
    +    * }}}
    +    *
    +    * NOTE: this function is only used for sliding windows with partial aggregates on batch tables.
    +    */
    +  def createDataSetSlideWindowPrepareGroupReduceFunction(
    +      window: LogicalWindow,
    +      namedAggregates: Seq[CalcitePair[AggregateCall, String]],
    +      groupings: Array[Int],
    +      inputType: RelDataType,
    +      isParserCaseSensitive: Boolean)
    +    : RichGroupReduceFunction[Row, Row] = {
    +
    +    val aggregates = transformToAggregateFunctions(
    +      namedAggregates.map(_.getKey),
    +      inputType,
    +      needRetraction = false)._2
    +
    +    val returnType: RowTypeInfo = createDataSetAggregateBufferDataType(
    +      groupings,
    +      aggregates,
    +      inputType,
    +      Some(Array(BasicTypeInfo.LONG_TYPE_INFO)))
    +
    +    window match {
    +      case EventTimeSlidingGroupWindow(_, _, size, slide) if isTimeInterval(size.resultType) =>
    +        // sliding time-window
    +        // for partial aggregations
    +        new DataSetSlideTimeWindowAggReduceCombineFunction(
    +          aggregates,
    +          groupings.length,
    +          returnType.getArity - 1,
    +          asLong(size),
    +          asLong(slide),
    +          returnType)
    +
    +      case _ =>
    +        throw new UnsupportedOperationException(s"$window is currently not supported on batch.")
    +    }
    +  }
    +
    +  /**
    +    * Create a [[org.apache.flink.api.common.functions.FlatMapFunction]] that prepares for
    +    * non-incremental aggregates of sliding windows (time-windows).
    +    *
    +    * It requires a prepared input (with intermediate aggregate fields), aligns the
    +    * window-start, and replicates or omits records for different panes of a sliding window.
    +    *
    +    * The output of the function contains the grouping keys, the intermediate aggregate values of
    +    * all aggregate function and the aligned window start.
    +    *
    +    * The output is stored in Row by the following format:
    +    *
    +    * {{{
    +    *                      avg(x) aggOffsetInRow = 2      count(z) aggOffsetInRow = 5
    +    *                            |                          |
    +    *                            v                          v
    +    *        +---------+---------+--------+--------+--------+--------+-------------+
    +    *        |groupKey1|groupKey2|  sum1  | count1 |  sum2  | count2 | windowStart |
    +    *        +---------+---------+--------+--------+--------+--------+-------------+
    +    *                                              ^                 ^
    +    *                                              |                 |
    +    *                                 sum(y) aggOffsetInRow = 4      window start for pane mapping
    +    * }}}
    +    *
    +    * NOTE: this function is only used for time-based sliding windows on batch tables.
    +    */
    +  def createDataSetSlideWindowPrepareFlatMapFunction(
    +      window: LogicalWindow,
    +      namedAggregates: Seq[CalcitePair[AggregateCall, String]],
    +      groupings: Array[Int],
    +      inputType: RelDataType,
    +      isParserCaseSensitive: Boolean)
    +    : FlatMapFunction[Row, Row] = {
    +
    +    val aggregates = transformToAggregateFunctions(
    --- End diff --
    
    not needed because input type = output type



[GitHub] flink pull request #3364: [FLINK-5047] [table] Add sliding group-windows for...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3364#discussion_r104921975
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideTimeWindowAggFlatMapFunction.scala ---
    @@ -0,0 +1,77 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.table.runtime.aggregate
    +
    +import org.apache.flink.api.common.functions.RichFlatMapFunction
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.api.java.typeutils.ResultTypeQueryable
    +import org.apache.flink.configuration.Configuration
    +import org.apache.flink.streaming.api.windowing.windows.TimeWindow
    +import org.apache.flink.types.Row
    +import org.apache.flink.util.{Collector, Preconditions}
    +
    +
    +/**
    +  * It is used for sliding windows on batch for time-windows. It takes a prepared input row,
    +  * aligns the window start, and replicates or omits records for different panes of a sliding
    +  * window. It is used for non-partial aggregations.
    +  *
    +  * @param aggregatesLength number of aggregate functions
    +  * @param groupingKeysLength number of grouping keys
    +  * @param windowSize window size of the sliding window
    +  * @param windowSlide window slide of the sliding window
    +  * @param returnType return type of this function
    +  */
    +class DataSetSlideTimeWindowAggFlatMapFunction(
    +    private val aggregatesLength: Int,
    +    private val groupingKeysLength: Int,
    +    private val timeFieldPos: Int,
    +    private val windowSize: Long,
    +    private val windowSlide: Long,
    +    @transient private val returnType: TypeInformation[Row])
    +  extends RichFlatMapFunction[Row, Row]
    +  with ResultTypeQueryable[Row] {
    +
    +  Preconditions.checkNotNull(aggregatesLength)
    +
    +  private var intermediateRow: Row = _
    --- End diff --
    
    can be removed because we are replicating the input record.
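
    For illustration, a minimal sketch of what that simplification could look like, assuming the input
    row may be mutated and re-collected within one flatMap call (names taken from the quoted diff; not
    the actual follow-up commit):

        override def flatMap(record: Row, out: Collector[Row]): Unit = {
          val windowStart = record.getField(timeFieldPos).asInstanceOf[Long]

          // first pane the record belongs to (adopted from SlidingEventTimeWindows.assignWindows)
          var start: Long = TimeWindow.getWindowStartWithOffset(windowStart, 0, windowSlide)

          // emit the input record once per overlapping pane, overwriting only the aligned window start
          while (start > windowStart - windowSize) {
            record.setField(timeFieldPos, start)
            out.collect(record)
            start -= windowSlide
          }
        }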



[GitHub] flink pull request #3364: [FLINK-5047] [table] Add sliding group-windows for...

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3364#discussion_r104715450
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideCountWindowAggReduceGroupFunction.scala ---
    @@ -0,0 +1,113 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.table.runtime.aggregate
    +
    +import java.lang.Iterable
    +import java.sql.Timestamp
    +
    +import org.apache.calcite.runtime.SqlFunctions
    +import org.apache.flink.api.common.functions.RichGroupReduceFunction
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.api.java.typeutils.ResultTypeQueryable
    +import org.apache.flink.configuration.Configuration
    +import org.apache.flink.streaming.api.windowing.windows.TimeWindow
    +import org.apache.flink.types.Row
    +import org.apache.flink.util.{Collector, Preconditions}
    +
    +/**
    +  * It is used for sliding windows on batch for count-windows. It takes a prepared input row,
    +  * pre-aggregates (pre-tumbles) rows, aligns the window start, and replicates or omits records
    +  * for different panes of a sliding window.
    +  *
    +  * @param aggregates aggregate functions
    +  * @param groupingKeysLength number of grouping keys
    +  * @param preTumblingSize number of records to be aggregated (tumbled) before emission
    +  * @param windowSize window size of the sliding window
    +  * @param windowSlide window slide of the sliding window
    +  * @param returnType return type of this function
    +  */
    +class DataSetSlideCountWindowAggReduceGroupFunction(
    --- End diff --
    
    I will create an issue for this.



[GitHub] flink pull request #3364: [FLINK-5047] [table] Add sliding group-windows for...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3364#discussion_r104521729
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideCountWindowAggReduceGroupFunction.scala ---
    @@ -0,0 +1,113 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.table.runtime.aggregate
    +
    +import java.lang.Iterable
    +import java.sql.Timestamp
    +
    +import org.apache.calcite.runtime.SqlFunctions
    +import org.apache.flink.api.common.functions.RichGroupReduceFunction
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.api.java.typeutils.ResultTypeQueryable
    +import org.apache.flink.configuration.Configuration
    +import org.apache.flink.streaming.api.windowing.windows.TimeWindow
    +import org.apache.flink.types.Row
    +import org.apache.flink.util.{Collector, Preconditions}
    +
    +/**
    +  * It is used for sliding windows on batch for count-windows. It takes a prepared input row,
    +  * pre-aggregates (pre-tumbles) rows, aligns the window start, and replicates or omits records
    +  * for different panes of a sliding window.
    +  *
    +  * @param aggregates aggregate functions
    +  * @param groupingKeysLength number of grouping keys
    +  * @param preTumblingSize number of records to be aggregated (tumbled) before emission
    +  * @param windowSize window size of the sliding window
    +  * @param windowSlide window slide of the sliding window
    +  * @param returnType return type of this function
    +  */
    +class DataSetSlideCountWindowAggReduceGroupFunction(
    +    private val aggregates: Array[Aggregate[_]],
    +    private val groupingKeysLength: Int,
    +    private val preTumblingSize: Long,
    +    private val windowSize: Long,
    +    private val windowSlide: Long,
    +    @transient private val returnType: TypeInformation[Row])
    +  extends RichGroupReduceFunction[Row, Row]
    +  with ResultTypeQueryable[Row] {
    +
    +  private var output: Row = _
    +  private var outWindowStartIndex: Int = _
    +
    +  override def open(config: Configuration) {
    +    Preconditions.checkNotNull(aggregates)
    +    // add one field to store window start count
    +    val partialRowLength = groupingKeysLength +
    +      aggregates.map(_.intermediateDataType.length).sum + 1
    +    output = new Row(partialRowLength)
    +    outWindowStartIndex = partialRowLength - 1
    +  }
    +
    +  override def reduce(records: Iterable[Row], out: Collector[Row]): Unit = {
    +    var count: Long = 0
    +
    +    val iterator = records.iterator()
    +
    +    while (iterator.hasNext) {
    +      val record = iterator.next()
    +      // reset aggregates after completed tumbling
    +      if (count % preTumblingSize == 0) {
    +        // initiate intermediate aggregate value.
    +        aggregates.foreach(_.initiate(output))
    +      }
    +
    +      // merge intermediate aggregate value to buffer.
    +      aggregates.foreach(_.merge(record, output))
    +
    +      count += 1
    +
    +      // trigger tumbling evaluation
    +      if (count % preTumblingSize == 0) {
    --- End diff --
    
    do we also have to emit "incomplete" windows after the `while (iterator.hasNext)` loop?



[GitHub] flink pull request #3364: [FLINK-5047] [table] Add sliding group-windows for...

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3364#discussion_r104713604
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideTimeWindowAggFlatMapFunction.scala ---
    @@ -0,0 +1,93 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.table.runtime.aggregate
    +
    +import org.apache.flink.api.common.functions.RichFlatMapFunction
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.api.java.typeutils.ResultTypeQueryable
    +import org.apache.flink.configuration.Configuration
    +import org.apache.flink.streaming.api.windowing.windows.TimeWindow
    +import org.apache.flink.types.Row
    +import org.apache.flink.util.{Collector, Preconditions}
    +
    +
    +/**
    +  * It is used for sliding windows on batch for time-windows. It takes a prepared input row,
    +  * aligns the window start, and replicates or omits records for different panes of a sliding
    +  * window. It is used for non-incremental aggregations.
    +  *
    +  * @param aggregates aggregate functions
    +  * @param groupingKeysLength number of grouping keys
    +  * @param windowSize window size of the sliding window
    +  * @param windowSlide window slide of the sliding window
    +  * @param returnType return type of this function
    +  */
    +class DataSetSlideTimeWindowAggFlatMapFunction(
    +    private val aggregates: Array[Aggregate[_]],
    +    private val groupingKeysLength: Int,
    +    private val timeFieldPos: Int,
    +    private val windowSize: Long,
    +    private val windowSlide: Long,
    +    @transient private val returnType: TypeInformation[Row])
    +  extends RichFlatMapFunction[Row, Row]
    +  with ResultTypeQueryable[Row] {
    +
    +  private var aggregateBuffer: Row = _
    +  private var outWindowStartIndex: Int = _
    +
    +  override def open(config: Configuration) {
    +    Preconditions.checkNotNull(aggregates)
    --- End diff --
    
    Row is not always serializable, and neither are accumulators. We are safer if we initialize them in `open()`.
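
    For context, the initialization pattern being referred to, as a minimal hypothetical sketch (the
    class name `LazyBufferFlatMap` and the `bufferArity` parameter are invented for illustration):

        import org.apache.flink.api.common.functions.RichFlatMapFunction
        import org.apache.flink.configuration.Configuration
        import org.apache.flink.types.Row
        import org.apache.flink.util.Collector

        class LazyBufferFlatMap(bufferArity: Int) extends RichFlatMapFunction[Row, Row] {

          // the function object is serialized and shipped to the task managers,
          // so the mutable buffer is created lazily per parallel instance
          private var buffer: Row = _

          override def open(config: Configuration): Unit = {
            buffer = new Row(bufferArity)
          }

          override def flatMap(value: Row, out: Collector[Row]): Unit = {
            // copy the input fields into the reused buffer and emit it
            var i = 0
            while (i < bufferArity && i < value.getArity) {
              buffer.setField(i, value.getField(i))
              i += 1
            }
            out.collect(buffer)
          }
        }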



[GitHub] flink pull request #3364: [FLINK-5047] [table] Add sliding group-windows for...

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3364#discussion_r104714888
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideTimeWindowAggFlatMapFunction.scala ---
    @@ -0,0 +1,93 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.table.runtime.aggregate
    +
    +import org.apache.flink.api.common.functions.RichFlatMapFunction
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.api.java.typeutils.ResultTypeQueryable
    +import org.apache.flink.configuration.Configuration
    +import org.apache.flink.streaming.api.windowing.windows.TimeWindow
    +import org.apache.flink.types.Row
    +import org.apache.flink.util.{Collector, Preconditions}
    +
    +
    +/**
    +  * It is used for sliding windows on batch for time-windows. It takes a prepared input row,
    +  * aligns the window start, and replicates or omits records for different panes of a sliding
    +  * window. It is used for non-incremental aggregations.
    +  *
    +  * @param aggregates aggregate functions
    +  * @param groupingKeysLength number of grouping keys
    +  * @param windowSize window size of the sliding window
    +  * @param windowSlide window slide of the sliding window
    +  * @param returnType return type of this function
    +  */
    +class DataSetSlideTimeWindowAggFlatMapFunction(
    +    private val aggregates: Array[Aggregate[_]],
    +    private val groupingKeysLength: Int,
    +    private val timeFieldPos: Int,
    +    private val windowSize: Long,
    +    private val windowSlide: Long,
    +    @transient private val returnType: TypeInformation[Row])
    +  extends RichFlatMapFunction[Row, Row]
    +  with ResultTypeQueryable[Row] {
    +
    +  private var aggregateBuffer: Row = _
    +  private var outWindowStartIndex: Int = _
    +
    +  override def open(config: Configuration) {
    +    Preconditions.checkNotNull(aggregates)
    +    // add one field to store window start
    +    val partialRowLength = groupingKeysLength +
    +      aggregates.map(_.intermediateDataType.length).sum + 1
    +    aggregateBuffer = new Row(partialRowLength)
    +    outWindowStartIndex = partialRowLength - 1
    +  }
    +
    +  override def flatMap(record: Row, out: Collector[Row]): Unit = {
    +    val windowStart = record.getField(timeFieldPos).asInstanceOf[Long]
    +
    +    // adopted from SlidingEventTimeWindows.assignWindows
    +    var start: Long = TimeWindow.getWindowStartWithOffset(windowStart, 0, windowSlide)
    +
    +    // skip preparing output if it is not necessary
    +    if (start > windowStart - windowSize) {
    +
    +      // prepare output
    +      for (i <- aggregates.indices) {
    --- End diff --
    
    You are right. Will simplify that.



[GitHub] flink pull request #3364: [FLINK-5047] [table] Add sliding group-windows for...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3364#discussion_r104525555
  
    --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/AggregationsITCase.scala ---
    @@ -146,42 +146,9 @@ class AggregationsITCase extends StreamingMultipleProgramsTestBase {
           "Hi,1,1,1970-01-01 00:00:00.0,1970-01-01 00:00:00.005")
         assertEquals(expected.sorted, StreamITCase.testResults.sorted)
       }
    -
    -  @Test
    -  def testEventTimeSlidingWindow(): Unit = {
    --- End diff --
    
    Is this test subsumed by `DataStreamAggregateITCase`?



[GitHub] flink pull request #3364: [FLINK-5047] [table] Add sliding group-windows for...

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3364#discussion_r104717287
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideCountWindowAggReduceGroupFunction.scala ---
    @@ -0,0 +1,113 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.table.runtime.aggregate
    +
    +import java.lang.Iterable
    +import java.sql.Timestamp
    +
    +import org.apache.calcite.runtime.SqlFunctions
    +import org.apache.flink.api.common.functions.RichGroupReduceFunction
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.api.java.typeutils.ResultTypeQueryable
    +import org.apache.flink.configuration.Configuration
    +import org.apache.flink.streaming.api.windowing.windows.TimeWindow
    +import org.apache.flink.types.Row
    +import org.apache.flink.util.{Collector, Preconditions}
    +
    +/**
    +  * It is used for sliding windows on batch for count-windows. It takes a prepared input row,
    +  * pre-aggregates (pre-tumbles) rows, aligns the window start, and replicates or omits records
    +  * for different panes of a sliding window.
    +  *
    +  * @param aggregates aggregate functions
    +  * @param groupingKeysLength number of grouping keys
    +  * @param preTumblingSize number of records to be aggregated (tumbled) before emission
    +  * @param windowSize window size of the sliding window
    +  * @param windowSlide window slide of the sliding window
    +  * @param returnType return type of this function
    +  */
    +class DataSetSlideCountWindowAggReduceGroupFunction(
    +    private val aggregates: Array[Aggregate[_]],
    +    private val groupingKeysLength: Int,
    +    private val preTumblingSize: Long,
    +    private val windowSize: Long,
    +    private val windowSlide: Long,
    +    @transient private val returnType: TypeInformation[Row])
    +  extends RichGroupReduceFunction[Row, Row]
    +  with ResultTypeQueryable[Row] {
    +
    +  private var output: Row = _
    +  private var outWindowStartIndex: Int = _
    +
    +  override def open(config: Configuration) {
    +    Preconditions.checkNotNull(aggregates)
    +    // add one field to store window start count
    +    val partialRowLength = groupingKeysLength +
    +      aggregates.map(_.intermediateDataType.length).sum + 1
    +    output = new Row(partialRowLength)
    +    outWindowStartIndex = partialRowLength - 1
    +  }
    +
    +  override def reduce(records: Iterable[Row], out: Collector[Row]): Unit = {
    +    var count: Long = 0
    +
    +    val iterator = records.iterator()
    +
    +    while (iterator.hasNext) {
    +      val record = iterator.next()
    +      // reset aggregates after completed tumbling
    +      if (count % preTumblingSize == 0) {
    +        // initiate intermediate aggregate value.
    +        aggregates.foreach(_.initiate(output))
    +      }
    +
    +      // merge intermediate aggregate value to buffer.
    +      aggregates.foreach(_.merge(record, output))
    +
    +      count += 1
    +
    +      // trigger tumbling evaluation
    +      if (count % preTumblingSize == 0) {
    --- End diff --
    
    I don't think so. It returns the same result as the DataStream variant.



[GitHub] flink pull request #3364: [FLINK-5047] [table] Add sliding group-windows for...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3364#discussion_r104922073
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideTimeWindowAggFlatMapFunction.scala ---
    @@ -0,0 +1,77 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.table.runtime.aggregate
    +
    +import org.apache.flink.api.common.functions.RichFlatMapFunction
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.api.java.typeutils.ResultTypeQueryable
    +import org.apache.flink.configuration.Configuration
    +import org.apache.flink.streaming.api.windowing.windows.TimeWindow
    +import org.apache.flink.types.Row
    +import org.apache.flink.util.{Collector, Preconditions}
    +
    +
    +/**
    +  * It is used for sliding windows on batch for time-windows. It takes a prepared input row,
    +  * aligns the window start, and replicates or omits records for different panes of a sliding
    +  * window. It is used for non-partial aggregations.
    +  *
    +  * @param aggregatesLength number of aggregate functions
    +  * @param groupingKeysLength number of grouping keys
    +  * @param windowSize window size of the sliding window
    +  * @param windowSlide window slide of the sliding window
    +  * @param returnType return type of this function
    +  */
    +class DataSetSlideTimeWindowAggFlatMapFunction(
    +    private val aggregatesLength: Int,
    +    private val groupingKeysLength: Int,
    +    private val timeFieldPos: Int,
    +    private val windowSize: Long,
    +    private val windowSlide: Long,
    +    @transient private val returnType: TypeInformation[Row])
    +  extends RichFlatMapFunction[Row, Row]
    +  with ResultTypeQueryable[Row] {
    +
    +  Preconditions.checkNotNull(aggregatesLength)
    +
    +  private var intermediateRow: Row = _
    +  // add one field to store window start
    +  private val intermediateRowArity: Int = groupingKeysLength + aggregatesLength + 1
    --- End diff --
    
    can be removed



[GitHub] flink pull request #3364: [FLINK-5047] [table] Add sliding group-windows for...

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3364#discussion_r104722693
  
    --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/GroupWindowTest.scala ---
    @@ -360,6 +360,19 @@ class GroupWindowTest extends TableTestBase {
           .window(Session withGap 7.milli as 'w) // require on a time attribute
           .groupBy('string, 'w)
           .select('string, 'int.count)
    +
    +    val expected = unaryNode(
    --- End diff --
    
    Seems that I hadn't seen the exception. Thanks.



[GitHub] flink pull request #3364: [FLINK-5047] [table] Add sliding group-windows for...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3364#discussion_r104521151
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideCountWindowAggReduceGroupFunction.scala ---
    @@ -0,0 +1,113 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.table.runtime.aggregate
    +
    +import java.lang.Iterable
    +import java.sql.Timestamp
    +
    +import org.apache.calcite.runtime.SqlFunctions
    +import org.apache.flink.api.common.functions.RichGroupReduceFunction
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.api.java.typeutils.ResultTypeQueryable
    +import org.apache.flink.configuration.Configuration
    +import org.apache.flink.streaming.api.windowing.windows.TimeWindow
    +import org.apache.flink.types.Row
    +import org.apache.flink.util.{Collector, Preconditions}
    +
    +/**
    +  * It is used for sliding windows on batch for count-windows. It takes a prepared input row,
    +  * pre-aggregates (pre-tumbles) rows, aligns the window start, and replicates or omits records
    +  * for different panes of a sliding window.
    +  *
    +  * @param aggregates aggregate functions
    +  * @param groupingKeysLength number of grouping keys
    +  * @param preTumblingSize number of records to be aggregated (tumbled) before emission
    +  * @param windowSize window size of the sliding window
    +  * @param windowSlide window slide of the sliding window
    +  * @param returnType return type of this function
    +  */
    +class DataSetSlideCountWindowAggReduceGroupFunction(
    +    private val aggregates: Array[Aggregate[_]],
    +    private val groupingKeysLength: Int,
    +    private val preTumblingSize: Long,
    +    private val windowSize: Long,
    +    private val windowSlide: Long,
    +    @transient private val returnType: TypeInformation[Row])
    +  extends RichGroupReduceFunction[Row, Row]
    +  with ResultTypeQueryable[Row] {
    +
    +  private var output: Row = _
    +  private var outWindowStartIndex: Int = _
    +
    +  override def open(config: Configuration) {
    +    Preconditions.checkNotNull(aggregates)
    --- End diff --
    
    I think we can move everything into the constructor and remove the `open()` method.
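
    For comparison, a minimal sketch of the constructor-based alternative (class name and trimmed
    parameter list are hypothetical; the sizes are derived eagerly so no open() is needed):

        import org.apache.flink.api.common.functions.RichGroupReduceFunction
        import org.apache.flink.types.Row
        import org.apache.flink.util.{Collector, Preconditions}

        // `Aggregate` refers to the internal trait used in the quoted diff
        class ConstructorInitGroupReduce(
            aggregates: Array[Aggregate[_]],
            groupingKeysLength: Int)
          extends RichGroupReduceFunction[Row, Row] {

          Preconditions.checkNotNull(aggregates)

          // derived once at construction time; no open() needed
          private val partialRowLength = groupingKeysLength +
            aggregates.map(_.intermediateDataType.length).sum + 1
          private val outWindowStartIndex = partialRowLength - 1
          // the (still empty) output row is shipped with the serialized function instance
          private val output = new Row(partialRowLength)

          override def reduce(records: java.lang.Iterable[Row], out: Collector[Row]): Unit = {
            // aggregation logic as in the quoted diff
          }
        }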



[GitHub] flink pull request #3364: [FLINK-5047] [table] Add sliding group-windows for...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3364#discussion_r104523761
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideTimeWindowAggReduceCombineFunction.scala ---
    @@ -0,0 +1,84 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.table.runtime.aggregate
    +
    +import java.lang.Iterable
    +
    +import org.apache.flink.api.common.functions.CombineFunction
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.types.Row
    +
    +/**
    +  * Wraps the aggregate logic inside of
    +  * [[org.apache.flink.api.java.operators.GroupReduceOperator]] and
    +  * [[org.apache.flink.api.java.operators.GroupCombineOperator]].
    +  *
    +  * It is used for sliding on batch for time-windows.
    +  *
    +  * @param aggregates aggregate functions
    +  * @param groupingKeysLength number of grouping keys
    +  * @param timeFieldPos position of aligned time field
    +  * @param windowSize window size of the sliding window
    +  * @param windowSlide window slide of the sliding window
    +  * @param returnType return type of this function
    +  */
    +class DataSetSlideTimeWindowAggReduceCombineFunction(
    +    aggregates: Array[Aggregate[_]],
    +    groupingKeysLength: Int,
    +    timeFieldPos: Int,
    +    windowSize: Long,
    +    windowSlide: Long,
    +    returnType: TypeInformation[Row])
    +  extends DataSetSlideTimeWindowAggReduceGroupFunction(
    +    aggregates,
    +    groupingKeysLength,
    +    timeFieldPos,
    +    windowSize,
    +    windowSlide,
    +    returnType)
    +  with CombineFunction[Row, Row] {
    +
    +  override def combine(records: Iterable[Row]): Row = {
    +    // initiate intermediate aggregate value
    +    aggregates.foreach(_.initiate(aggregateBuffer))
    +
    +    val iterator = records.iterator()
    +    while (iterator.hasNext) {
    +      val record = iterator.next()
    +
    +      // merge intermediate aggregate value to buffer
    +      aggregates.foreach(_.merge(record, aggregateBuffer))
    +
    +      // check if this record is the last record
    +      if (!iterator.hasNext) {
    +
    +        // set group keys value to buffer
    +        for (i <- 0 until groupingKeysLength) {
    +          aggregateBuffer.setField(i, record.getField(i))
    +        }
    +
    +        aggregateBuffer.setField(timeFieldPos, record.getField(timeFieldPos))
    +
    +        return aggregateBuffer
    +      }
    +    }
    +
    +    // this code path should never be reached as we return before the loop finishes
    +    throw new IllegalArgumentException("Group is empty. This should never happen.")
    --- End diff --
    
    can be removed. It's the responsibility of the DataSet API to call the user functions correctly. And even if it did not, this function would still behave correctly.



[GitHub] flink pull request #3364: [FLINK-5047] [table] Add sliding group-windows for...

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3364#discussion_r104948259
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/IncrementalAggregateAllWindowFunction.scala ---
    @@ -53,7 +53,12 @@ class IncrementalAggregateAllWindowFunction[W <: Window](
     
         if (iterator.hasNext) {
           val record = iterator.next()
    -      out.collect(record)
    +      var i = 0
    +      while (i < record.getArity) {
    +        output.setField(i, record.getField(0))
    --- End diff --
    
    Good point!
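
    For reference, a sketch of the copy loop as presumably intended (hypothetical, inferred from the
    flagged index; the trailing emit is assumed):

        var i = 0
        while (i < record.getArity) {
          // copy field i of the single pre-aggregated record (not field 0)
          output.setField(i, record.getField(i))
          i += 1
        }
        out.collect(output)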



[GitHub] flink pull request #3364: [FLINK-5047] [table] Add sliding group-windows for...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3364#discussion_r104925507
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala ---
    @@ -186,6 +200,130 @@ object AggregateUtil {
       }
     
       /**
    +    * Create a [[org.apache.flink.api.common.functions.GroupReduceFunction]] that prepares for
    +    * partial aggregates of sliding windows (time and count-windows).
    +    * It requires a prepared input (with intermediate aggregate fields and aligned rowtime for
    +    * pre-tumbling in case of time-windows), pre-aggregates (pre-tumbles) rows, aligns the
    +    * window-start, and replicates or omits records for different panes of a sliding window.
    +    *
    +    * The output of the function contains the grouping keys, the intermediate aggregate values of
    +    * all aggregate function and the aligned window start. Window start need not be a timestamp,
    +    * but can also be a count value for count-windows.
    +    *
    +    * The output is stored in Row by the following format:
    +    *
    +    * {{{
    +    *                      avg(x) aggOffsetInRow = 2      count(z) aggOffsetInRow = 5
    +    *                            |                          |
    +    *                            v                          v
    +    *        +---------+---------+--------+--------+--------+--------+-------------+
    +    *        |groupKey1|groupKey2|  sum1  | count1 |  sum2  | count2 | windowStart |
    +    *        +---------+---------+--------+--------+--------+--------+-------------+
    +    *                                              ^                 ^
    +    *                                              |                 |
    +    *                                 sum(y) aggOffsetInRow = 4    window start for pane mapping
    +    * }}}
    +    *
    +    * NOTE: this function is only used for sliding windows with partial aggregates on batch tables.
    +    */
    +  def createDataSetSlideWindowPrepareGroupReduceFunction(
    +      window: LogicalWindow,
    +      namedAggregates: Seq[CalcitePair[AggregateCall, String]],
    +      groupings: Array[Int],
    +      inputType: RelDataType,
    +      isParserCaseSensitive: Boolean)
    +    : RichGroupReduceFunction[Row, Row] = {
    +
    +    val aggregates = transformToAggregateFunctions(
    +      namedAggregates.map(_.getKey),
    +      inputType,
    +      needRetraction = false)._2
    +
    +    val returnType: RowTypeInfo = createDataSetAggregateBufferDataType(
    +      groupings,
    +      aggregates,
    +      inputType,
    +      Some(Array(BasicTypeInfo.LONG_TYPE_INFO)))
    +
    +    window match {
    +      case EventTimeSlidingGroupWindow(_, _, size, slide) if isTimeInterval(size.resultType) =>
    +        // sliding time-window
    +        // for partial aggregations
    +        new DataSetSlideTimeWindowAggReduceCombineFunction(
    +          aggregates,
    +          groupings.length,
    +          returnType.getArity - 1,
    +          asLong(size),
    +          asLong(slide),
    +          returnType)
    +
    +      case _ =>
    +        throw new UnsupportedOperationException(s"$window is currently not supported on batch.")
    +    }
    +  }
    +
    +  /**
    +    * Create a [[org.apache.flink.api.common.functions.FlatMapFunction]] that prepares for
    +    * non-incremental aggregates of sliding windows (time-windows).
    +    *
    +    * It requires a prepared input (with intermediate aggregate fields), aligns the
    +    * window-start, and replicates or omits records for different panes of a sliding window.
    +    *
    +    * The output of the function contains the grouping keys, the intermediate aggregate values of
    +    * all aggregate function and the aligned window start.
    +    *
    +    * The output is stored in Row by the following format:
    +    *
    +    * {{{
    +    *                      avg(x) aggOffsetInRow = 2      count(z) aggOffsetInRow = 5
    +    *                            |                          |
    +    *                            v                          v
    +    *        +---------+---------+--------+--------+--------+--------+-------------+
    +    *        |groupKey1|groupKey2|  sum1  | count1 |  sum2  | count2 | windowStart |
    +    *        +---------+---------+--------+--------+--------+--------+-------------+
    +    *                                              ^                 ^
    +    *                                              |                 |
    +    *                                 sum(y) aggOffsetInRow = 4      window start for pane mapping
    +    * }}}
    +    *
    +    * NOTE: this function is only used for time-based sliding windows on batch tables.
    +    */
    +  def createDataSetSlideWindowPrepareFlatMapFunction(
    +      window: LogicalWindow,
    +      namedAggregates: Seq[CalcitePair[AggregateCall, String]],
    +      groupings: Array[Int],
    +      inputType: RelDataType,
    +      isParserCaseSensitive: Boolean)
    +    : FlatMapFunction[Row, Row] = {
    +
    +    val aggregates = transformToAggregateFunctions(
    +      namedAggregates.map(_.getKey),
    +      inputType,
    +      needRetraction = false)._2
    +
    +    val mapReturnType: RowTypeInfo = createDataSetAggregateBufferDataType(
    --- End diff --
    
    output type should be equal to input type.



[GitHub] flink pull request #3364: [FLINK-5047] [table] Add sliding group-windows for...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3364#discussion_r104725017
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideCountWindowAggReduceGroupFunction.scala ---
    @@ -0,0 +1,113 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.table.runtime.aggregate
    +
    +import java.lang.Iterable
    +import java.sql.Timestamp
    +
    +import org.apache.calcite.runtime.SqlFunctions
    +import org.apache.flink.api.common.functions.RichGroupReduceFunction
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.api.java.typeutils.ResultTypeQueryable
    +import org.apache.flink.configuration.Configuration
    +import org.apache.flink.streaming.api.windowing.windows.TimeWindow
    +import org.apache.flink.types.Row
    +import org.apache.flink.util.{Collector, Preconditions}
    +
    +/**
    +  * It is used for sliding windows on batch for count-windows. It takes a prepared input row,
    +  * pre-aggregates (pre-tumbles) rows, aligns the window start, and replicates or omits records
    +  * for different panes of a sliding window.
    +  *
    +  * @param aggregates aggregate functions
    +  * @param groupingKeysLength number of grouping keys
    +  * @param preTumblingSize number of records to be aggregated (tumbled) before emission
    +  * @param windowSize window size of the sliding window
    +  * @param windowSlide window slide of the sliding window
    +  * @param returnType return type of this function
    +  */
    +class DataSetSlideCountWindowAggReduceGroupFunction(
    +    private val aggregates: Array[Aggregate[_]],
    +    private val groupingKeysLength: Int,
    +    private val preTumblingSize: Long,
    +    private val windowSize: Long,
    +    private val windowSlide: Long,
    +    @transient private val returnType: TypeInformation[Row])
    +  extends RichGroupReduceFunction[Row, Row]
    +  with ResultTypeQueryable[Row] {
    +
    +  private var output: Row = _
    +  private var outWindowStartIndex: Int = _
    +
    +  override def open(config: Configuration) {
    +    Preconditions.checkNotNull(aggregates)
    +    // add one field to store window start count
    +    val partialRowLength = groupingKeysLength +
    +      aggregates.map(_.intermediateDataType.length).sum + 1
    +    output = new Row(partialRowLength)
    +    outWindowStartIndex = partialRowLength - 1
    +  }
    +
    +  override def reduce(records: Iterable[Row], out: Collector[Row]): Unit = {
    +    var count: Long = 0
    +
    +    val iterator = records.iterator()
    +
    +    while (iterator.hasNext) {
    +      val record = iterator.next()
    +      // reset aggregates after completed tumbling
    +      if (count % preTumblingSize == 0) {
    +        // initiate intermediate aggregate value.
    +        aggregates.foreach(_.initiate(output))
    +      }
    +
    +      // merge intermediate aggregate value to buffer.
    +      aggregates.foreach(_.merge(record, output))
    +
    +      count += 1
    +
    +      // trigger tumbling evaluation
    +      if (count % preTumblingSize == 0) {
    --- End diff --
    
    But shouldn't batch be the reference for streaming?



[GitHub] flink pull request #3364: [FLINK-5047] [table] Add sliding group-windows for...

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3364#discussion_r104706852
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala ---
    @@ -153,6 +170,156 @@ object AggregateUtil {
       }
     
       /**
    +    * Create a [[org.apache.flink.api.common.functions.GroupReduceFunction]] that prepares for
    +    * incremental aggregates of sliding windows (time and count-windows).
    +    * It requires a prepared input (with intermediate aggregate fields and aligned rowtime for
    +    * pre-tumbling in case of time-windows), pre-aggregates (pre-tumbles) rows, aligns the
    +    * window-start, and replicates or omits records for different panes of a sliding window.
    +    *
    +    * The output of the function contains the grouping keys, the intermediate aggregate values of
    +    * all aggregate function and the aligned window start. Window start need not be a timestamp,
    +    * but can also be a count value for count-windows.
    +    *
    +    * The output is stored in Row by the following format:
    +    *
    +    * {{{
    +    *                      avg(x) aggOffsetInRow = 2      count(z) aggOffsetInRow = 5
    +    *                            |                          |
    +    *                            v                          v
    +    *        +---------+---------+--------+--------+--------+--------+-------------+
    +    *        |groupKey1|groupKey2|  sum1  | count1 |  sum2  | count2 | windowStart |
    +    *        +---------+---------+--------+--------+--------+--------+-------------+
    +    *                                              ^                 ^
    +    *                                              |                 |
    +    *                                 sum(y) aggOffsetInRow = 4    window start for pane mapping
    +    * }}}
    +    *
    +    * NOTE: this function is only used for sliding windows on batch tables.
    +    */
    +  def createDataSetSlideWindowPrepareGroupReduceFunction(
    +      window: LogicalWindow,
    +      namedAggregates: Seq[CalcitePair[AggregateCall, String]],
    +      groupings: Array[Int],
    +      inputType: RelDataType,
    +      isParserCaseSensitive: Boolean)
    +    : RichGroupReduceFunction[Row, Row] = {
    +
    +    val aggregates = transformToAggregateFunctions(
    +      namedAggregates.map(_.getKey),
    +      inputType,
    +      groupings.length)._2
    +
    +    val returnType: RowTypeInfo = createAggregateBufferDataType(
    +      groupings,
    +      aggregates,
    +      inputType,
    +      Some(Array(BasicTypeInfo.LONG_TYPE_INFO)))
    +
    +    window match {
    +      case EventTimeSlidingGroupWindow(_, _, size, slide) if isTimeInterval(size.resultType) =>
    +        // sliding time-window
    +        if (aggregates.forall(_.supportPartial)) {
    +          // for incremental aggregations
    +          new DataSetSlideTimeWindowAggReduceCombineFunction(
    +            aggregates,
    +            groupings.length,
    +            returnType.getArity - 1,
    +            asLong(size),
    +            asLong(slide),
    +            returnType)
    +        } else {
    +          // for non-incremental aggregations
    --- End diff --
    
    You are right. I will remove this case distinction.
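
    Just to make the Scaladoc above a bit more concrete: the "window start for pane mapping" is simply the rowtime aligned to the start of the pane it falls into. A minimal sketch of that alignment (not the exact code of this PR; treating the pane width as gcd(size, slide) in the pre-tumbling case is an assumption here):

        // sketch only: align a rowtime to the start of the pane it falls into
        object PaneAlignmentSketch {

          /** Start of the pane of width `paneSize` that `timestamp` belongs to. */
          def alignToPaneStart(timestamp: Long, paneSize: Long): Long =
            timestamp - java.lang.Math.floorMod(timestamp, paneSize)

          def main(args: Array[String]): Unit = {
            // e.g. Slide over 10.milli every 5.milli => pane width 5 ms
            println(alignToPaneStart(7L, 5L))   // 5
            println(alignToPaneStart(-2L, 5L))  // -5 (pre-epoch rowtimes align correctly)
          }
        }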


---

[GitHub] flink pull request #3364: [FLINK-5047] [table] Add sliding group-windows for...

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3364#discussion_r104721437
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideWindowAggReduceCombineFunction.scala ---
    @@ -0,0 +1,88 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.table.runtime.aggregate
    +
    +import java.lang.Iterable
    +
    +import org.apache.flink.api.common.functions.CombineFunction
    +import org.apache.flink.types.Row
    +
    +/**
    +  * Wraps the aggregate logic inside of
    +  * [[org.apache.flink.api.java.operators.GroupReduceOperator]] and
    +  * [[org.apache.flink.api.java.operators.GroupCombineOperator]].
    +  *
    +  * It is used for sliding windows on batch tables, for both time and count-windows.
    +  *
    +  * @param aggregates aggregate functions.
    +  * @param groupKeysMapping index mapping of group keys between intermediate aggregate Row
    +  *                         and output Row.
    +  * @param aggregateMapping index mapping between aggregate function list and aggregated value
    +  *                         index in output Row.
    +  * @param intermediateRowArity intermediate row field count
    +  * @param finalRowArity output row field count
    +  * @param finalRowWindowStartPos relative window-start position to last field of output row
    +  * @param finalRowWindowEndPos relative window-end position to last field of output row
    +  * @param windowSize size of the window, used to determine window-end for output row
    +  */
    +class DataSetSlideWindowAggReduceCombineFunction(
    +    aggregates: Array[Aggregate[_ <: Any]],
    +    groupKeysMapping: Array[(Int, Int)],
    +    aggregateMapping: Array[(Int, Int)],
    +    intermediateRowArity: Int,
    +    finalRowArity: Int,
    +    finalRowWindowStartPos: Option[Int],
    +    finalRowWindowEndPos: Option[Int],
    +    windowSize: Long)
    +  extends DataSetSlideWindowAggReduceGroupFunction(
    +    aggregates,
    +    groupKeysMapping,
    +    aggregateMapping,
    +    intermediateRowArity,
    +    finalRowArity,
    +    finalRowWindowStartPos,
    +    finalRowWindowEndPos,
    +    windowSize)
    +  with CombineFunction[Row, Row] {
    +
    +  override def combine(records: Iterable[Row]): Row = {
    +    // initiate intermediate aggregate value
    +    aggregates.foreach(_.initiate(aggregateBuffer))
    +
    +    val iterator = records.iterator()
    +    while (iterator.hasNext) {
    +      val record = iterator.next()
    +      aggregates.foreach(_.merge(record, aggregateBuffer))
    +
    +      // check if this record is the last record
    +      if (!iterator.hasNext) {
    +        // set group keys to aggregateBuffer
    +        for (i <- groupKeysMapping.indices) {
    +          aggregateBuffer.setField(i, record.getField(i))
    +        }
    +
    +        aggregateBuffer.setField(windowStartFieldPos, record.getField(windowStartFieldPos))
    +
    +        return aggregateBuffer
    +      }
    +    }
    +
    +    // this code path should never be reached as we return before the loop finishes
    +    throw new IllegalArgumentException("Group is empty. This should never happen.")
    --- End diff --
    
    I cannot remove it. The compiler complains otherwise.
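
    For the record, one way to avoid the unreachable throw altogether would be to track the last record instead of returning from inside the loop (sketch only, reusing the fields already defined on this class):

        override def combine(records: Iterable[Row]): Row = {
          // initiate intermediate aggregate value
          aggregates.foreach(_.initiate(aggregateBuffer))

          var last: Row = null
          val iterator = records.iterator()
          while (iterator.hasNext) {
            last = iterator.next()
            aggregates.foreach(_.merge(last, aggregateBuffer))
          }

          // groups passed to combine() are never empty, so `last` is set here
          for (i <- groupKeysMapping.indices) {
            aggregateBuffer.setField(i, last.getField(i))
          }
          aggregateBuffer.setField(windowStartFieldPos, last.getField(windowStartFieldPos))

          aggregateBuffer
        }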


---

[GitHub] flink pull request #3364: [FLINK-5047] [table] Add sliding group-windows for...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3364#discussion_r104527336
  
    --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/dataset/DataSetWindowAggregateITCase.scala ---
    @@ -169,4 +168,228 @@ class DataSetWindowAggregateITCase(configMode: TableConfigMode)
           .toDataSet[Row]
       }
     
    +  @Test(expected = classOf[UnsupportedOperationException])
    +  def testAllEventTimeSlidingGroupWindowOverCount(): Unit = {
    +    val env = ExecutionEnvironment.getExecutionEnvironment
    +    val tEnv = TableEnvironment.getTableEnvironment(env, config)
    +
    +    val table = env.fromCollection(data).toTable(tEnv, 'long, 'int, 'string)
    +
    +    // Count sliding non-grouping windows on event-time are currently not supported
    +    table
    +      .window(Slide over 2.rows every 2.rows on 'long as 'w)
    +      .groupBy('w)
    +      .select('int.count)
    +      .toDataSet[Row]
    +  }
    +
    +  @Test
    +  def testAllEventTimeSlidingGroupWindowOverTime(): Unit = {
    +    // please keep this test in sync with the DataStream variant
    +    val env = ExecutionEnvironment.getExecutionEnvironment
    +    val tEnv = TableEnvironment.getTableEnvironment(env)
    +
    +    val table = env.fromCollection(data).toTable(tEnv, 'long, 'int, 'string)
    +
    +    val windowedTable = table
    +      .window(Slide over 5.milli every 2.milli on 'long as 'w)
    +      .groupBy('w)
    +      .select('int.count, 'w.start, 'w.end)
    +
    +    val expected =
    +      "1,1970-01-01 00:00:00.008,1970-01-01 00:00:00.013\n" +
    +      "1,1970-01-01 00:00:00.012,1970-01-01 00:00:00.017\n" +
    +      "1,1970-01-01 00:00:00.014,1970-01-01 00:00:00.019\n" +
    +      "1,1970-01-01 00:00:00.016,1970-01-01 00:00:00.021\n" +
    +      "2,1969-12-31 23:59:59.998,1970-01-01 00:00:00.003\n" +
    +      "2,1970-01-01 00:00:00.006,1970-01-01 00:00:00.011\n" +
    +      "3,1970-01-01 00:00:00.004,1970-01-01 00:00:00.009\n" +
    +      "4,1970-01-01 00:00:00.0,1970-01-01 00:00:00.005\n" +
    +      "4,1970-01-01 00:00:00.002,1970-01-01 00:00:00.007"
    +
    +    val results = windowedTable.toDataSet[Row].collect()
    +    TestBaseUtils.compareResultAsText(results.asJava, expected)
    +  }
    +
    +  @Test
    +  def testEventTimeSlidingGroupWindowOverTimeOverlappingFullPane(): Unit = {
    +    // please keep this test in sync with the DataStream variant
    +    val env = ExecutionEnvironment.getExecutionEnvironment
    +    val tEnv = TableEnvironment.getTableEnvironment(env)
    +
    +    val table = env.fromCollection(data).toTable(tEnv, 'long, 'int, 'string)
    +
    +    val windowedTable = table
    +      .window(Slide over 10.milli every 5.milli on 'long as 'w)
    +      .groupBy('string, 'w)
    +      .select('string, 'int.count, 'w.start, 'w.end)
    +
    +    val expected =
    +      "Hallo,1,1969-12-31 23:59:59.995,1970-01-01 00:00:00.005\n" +
    +      "Hallo,1,1970-01-01 00:00:00.0,1970-01-01 00:00:00.01\n" +
    +      "Hello world,1,1970-01-01 00:00:00.0,1970-01-01 00:00:00.01\n" +
    +      "Hello world,1,1970-01-01 00:00:00.005,1970-01-01 00:00:00.015\n" +
    +      "Hello world,1,1970-01-01 00:00:00.01,1970-01-01 00:00:00.02\n" +
    +      "Hello world,1,1970-01-01 00:00:00.015,1970-01-01 00:00:00.025\n" +
    +      "Hello,1,1970-01-01 00:00:00.005,1970-01-01 00:00:00.015\n" +
    +      "Hello,2,1969-12-31 23:59:59.995,1970-01-01 00:00:00.005\n" +
    +      "Hello,3,1970-01-01 00:00:00.0,1970-01-01 00:00:00.01\n" +
    +      "Hi,1,1969-12-31 23:59:59.995,1970-01-01 00:00:00.005\n" +
    +      "Hi,1,1970-01-01 00:00:00.0,1970-01-01 00:00:00.01"
    +
    +    val results = windowedTable.toDataSet[Row].collect()
    +    TestBaseUtils.compareResultAsText(results.asJava, expected)
    +  }
    +
    +  @Test
    +  def testEventTimeSlidingGroupWindowOverTimeOverlappingSplitPane(): Unit = {
    +    // please keep this test in sync with the DataStream variant
    +    val env = ExecutionEnvironment.getExecutionEnvironment
    +    val tEnv = TableEnvironment.getTableEnvironment(env)
    +
    +    val table = env.fromCollection(data).toTable(tEnv, 'long, 'int, 'string)
    +
    +    val windowedTable = table
    +      .window(Slide over 5.milli every 4.milli on 'long as 'w)
    +      .groupBy('string, 'w)
    +      .select('string, 'int.count, 'w.start, 'w.end)
    +
    +    val expected =
    +      "Hallo,1,1970-01-01 00:00:00.0,1970-01-01 00:00:00.005\n" +
    +      "Hello world,1,1970-01-01 00:00:00.004,1970-01-01 00:00:00.009\n" +
    +      "Hello world,1,1970-01-01 00:00:00.008,1970-01-01 00:00:00.013\n" +
    +      "Hello world,1,1970-01-01 00:00:00.012,1970-01-01 00:00:00.017\n" +
    +      "Hello world,1,1970-01-01 00:00:00.016,1970-01-01 00:00:00.021\n" +
    +      "Hello,2,1970-01-01 00:00:00.0,1970-01-01 00:00:00.005\n" +
    +      "Hello,2,1970-01-01 00:00:00.004,1970-01-01 00:00:00.009\n" +
    +      "Hi,1,1970-01-01 00:00:00.0,1970-01-01 00:00:00.005"
    +
    +    val results = windowedTable.toDataSet[Row].collect()
    +    TestBaseUtils.compareResultAsText(results.asJava, expected)
    +  }
    +
    +  @Test
    +  def testEventTimeSlidingGroupWindowOverTimeNonOverlappingFullPane(): Unit = {
    +    // please keep this test in sync with the DataStream variant
    +    val env = ExecutionEnvironment.getExecutionEnvironment
    +    val tEnv = TableEnvironment.getTableEnvironment(env)
    +
    +    val table = env.fromCollection(data).toTable(tEnv, 'long, 'int, 'string)
    +
    +    val windowedTable = table
    +      .window(Slide over 5.milli every 10.milli on 'long as 'w)
    +      .groupBy('string, 'w)
    +      .select('string, 'int.count, 'w.start, 'w.end)
    +
    +    val expected =
    +      "Hallo,1,1970-01-01 00:00:00.0,1970-01-01 00:00:00.005\n" +
    --- End diff --
    
    There are two result rows for the same window (Hallo, 00:00:00.0, 00:00:00.005). Is this correct?


---

[GitHub] flink pull request #3364: [FLINK-5047] [table] Add sliding group-windows for...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3364#discussion_r104927085
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetTumbleTimeWindowAggReduceGroupFunction.scala ---
    @@ -112,12 +112,10 @@ class DataSetTumbleTimeWindowAggReduceGroupFunction(
         }
     
         // get final aggregate value and set to output.
    -    aggregateMapping.foreach {
    -      case (after, previous) => {
    +    aggregateMapping.foreach { case (after, previous) =>
    --- End diff --
    
    revert this change? Will be fixed with #3489 


---

[GitHub] flink pull request #3364: [FLINK-5047] [table] Add sliding group-windows for...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3364#discussion_r104922022
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideTimeWindowAggFlatMapFunction.scala ---
    @@ -0,0 +1,77 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.table.runtime.aggregate
    +
    +import org.apache.flink.api.common.functions.RichFlatMapFunction
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.api.java.typeutils.ResultTypeQueryable
    +import org.apache.flink.configuration.Configuration
    +import org.apache.flink.streaming.api.windowing.windows.TimeWindow
    +import org.apache.flink.types.Row
    +import org.apache.flink.util.{Collector, Preconditions}
    +
    +
    +/**
    +  * It is used for sliding time-windows on batch tables. It takes a prepared input row,
    +  * aligns the window start, and replicates or omits records for different panes of a sliding
    +  * window. It is used for non-partial aggregations.
    +  *
    +  * @param aggregatesLength number of aggregate functions
    +  * @param groupingKeysLength number of grouping keys
    +  * @param windowSize window size of the sliding window
    +  * @param windowSlide window slide of the sliding window
    +  * @param returnType return type of this function
    +  */
    +class DataSetSlideTimeWindowAggFlatMapFunction(
    +    private val aggregatesLength: Int,
    +    private val groupingKeysLength: Int,
    +    private val timeFieldPos: Int,
    +    private val windowSize: Long,
    +    private val windowSlide: Long,
    +    @transient private val returnType: TypeInformation[Row])
    +  extends RichFlatMapFunction[Row, Row]
    +  with ResultTypeQueryable[Row] {
    +
    +  Preconditions.checkNotNull(aggregatesLength)
    +
    +  private var intermediateRow: Row = _
    +  // add one field to store window start
    +  private val intermediateRowArity: Int = groupingKeysLength + aggregatesLength + 1
    +
    +  override def open(config: Configuration) {
    --- End diff --
    
    `open()` can be removed
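
    i.e. something like this (just a sketch; it assumes open() only allocates the reused row today):

        // instead of allocating intermediateRow in open(), make it a plain field:
        private val intermediateRowArity: Int = groupingKeysLength + aggregatesLength + 1
        private val intermediateRow: Row = new Row(intermediateRowArity)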


---

[GitHub] flink pull request #3364: [FLINK-5047] [table] Add sliding group-windows for...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3364#discussion_r104927549
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/IncrementalAggregateAllWindowFunction.scala ---
    @@ -53,7 +53,12 @@ class IncrementalAggregateAllWindowFunction[W <: Window](
     
         if (iterator.hasNext) {
           val record = iterator.next()
    -      out.collect(record)
    +      var i = 0
    +      while (i < record.getArity) {
    +        output.setField(i, record.getField(0))
    --- End diff --
    
    `record.getField(0)` -> `record.getField(i)`?
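
    i.e. the copy loop was presumably meant to read (sketch of the fix):

        var i = 0
        while (i < record.getArity) {
          output.setField(i, record.getField(i))
          i += 1
        }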


---

[GitHub] flink pull request #3364: [FLINK-5047] [table] Add sliding group-windows for...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3364#discussion_r104925166
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideTimeWindowAggFlatMapFunction.scala ---
    @@ -0,0 +1,77 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.table.runtime.aggregate
    +
    +import org.apache.flink.api.common.functions.RichFlatMapFunction
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.api.java.typeutils.ResultTypeQueryable
    +import org.apache.flink.configuration.Configuration
    +import org.apache.flink.streaming.api.windowing.windows.TimeWindow
    +import org.apache.flink.types.Row
    +import org.apache.flink.util.{Collector, Preconditions}
    +
    +
    +/**
    +  * It is used for sliding time-windows on batch tables. It takes a prepared input row,
    +  * aligns the window start, and replicates or omits records for different panes of a sliding
    +  * window. It is used for non-partial aggregations.
    +  *
    +  * @param aggregatesLength number of aggregate functions
    +  * @param groupingKeysLength number of grouping keys
    +  * @param windowSize window size of the sliding window
    +  * @param windowSlide window slide of the sliding window
    +  * @param returnType return type of this function
    +  */
    +class DataSetSlideTimeWindowAggFlatMapFunction(
    +    private val aggregatesLength: Int,
    --- End diff --
    
    `aggregatesLength` and `groupingKeysLength` can be removed


---

[GitHub] flink pull request #3364: [FLINK-5047] [table] Add sliding group-windows for...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3364#discussion_r104524635
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideWindowAggReduceGroupFunction.scala ---
    @@ -0,0 +1,107 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.table.runtime.aggregate
    +
    +import java.lang.Iterable
    +
    +import org.apache.flink.api.common.functions.RichGroupReduceFunction
    +import org.apache.flink.configuration.Configuration
    +import org.apache.flink.types.Row
    +import org.apache.flink.util.{Collector, Preconditions}
    +
    +/**
    +  * It wraps the aggregate logic inside of
    +  * [[org.apache.flink.api.java.operators.GroupReduceOperator]].
    +  *
    +  * It is used for sliding windows on batch tables, for both time and count-windows.
    +  *
    +  * @param aggregates aggregate functions.
    +  * @param groupKeysMapping index mapping of group keys between intermediate aggregate Row
    +  *                         and output Row.
    +  * @param aggregateMapping index mapping between aggregate function list and aggregated value
    +  *                         index in output Row.
    +  * @param intermediateRowArity intermediate row field count
    +  * @param finalRowArity output row field count
    +  * @param finalRowWindowStartPos relative window-start position to last field of output row
    +  * @param finalRowWindowEndPos relative window-end position to last field of output row
    +  * @param windowSize size of the window, used to determine window-end for output row
    +  */
    +class DataSetSlideWindowAggReduceGroupFunction(
    +    aggregates: Array[Aggregate[_ <: Any]],
    +    groupKeysMapping: Array[(Int, Int)],
    +    aggregateMapping: Array[(Int, Int)],
    +    intermediateRowArity: Int,
    +    finalRowArity: Int,
    +    finalRowWindowStartPos: Option[Int],
    +    finalRowWindowEndPos: Option[Int],
    +    windowSize: Long)
    +  extends RichGroupReduceFunction[Row, Row] {
    +
    +  protected var aggregateBuffer: Row = _
    +  protected var windowStartFieldPos: Int = _
    +
    +  private var collector: TimeWindowPropertyCollector = _
    +  private var output: Row = _
    +
    +  override def open(config: Configuration) {
    +    Preconditions.checkNotNull(aggregates)
    --- End diff --
    
    Except for the initialization of `TimeWindowPropertyCollector` everything can be moved into the constructor. 
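
    Roughly like this (sketch only; the TimeWindowPropertyCollector constructor arguments are assumed from the field names):

        Preconditions.checkNotNull(aggregates)
        Preconditions.checkNotNull(groupKeysMapping)

        protected val aggregateBuffer: Row = new Row(intermediateRowArity)
        private val output: Row = new Row(finalRowArity)

        private var collector: TimeWindowPropertyCollector = _

        override def open(config: Configuration) {
          collector = new TimeWindowPropertyCollector(finalRowWindowStartPos, finalRowWindowEndPos)
        }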


---