Posted to issues@flink.apache.org by huawei-flink <gi...@git.apache.org> on 2017/04/18 12:32:00 UTC

[GitHub] flink pull request #3732: [FLINK-6250] Distinct procTime with Rows boundarie...

GitHub user huawei-flink opened a pull request:

    https://github.com/apache/flink/pull/3732

    [FLINK-6250] Distinct procTime with Rows boundaries

    Thanks for contributing to Apache Flink. Before you open your pull request, please take the following checklist into consideration.
    If your changes take all of the items into account, feel free to open your pull request. For more information and/or questions please refer to the [How To Contribute guide](http://flink.apache.org/how-to-contribute.html).
    In addition to going through the list, please provide a meaningful description of your changes.
    
    - [X] General
      - The pull request references the related JIRA issue ("[FLINK-XXX] Jira title text")
      - The pull request addresses only one issue
      - Each commit in the PR has a meaningful commit message (including the JIRA id)
    
    - [X] Documentation
      - Documentation has been added for new functionality
      - Old documentation affected by the pull request has been updated
      - JavaDoc for public methods has been added
    
    - [X] Tests & Build
      - Functionality added by the pull request is covered by tests
      - `mvn clean verify` has been executed successfully locally or a Travis build has passed


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/huawei-flink/flink FLINK-6250

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/3732.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #3732
    
----
commit 4e3da4c9baebf48bfc47ef192287e7e17ab69efd
Author: Stefano Bortoli <s....@gmail.com>
Date:   2017-04-18T12:27:26Z

    DIST() for aggregation on procTime row bounded windows

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #3732: [FLINK-6250] Distinct procTime with Rows boundarie...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3732#discussion_r112694911
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala ---
    @@ -152,7 +153,14 @@ object AggregateUtil {
     
         val aggregationStateType: RowTypeInfo = createAccumulatorRowType(aggregates)
         val inputRowType = FlinkTypeFactory.toInternalRowTypeInfo(inputType).asInstanceOf[RowTypeInfo]
    -
    +    
    +    var hasDistinct = false
    --- End diff --
    
    can be simplified to 
    ```
    val hasDistinct = distinctAggregatesFlags.exists(_)
    ```
    
    `exists` returns `true` if the predicate (here `_`) returns `true` for at least one element.
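
    For illustration, a minimal sketch (not part of the PR) of how `exists` collapses the flag array into a single Boolean. It is spelled `exists(identity)`, the explicit form of the shorthand above, which is the variant that typechecks; the array name `distinctAggregatesFlags` is assumed from the suggestion.
    ```
    // Hypothetical flag array: one entry per aggregate,
    // true if that aggregate is marked DISTINCT.
    val distinctAggregatesFlags = Array(false, true, false)

    // true, because the predicate holds for at least one element
    val hasDistinct = distinctAggregatesFlags.exists(identity)

    // false, because no flag is set
    val noneDistinct = Array(false, false).exists(identity)
    ```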


---

[GitHub] flink pull request #3732: [FLINK-6250] Distinct procTime with Rows boundarie...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3732#discussion_r112693953
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala ---
    @@ -91,6 +93,22 @@ class DataStreamOverAggregate(
     
         val overWindow: org.apache.calcite.rel.core.Window.Group = logicWindow.groups.get(0)
     
    +    val distinctVarMap: Map[String,Boolean] = new HashMap[String, Boolean]
    +    if (input.isInstanceOf[DataStreamCalc]) {
    +      val dsCalc = input.asInstanceOf[DataStreamCalc]
    +      val iter = dsCalc
    +                 .selectionToString(dsCalc.getProgram, dsCalc.getExpressionString)
    +                 .split(",")
    +                 .iterator
    +      while (iter.hasNext) {
    +        val exp = iter.next
    +        if(exp.contains("DIST")){
    +          val varName = exp.substring(exp.indexOf("$"))
    +           distinctVarMap.put(varName,true)
    --- End diff --
    
    +space


---

[GitHub] flink pull request #3732: [FLINK-6250] Distinct procTime with Rows boundarie...

Posted by stefanobortoli <gi...@git.apache.org>.
Github user stefanobortoli commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3732#discussion_r112725482
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/UnsupportedOperatorsIndicatorFunctions.scala ---
    @@ -0,0 +1,46 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.table.functions
    +
    +import java.nio.charset.Charset
    +import java.util.List
    +
    +import org.apache.calcite.rel.`type`._
    +import org.apache.calcite.sql._
    +import org.apache.calcite.sql.`type`.{OperandTypes, ReturnTypes, SqlTypeFamily, SqlTypeName}
    +import org.apache.calcite.sql.validate.SqlMonotonicity
    +import org.apache.calcite.tools.RelBuilder
    +import org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo
    +import org.apache.flink.table.api.TableException
    +import org.apache.flink.table.expressions.LeafExpression
    +import org.apache.calcite.sql.`type`.InferTypes
    +import org.apache.calcite.sql.validate.SqlValidator
    +import org.apache.calcite.sql.validate.SqlValidatorScope
    +
    +/**
    + * An SQL Function DISTINCT(<NUMERIC>) used to mark the DISTINCT operator
    + * on aggregation input. This is temporary workaround waiting for 
    + * https://issues.apache.org/jira/browse/CALCITE-1740 being solved
    + */
    +object DistinctAggregatorExtractor extends SqlFunction("DIST", SqlKind.OTHER_FUNCTION,
    +  ReturnTypes.ARG0, InferTypes.RETURN_TYPE,
    +  OperandTypes.NUMERIC, SqlFunctionCategory.NUMERIC) {
    --- End diff --
    
    one never stops learning. :-)



---

[GitHub] flink pull request #3732: [FLINK-6250] Distinct procTime with Rows boundarie...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3732#discussion_r112700998
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedDistinctRowsOver.scala ---
    @@ -0,0 +1,230 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.table.runtime.aggregate
    +
    +import java.util
    +
    +import org.apache.flink.configuration.Configuration
    +import org.apache.flink.streaming.api.functions.ProcessFunction
    +import org.apache.flink.types.Row
    +import org.apache.flink.util.{Collector, Preconditions}
    +import org.apache.flink.api.common.state.ValueStateDescriptor
    +import org.apache.flink.api.java.typeutils.RowTypeInfo
    +import org.apache.flink.api.common.state.ValueState
    +import org.apache.flink.table.functions.{Accumulator, AggregateFunction}
    +import org.apache.flink.api.common.state.MapState
    +import org.apache.flink.api.common.state.MapStateDescriptor
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.api.java.typeutils.ListTypeInfo
    +import java.util.{List => JList}
    +
    +import org.apache.flink.api.common.typeinfo.BasicTypeInfo
    +import org.apache.flink.api.common.state.ListState
    +
    +class ProcTimeBoundedDistinctRowsOver(
    +  private val aggregates: Array[AggregateFunction[_]],
    +  private val aggFields: Array[Array[Int]],
    +  private val distinctAggsFlag: Array[Boolean],
    +  private val precedingOffset: Long,
    +  private val forwardedFieldCount: Int,
    +  private val aggregatesTypeInfo: RowTypeInfo,
    +  private val inputType: TypeInformation[Row])
    +    extends ProcessFunction[Row, Row] {
    +
    +  Preconditions.checkNotNull(aggregates)
    +  Preconditions.checkNotNull(aggFields)
    +  Preconditions.checkNotNull(distinctAggsFlag)
    +  Preconditions.checkNotNull(distinctAggsFlag.length == aggregates.length)
    +  Preconditions.checkArgument(aggregates.length == aggFields.length)
    +  Preconditions.checkArgument(precedingOffset > 0)
    +
    +  private var accumulatorState: ValueState[Row] = _
    +  private var rowMapState: MapState[Long, JList[Row]] = _
    +  private var output: Row = _
    +  private var counterState: ValueState[Long] = _
    +  private var smallestTsState: ValueState[Long] = _
    +  private var distinctValueStateList: Array[MapState[Any, Long]] = _
    +
    +  override def open(config: Configuration) {
    +
    +    output = new Row(forwardedFieldCount + aggregates.length)
    +    // We keep the elements received in a Map state keyed
    +    // by the ingestion time in the operator.
    +    // we also keep counter of processed elements
    +    // and timestamp of oldest element
    +    val rowListTypeInfo: TypeInformation[JList[Row]] =
    +      new ListTypeInfo[Row](inputType).asInstanceOf[TypeInformation[JList[Row]]]
    +
    +    val mapStateDescriptor: MapStateDescriptor[Long, JList[Row]] =
    +      new MapStateDescriptor[Long, JList[Row]]("windowBufferMapState",
    +        BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], rowListTypeInfo)
    +    rowMapState = getRuntimeContext.getMapState(mapStateDescriptor)
    +
    +    val aggregationStateDescriptor: ValueStateDescriptor[Row] =
    +      new ValueStateDescriptor[Row]("aggregationState", aggregatesTypeInfo)
    +    accumulatorState = getRuntimeContext.getState(aggregationStateDescriptor)
    +
    +    val processedCountDescriptor : ValueStateDescriptor[Long] =
    +       new ValueStateDescriptor[Long]("processedCountState", classOf[Long])
    +    counterState = getRuntimeContext.getState(processedCountDescriptor)
    +
    +    val smallestTimestampDescriptor : ValueStateDescriptor[Long] =
    +       new ValueStateDescriptor[Long]("smallestTSState", classOf[Long])
    +    smallestTsState = getRuntimeContext.getState(smallestTimestampDescriptor)
    +    distinctValueStateList = new Array(aggregates.size)
    +    for(i <- 0 until aggregates.size){
    +      if(distinctAggsFlag(i)){
    +        val distinctValDescriptor =  new MapStateDescriptor[Any, Long](
    --- End diff --
    
    We should use the concrete type of the function argument here. Otherwise, the values won't be efficiently serialized.
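
    A minimal sketch of that suggestion (an assumption, not the PR's code): if the aggregated field is known to be, say, a `Long`, the descriptor can be declared with concrete type information so Flink picks an efficient serializer instead of a generic fallback for `Any`.
    ```
    import org.apache.flink.api.common.state.MapStateDescriptor
    import org.apache.flink.api.common.typeinfo.BasicTypeInfo

    val i = 0 // stands in for the aggregate index from the surrounding loop

    // Hypothetical typed descriptor: concrete key type instead of classOf[Any],
    // with a Long occurrence counter per distinct value.
    val distinctValDescriptor = new MapStateDescriptor[java.lang.Long, java.lang.Long](
      "distinctValuesBufferMapState" + i,
      BasicTypeInfo.LONG_TYPE_INFO,
      BasicTypeInfo.LONG_TYPE_INFO)
    ```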


---

[GitHub] flink pull request #3732: [FLINK-6250] Distinct procTime with Rows boundarie...

Posted by hongyuhong <gi...@git.apache.org>.
Github user hongyuhong commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3732#discussion_r112105763
  
    --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala ---
    @@ -1140,6 +1140,202 @@ class SqlITCase extends StreamingWithStateTestBase {
         assertEquals(expected.sorted, StreamITCase.testResults.sorted)
       }
     
    +  @Test
    +  def testNonPartitionedProcTimeOverDistinctWindow(): Unit = {
    +
    +    val env = StreamExecutionEnvironment.getExecutionEnvironment
    +    env.setStateBackend(getStateBackend)
    +    val tEnv = TableEnvironment.getTableEnvironment(env)
    +    env.setParallelism(1)
    +    StreamITCase.testResults = mutable.MutableList()
    +
    +    val t = StreamTestData.get5TupleDataStream(env).toTable(tEnv).as('a, 'b, 'c, 'd, 'e)
    +    tEnv.registerTable("MyTable", t)
    +
    +    val sqlQuery = "SELECT a,  " +
    +      "  SUM(DIST(e)) OVER (" +
    +      " ORDER BY procTime() ROWS BETWEEN 10 PRECEDING AND CURRENT ROW) AS sumE " +
    --- End diff --
    
    I think we can add some tests for multiple aggregations with distinct; see the sketch below.
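
    A hypothetical query along these lines (a sketch only, reusing the `DIST()` marker and the `MyTable` registration from this test) would exercise a distinct and a plain aggregate in the same OVER window:
    ```
    // Hypothetical multi-aggregation test query: SUM over the distinct values
    // of e combined with a plain COUNT of e in one bounded proc-time window.
    val sqlQuery = "SELECT a, " +
      "  SUM(DIST(e)) OVER (" +
      "    ORDER BY procTime() ROWS BETWEEN 10 PRECEDING AND CURRENT ROW) AS sumE, " +
      "  COUNT(e) OVER (" +
      "    ORDER BY procTime() ROWS BETWEEN 10 PRECEDING AND CURRENT ROW) AS cntE " +
      "FROM MyTable"
    ```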


---

[GitHub] flink pull request #3732: [FLINK-6250] Distinct procTime with Rows boundarie...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3732#discussion_r112702163
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedDistinctRowsOver.scala ---
    @@ -0,0 +1,230 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.table.runtime.aggregate
    +
    +import java.util
    +
    +import org.apache.flink.configuration.Configuration
    +import org.apache.flink.streaming.api.functions.ProcessFunction
    +import org.apache.flink.types.Row
    +import org.apache.flink.util.{Collector, Preconditions}
    +import org.apache.flink.api.common.state.ValueStateDescriptor
    +import org.apache.flink.api.java.typeutils.RowTypeInfo
    +import org.apache.flink.api.common.state.ValueState
    +import org.apache.flink.table.functions.{Accumulator, AggregateFunction}
    +import org.apache.flink.api.common.state.MapState
    +import org.apache.flink.api.common.state.MapStateDescriptor
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.api.java.typeutils.ListTypeInfo
    +import java.util.{List => JList}
    +
    +import org.apache.flink.api.common.typeinfo.BasicTypeInfo
    +import org.apache.flink.api.common.state.ListState
    +
    +class ProcTimeBoundedDistinctRowsOver(
    +  private val aggregates: Array[AggregateFunction[_]],
    +  private val aggFields: Array[Array[Int]],
    +  private val distinctAggsFlag: Array[Boolean],
    +  private val precedingOffset: Long,
    +  private val forwardedFieldCount: Int,
    +  private val aggregatesTypeInfo: RowTypeInfo,
    +  private val inputType: TypeInformation[Row])
    +    extends ProcessFunction[Row, Row] {
    +
    +  Preconditions.checkNotNull(aggregates)
    +  Preconditions.checkNotNull(aggFields)
    +  Preconditions.checkNotNull(distinctAggsFlag)
    +  Preconditions.checkNotNull(distinctAggsFlag.length == aggregates.length)
    +  Preconditions.checkArgument(aggregates.length == aggFields.length)
    +  Preconditions.checkArgument(precedingOffset > 0)
    +
    +  private var accumulatorState: ValueState[Row] = _
    +  private var rowMapState: MapState[Long, JList[Row]] = _
    +  private var output: Row = _
    +  private var counterState: ValueState[Long] = _
    +  private var smallestTsState: ValueState[Long] = _
    +  private var distinctValueStateList: Array[MapState[Any, Long]] = _
    +
    +  override def open(config: Configuration) {
    +
    +    output = new Row(forwardedFieldCount + aggregates.length)
    +    // We keep the elements received in a Map state keyed
    +    // by the ingestion time in the operator.
    +    // we also keep counter of processed elements
    +    // and timestamp of oldest element
    +    val rowListTypeInfo: TypeInformation[JList[Row]] =
    +      new ListTypeInfo[Row](inputType).asInstanceOf[TypeInformation[JList[Row]]]
    +
    +    val mapStateDescriptor: MapStateDescriptor[Long, JList[Row]] =
    +      new MapStateDescriptor[Long, JList[Row]]("windowBufferMapState",
    +        BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], rowListTypeInfo)
    +    rowMapState = getRuntimeContext.getMapState(mapStateDescriptor)
    +
    +    val aggregationStateDescriptor: ValueStateDescriptor[Row] =
    +      new ValueStateDescriptor[Row]("aggregationState", aggregatesTypeInfo)
    +    accumulatorState = getRuntimeContext.getState(aggregationStateDescriptor)
    +
    +    val processedCountDescriptor : ValueStateDescriptor[Long] =
    +       new ValueStateDescriptor[Long]("processedCountState", classOf[Long])
    +    counterState = getRuntimeContext.getState(processedCountDescriptor)
    +
    +    val smallestTimestampDescriptor : ValueStateDescriptor[Long] =
    +       new ValueStateDescriptor[Long]("smallestTSState", classOf[Long])
    +    smallestTsState = getRuntimeContext.getState(smallestTimestampDescriptor)
    +    distinctValueStateList = new Array(aggregates.size)
    +    for(i <- 0 until aggregates.size){
    +      if(distinctAggsFlag(i)){
    +        val distinctValDescriptor =  new MapStateDescriptor[Any, Long](
    +                                             "distinctValuesBufferMapState" + i,
    +                                              classOf[Any],
    +                                              classOf[Long])
    +        distinctValueStateList(i)=getRuntimeContext.getMapState(distinctValDescriptor)
    +      }
    +    }
    +  }
    +
    +  override def processElement(
    +    input: Row,
    +    ctx: ProcessFunction[Row, Row]#Context,
    +    out: Collector[Row]): Unit = {
    +
    +    val currentTime = ctx.timerService.currentProcessingTime
    +    var i = 0
    +
    +    // initialize state for the processed element
    +    var accumulators = accumulatorState.value
    +    if (accumulators == null) {
    +      accumulators = new Row(aggregates.length)
    +      while (i < aggregates.length) {
    +        accumulators.setField(i, aggregates(i).createAccumulator())
    +        i += 1
    +      }
    +    }
    +
    +    // get smallest timestamp
    +    var smallestTs = smallestTsState.value
    +    if (smallestTs == 0L) {
    +      smallestTs = currentTime
    +      smallestTsState.update(smallestTs)
    +    }
    +    // get previous counter value
    +    var counter = counterState.value
    +
    +    if (counter == precedingOffset) {
    +      val retractList = rowMapState.get(smallestTs)
    +
    +      // get oldest element beyond buffer size
    +      // and if oldest element exist, retract value
    +      var removeCounter :Integer = 0
    +      var distinctCounter : Integer = 0
    +      var retractVal : Object = null
    +      i = 0
    +      while (i < aggregates.length) {
    +        val accumulator = accumulators.getField(i).asInstanceOf[Accumulator]
    +        retractVal = retractList.get(0).getField(aggFields(i)(0))
    +        if(distinctAggsFlag(i)){
    +          var distinctValCounter: Long = distinctValueStateList(i).get(retractVal)
    +          // if the value to be retract is the last one added
    +          // the remove it and retract the value
    +          if(distinctValCounter == 1L){
    --- End diff --
    
    Check for `null` to be on the safe side?
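
    A minimal null-safe variant of the lookup (a sketch, not the PR's code): `MapState.get` returns `null` for an absent key, and unboxing that into a Scala `Long` is fragile, so checking presence first sidesteps the problem.
    ```
    // Hypothetical null-safe read: treat an absent key as a zero count
    // instead of unboxing a possibly-null java.lang.Long.
    val distinctValCounter: Long =
      if (distinctValueStateList(i).contains(retractVal)) {
        distinctValueStateList(i).get(retractVal)
      } else {
        0L
      }
    ```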


---

[GitHub] flink pull request #3732: [FLINK-6250] Distinct procTime with Rows boundarie...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3732#discussion_r112691071
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/UnsupportedOperatorsIndicatorFunctions.scala ---
    @@ -0,0 +1,46 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.table.functions
    +
    +import java.nio.charset.Charset
    +import java.util.List
    +
    +import org.apache.calcite.rel.`type`._
    +import org.apache.calcite.sql._
    +import org.apache.calcite.sql.`type`.{OperandTypes, ReturnTypes, SqlTypeFamily, SqlTypeName}
    +import org.apache.calcite.sql.validate.SqlMonotonicity
    +import org.apache.calcite.tools.RelBuilder
    +import org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo
    +import org.apache.flink.table.api.TableException
    +import org.apache.flink.table.expressions.LeafExpression
    +import org.apache.calcite.sql.`type`.InferTypes
    +import org.apache.calcite.sql.validate.SqlValidator
    +import org.apache.calcite.sql.validate.SqlValidatorScope
    +
    +/**
    + * An SQL Function DISTINCT(<NUMERIC>) used to mark the DISTINCT operator
    + * on aggregation input. This is temporary workaround waiting for 
    + * https://issues.apache.org/jira/browse/CALCITE-1740 being solved
    + */
    +object DistinctAggregatorExtractor extends SqlFunction("DIST", SqlKind.OTHER_FUNCTION,
    +  ReturnTypes.ARG0, InferTypes.RETURN_TYPE,
    +  OperandTypes.NUMERIC, SqlFunctionCategory.NUMERIC) {
    --- End diff --
    
    An aggregation function can also return non-numeric types such as MIN(String). So `SqlFunctionCategory.NUMERIC` might not be the right category. OTOH, I don't know how Calcite uses this category.
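
    A hedged sketch of a more permissive declaration (assuming Calcite's `OperandTypes.ANY` and `SqlFunctionCategory.SYSTEM` are acceptable substitutes; how the planner uses the category remains unclear, as noted above):
    ```
    import org.apache.calcite.sql.{SqlFunction, SqlFunctionCategory, SqlKind}
    import org.apache.calcite.sql.`type`.{InferTypes, OperandTypes, ReturnTypes}

    // Hypothetical variant: accept any operand type so markers wrapping
    // non-numeric aggregation inputs (e.g. MIN on a String field) validate too.
    object DistinctAggregatorExtractor extends SqlFunction("DIST", SqlKind.OTHER_FUNCTION,
      ReturnTypes.ARG0,           // result type mirrors the argument type
      InferTypes.RETURN_TYPE,
      OperandTypes.ANY,           // was OperandTypes.NUMERIC
      SqlFunctionCategory.SYSTEM) // was NUMERIC; this category choice is an assumption
    ```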


---

[GitHub] flink pull request #3732: [FLINK-6250] Distinct procTime with Rows boundarie...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3732#discussion_r112695257
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala ---
    @@ -186,11 +194,21 @@ object AggregateUtil {
           }
         } else {
           if (isRowsClause) {
    -        new ProcTimeBoundedRowsOver(
    -          genFunction,
    -          precedingOffset,
    -          aggregationStateType,
    -          inputRowType)
    +        if (hasDistinct){
    +          new ProcTimeBoundedDistinctRowsOver(aggregates,
    --- End diff --
    
    `aggregates` on a new line


---

[GitHub] flink pull request #3732: [FLINK-6250] Distinct procTime with Rows boundarie...

Posted by stefanobortoli <gi...@git.apache.org>.
Github user stefanobortoli commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3732#discussion_r112713096
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedDistinctRowsOver.scala ---
    @@ -0,0 +1,230 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.table.runtime.aggregate
    +
    +import java.util
    +
    +import org.apache.flink.configuration.Configuration
    +import org.apache.flink.streaming.api.functions.ProcessFunction
    +import org.apache.flink.types.Row
    +import org.apache.flink.util.{Collector, Preconditions}
    +import org.apache.flink.api.common.state.ValueStateDescriptor
    +import org.apache.flink.api.java.typeutils.RowTypeInfo
    +import org.apache.flink.api.common.state.ValueState
    +import org.apache.flink.table.functions.{Accumulator, AggregateFunction}
    +import org.apache.flink.api.common.state.MapState
    +import org.apache.flink.api.common.state.MapStateDescriptor
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.api.java.typeutils.ListTypeInfo
    +import java.util.{List => JList}
    +
    +import org.apache.flink.api.common.typeinfo.BasicTypeInfo
    +import org.apache.flink.api.common.state.ListState
    +
    +class ProcTimeBoundedDistinctRowsOver(
    +  private val aggregates: Array[AggregateFunction[_]],
    +  private val aggFields: Array[Array[Int]],
    +  private val distinctAggsFlag: Array[Boolean],
    +  private val precedingOffset: Long,
    +  private val forwardedFieldCount: Int,
    +  private val aggregatesTypeInfo: RowTypeInfo,
    +  private val inputType: TypeInformation[Row])
    +    extends ProcessFunction[Row, Row] {
    +
    +  Preconditions.checkNotNull(aggregates)
    +  Preconditions.checkNotNull(aggFields)
    +  Preconditions.checkNotNull(distinctAggsFlag)
    +  Preconditions.checkNotNull(distinctAggsFlag.length == aggregates.length)
    +  Preconditions.checkArgument(aggregates.length == aggFields.length)
    +  Preconditions.checkArgument(precedingOffset > 0)
    +
    +  private var accumulatorState: ValueState[Row] = _
    +  private var rowMapState: MapState[Long, JList[Row]] = _
    +  private var output: Row = _
    +  private var counterState: ValueState[Long] = _
    +  private var smallestTsState: ValueState[Long] = _
    +  private var distinctValueStateList: Array[MapState[Any, Long]] = _
    +
    +  override def open(config: Configuration) {
    +
    +    output = new Row(forwardedFieldCount + aggregates.length)
    +    // We keep the elements received in a Map state keyed
    +    // by the ingestion time in the operator.
    +    // we also keep counter of processed elements
    +    // and timestamp of oldest element
    +    val rowListTypeInfo: TypeInformation[JList[Row]] =
    +      new ListTypeInfo[Row](inputType).asInstanceOf[TypeInformation[JList[Row]]]
    +
    +    val mapStateDescriptor: MapStateDescriptor[Long, JList[Row]] =
    +      new MapStateDescriptor[Long, JList[Row]]("windowBufferMapState",
    +        BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], rowListTypeInfo)
    +    rowMapState = getRuntimeContext.getMapState(mapStateDescriptor)
    +
    +    val aggregationStateDescriptor: ValueStateDescriptor[Row] =
    +      new ValueStateDescriptor[Row]("aggregationState", aggregatesTypeInfo)
    +    accumulatorState = getRuntimeContext.getState(aggregationStateDescriptor)
    +
    +    val processedCountDescriptor : ValueStateDescriptor[Long] =
    +       new ValueStateDescriptor[Long]("processedCountState", classOf[Long])
    +    counterState = getRuntimeContext.getState(processedCountDescriptor)
    +
    +    val smallestTimestampDescriptor : ValueStateDescriptor[Long] =
    +       new ValueStateDescriptor[Long]("smallestTSState", classOf[Long])
    +    smallestTsState = getRuntimeContext.getState(smallestTimestampDescriptor)
    +    distinctValueStateList = new Array(aggregates.size)
    +    for(i <- 0 until aggregates.size){
    +      if(distinctAggsFlag(i)){
    +        val distinctValDescriptor =  new MapStateDescriptor[Any, Long](
    +                                             "distinctValuesBufferMapState" + i,
    +                                              classOf[Any],
    +                                              classOf[Long])
    +        distinctValueStateList(i)=getRuntimeContext.getMapState(distinctValDescriptor)
    +      }
    +    }
    +  }
    +
    +  override def processElement(
    +    input: Row,
    +    ctx: ProcessFunction[Row, Row]#Context,
    +    out: Collector[Row]): Unit = {
    +
    +    val currentTime = ctx.timerService.currentProcessingTime
    +    var i = 0
    +
    +    // initialize state for the processed element
    +    var accumulators = accumulatorState.value
    +    if (accumulators == null) {
    +      accumulators = new Row(aggregates.length)
    +      while (i < aggregates.length) {
    +        accumulators.setField(i, aggregates(i).createAccumulator())
    +        i += 1
    +      }
    +    }
    +
    +    // get smallest timestamp
    +    var smallestTs = smallestTsState.value
    +    if (smallestTs == 0L) {
    +      smallestTs = currentTime
    +      smallestTsState.update(smallestTs)
    +    }
    +    // get previous counter value
    +    var counter = counterState.value
    +
    +    if (counter == precedingOffset) {
    +      val retractList = rowMapState.get(smallestTs)
    +
    +      // get oldest element beyond buffer size
    +      // and if oldest element exist, retract value
    +      var removeCounter :Integer = 0
    +      var distinctCounter : Integer = 0
    +      var retractVal : Object = null
    +      i = 0
    +      while (i < aggregates.length) {
    +        val accumulator = accumulators.getField(i).asInstanceOf[Accumulator]
    +        retractVal = retractList.get(0).getField(aggFields(i)(0))
    +        if(distinctAggsFlag(i)){
    +          var distinctValCounter: Long = distinctValueStateList(i).get(retractVal)
    +          // if the value to be retract is the last one added
    +          // the remove it and retract the value
    +          if(distinctValCounter == 1L){
    --- End diff --
    
    If we are retracting, it cannot be null, right? It is a value that was added to the list at least once before. I get your point, though.


---

[GitHub] flink pull request #3732: [FLINK-6250] Distinct procTime with Rows boundarie...

Posted by huawei-flink <gi...@git.apache.org>.
Github user huawei-flink commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3732#discussion_r112180091
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedDistinctRowsOver.scala ---
    @@ -0,0 +1,238 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.table.runtime.aggregate
    +
    +import java.util
    +
    +import org.apache.flink.configuration.Configuration
    +import org.apache.flink.streaming.api.functions.ProcessFunction
    +import org.apache.flink.types.Row
    +import org.apache.flink.util.{Collector, Preconditions}
    +import org.apache.flink.api.common.state.ValueStateDescriptor
    +import org.apache.flink.api.java.typeutils.RowTypeInfo
    +import org.apache.flink.api.common.state.ValueState
    +import org.apache.flink.table.functions.{Accumulator, AggregateFunction}
    +import org.apache.flink.api.common.state.MapState
    +import org.apache.flink.api.common.state.MapStateDescriptor
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.api.java.typeutils.ListTypeInfo
    +import java.util.{List => JList}
    +
    +import org.apache.flink.api.common.typeinfo.BasicTypeInfo
    +
    +class ProcTimeBoundedDistinctRowsOver(
    +  private val aggregates: Array[AggregateFunction[_]],
    +  private val aggFields: Array[Array[Int]],
    +  private val distinctAggsFlag: Array[Boolean],
    +  private val precedingOffset: Long,
    +  private val forwardedFieldCount: Int,
    +  private val aggregatesTypeInfo: RowTypeInfo,
    +  private val inputType: TypeInformation[Row])
    +    extends ProcessFunction[Row, Row] {
    +
    +  Preconditions.checkNotNull(aggregates)
    +  Preconditions.checkNotNull(aggFields)
    +  Preconditions.checkNotNull(distinctAggsFlag)
    +  Preconditions.checkNotNull(distinctAggsFlag.length == aggregates.length)
    +  Preconditions.checkArgument(aggregates.length == aggFields.length)
    +  Preconditions.checkArgument(precedingOffset > 0)
    +
    +  private var accumulatorState: ValueState[Row] = _
    +  private var rowMapState: MapState[Long, JList[Row]] = _
    +  private var output: Row = _
    +  private var counterState: ValueState[Long] = _
    +  private var smallestTsState: ValueState[Long] = _
    +  private var distinctValueState: MapState[Any, Row] = _
    +
    +  override def open(config: Configuration) {
    +
    +    output = new Row(forwardedFieldCount + aggregates.length)
    +    // We keep the elements received in a Map state keyed
    +    // by the ingestion time in the operator.
    +    // we also keep counter of processed elements
    +    // and timestamp of oldest element
    +    val rowListTypeInfo: TypeInformation[JList[Row]] =
    +      new ListTypeInfo[Row](inputType).asInstanceOf[TypeInformation[JList[Row]]]
    +
    +    val mapStateDescriptor: MapStateDescriptor[Long, JList[Row]] =
    +      new MapStateDescriptor[Long, JList[Row]]("windowBufferMapState",
    +        BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], rowListTypeInfo)
    +    rowMapState = getRuntimeContext.getMapState(mapStateDescriptor)
    +
    +    val aggregationStateDescriptor: ValueStateDescriptor[Row] =
    +      new ValueStateDescriptor[Row]("aggregationState", aggregatesTypeInfo)
    +    accumulatorState = getRuntimeContext.getState(aggregationStateDescriptor)
    +
    +    val processedCountDescriptor : ValueStateDescriptor[Long] =
    +       new ValueStateDescriptor[Long]("processedCountState", classOf[Long])
    +    counterState = getRuntimeContext.getState(processedCountDescriptor)
    +
    +    val smallestTimestampDescriptor : ValueStateDescriptor[Long] =
    +       new ValueStateDescriptor[Long]("smallestTSState", classOf[Long])
    +    smallestTsState = getRuntimeContext.getState(smallestTimestampDescriptor)
    +    
    +    val distinctValDescriptor : MapStateDescriptor[Any, Row] =
    +      new MapStateDescriptor[Any, Row]("distinctValuesBufferMapState", classOf[Any], classOf[Row])
    --- End diff --
    
    you are right


---

[GitHub] flink pull request #3732: [FLINK-6250] Distinct procTime with Rows boundarie...

Posted by huawei-flink <gi...@git.apache.org>.
Github user huawei-flink commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3732#discussion_r112151646
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedDistinctRowsOver.scala ---
    @@ -0,0 +1,238 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.table.runtime.aggregate
    +
    +import java.util
    +
    +import org.apache.flink.configuration.Configuration
    +import org.apache.flink.streaming.api.functions.ProcessFunction
    +import org.apache.flink.types.Row
    +import org.apache.flink.util.{Collector, Preconditions}
    +import org.apache.flink.api.common.state.ValueStateDescriptor
    +import org.apache.flink.api.java.typeutils.RowTypeInfo
    +import org.apache.flink.api.common.state.ValueState
    +import org.apache.flink.table.functions.{Accumulator, AggregateFunction}
    +import org.apache.flink.api.common.state.MapState
    +import org.apache.flink.api.common.state.MapStateDescriptor
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.api.java.typeutils.ListTypeInfo
    +import java.util.{List => JList}
    +
    +import org.apache.flink.api.common.typeinfo.BasicTypeInfo
    +
    +class ProcTimeBoundedDistinctRowsOver(
    +  private val aggregates: Array[AggregateFunction[_]],
    +  private val aggFields: Array[Array[Int]],
    +  private val distinctAggsFlag: Array[Boolean],
    +  private val precedingOffset: Long,
    +  private val forwardedFieldCount: Int,
    +  private val aggregatesTypeInfo: RowTypeInfo,
    +  private val inputType: TypeInformation[Row])
    +    extends ProcessFunction[Row, Row] {
    +
    +  Preconditions.checkNotNull(aggregates)
    +  Preconditions.checkNotNull(aggFields)
    +  Preconditions.checkNotNull(distinctAggsFlag)
    +  Preconditions.checkNotNull(distinctAggsFlag.length == aggregates.length)
    +  Preconditions.checkArgument(aggregates.length == aggFields.length)
    +  Preconditions.checkArgument(precedingOffset > 0)
    +
    +  private var accumulatorState: ValueState[Row] = _
    +  private var rowMapState: MapState[Long, JList[Row]] = _
    +  private var output: Row = _
    +  private var counterState: ValueState[Long] = _
    +  private var smallestTsState: ValueState[Long] = _
    +  private var distinctValueState: MapState[Any, Row] = _
    +
    +  override def open(config: Configuration) {
    +
    +    output = new Row(forwardedFieldCount + aggregates.length)
    +    // We keep the elements received in a Map state keyed
    +    // by the ingestion time in the operator.
    +    // we also keep counter of processed elements
    +    // and timestamp of oldest element
    +    val rowListTypeInfo: TypeInformation[JList[Row]] =
    +      new ListTypeInfo[Row](inputType).asInstanceOf[TypeInformation[JList[Row]]]
    +
    +    val mapStateDescriptor: MapStateDescriptor[Long, JList[Row]] =
    +      new MapStateDescriptor[Long, JList[Row]]("windowBufferMapState",
    +        BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], rowListTypeInfo)
    +    rowMapState = getRuntimeContext.getMapState(mapStateDescriptor)
    +
    +    val aggregationStateDescriptor: ValueStateDescriptor[Row] =
    +      new ValueStateDescriptor[Row]("aggregationState", aggregatesTypeInfo)
    +    accumulatorState = getRuntimeContext.getState(aggregationStateDescriptor)
    +
    +    val processedCountDescriptor : ValueStateDescriptor[Long] =
    +       new ValueStateDescriptor[Long]("processedCountState", classOf[Long])
    +    counterState = getRuntimeContext.getState(processedCountDescriptor)
    +
    +    val smallestTimestampDescriptor : ValueStateDescriptor[Long] =
    +       new ValueStateDescriptor[Long]("smallestTSState", classOf[Long])
    +    smallestTsState = getRuntimeContext.getState(smallestTimestampDescriptor)
    +    
    +    val distinctValDescriptor : MapStateDescriptor[Any, Row] =
    +      new MapStateDescriptor[Any, Row]("distinctValuesBufferMapState", classOf[Any], classOf[Row])
    +    distinctValueState = getRuntimeContext.getMapState(distinctValDescriptor)
    +  }
    +
    +  override def processElement(
    +    input: Row,
    +    ctx: ProcessFunction[Row, Row]#Context,
    +    out: Collector[Row]): Unit = {
    +
    +    val currentTime = ctx.timerService.currentProcessingTime
    +    var i = 0
    +
    +    // initialize state for the processed element
    +    var accumulators = accumulatorState.value
    +    if (accumulators == null) {
    +      accumulators = new Row(aggregates.length)
    +      while (i < aggregates.length) {
    +        accumulators.setField(i, aggregates(i).createAccumulator())
    +        i += 1
    +      }
    +    }
    +
    +    // get smallest timestamp
    +    var smallestTs = smallestTsState.value
    +    if (smallestTs == 0L) {
    +      smallestTs = currentTime
    +      smallestTsState.update(smallestTs)
    +    }
    +    // get previous counter value
    +    var counter = counterState.value
    +
    +    if (counter == precedingOffset) {
    +      val retractList = rowMapState.get(smallestTs)
    +
    +      // get oldest element beyond buffer size
    +      // and if oldest element exist, retract value
    +      var removeCounter :Integer = 0
    +      var distinctCounter : Integer = 0
    +      var retractVal : Object = null
    +      i = 0
    +      while (i < aggregates.length) {
    +        val accumulator = accumulators.getField(i).asInstanceOf[Accumulator]
    +        retractVal = retractList.get(0).getField(aggFields(i)(0))
    +        if(distinctAggsFlag(i)){
    +          distinctCounter += 1
    +          val counterRow = distinctValueState.get(retractVal)
    --- End diff --
    
    the map state takes Any as the key, but I agree with you that I didn't test this aspect thoroughly


---

[GitHub] flink pull request #3732: [FLINK-6250] Distinct procTime with Rows boundarie...

Posted by hongyuhong <gi...@git.apache.org>.
Github user hongyuhong commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3732#discussion_r112105029
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedDistinctRowsOver.scala ---
    @@ -0,0 +1,238 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.table.runtime.aggregate
    +
    +import java.util
    +
    +import org.apache.flink.configuration.Configuration
    +import org.apache.flink.streaming.api.functions.ProcessFunction
    +import org.apache.flink.types.Row
    +import org.apache.flink.util.{Collector, Preconditions}
    +import org.apache.flink.api.common.state.ValueStateDescriptor
    +import org.apache.flink.api.java.typeutils.RowTypeInfo
    +import org.apache.flink.api.common.state.ValueState
    +import org.apache.flink.table.functions.{Accumulator, AggregateFunction}
    +import org.apache.flink.api.common.state.MapState
    +import org.apache.flink.api.common.state.MapStateDescriptor
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.api.java.typeutils.ListTypeInfo
    +import java.util.{List => JList}
    +
    +import org.apache.flink.api.common.typeinfo.BasicTypeInfo
    +
    +class ProcTimeBoundedDistinctRowsOver(
    +  private val aggregates: Array[AggregateFunction[_]],
    +  private val aggFields: Array[Array[Int]],
    +  private val distinctAggsFlag: Array[Boolean],
    +  private val precedingOffset: Long,
    +  private val forwardedFieldCount: Int,
    +  private val aggregatesTypeInfo: RowTypeInfo,
    +  private val inputType: TypeInformation[Row])
    +    extends ProcessFunction[Row, Row] {
    +
    +  Preconditions.checkNotNull(aggregates)
    +  Preconditions.checkNotNull(aggFields)
    +  Preconditions.checkNotNull(distinctAggsFlag)
    +  Preconditions.checkNotNull(distinctAggsFlag.length == aggregates.length)
    +  Preconditions.checkArgument(aggregates.length == aggFields.length)
    +  Preconditions.checkArgument(precedingOffset > 0)
    +
    +  private var accumulatorState: ValueState[Row] = _
    +  private var rowMapState: MapState[Long, JList[Row]] = _
    +  private var output: Row = _
    +  private var counterState: ValueState[Long] = _
    +  private var smallestTsState: ValueState[Long] = _
    +  private var distinctValueState: MapState[Any, Row] = _
    +
    +  override def open(config: Configuration) {
    +
    +    output = new Row(forwardedFieldCount + aggregates.length)
    +    // We keep the elements received in a Map state keyed
    +    // by the ingestion time in the operator.
    +    // we also keep counter of processed elements
    +    // and timestamp of oldest element
    +    val rowListTypeInfo: TypeInformation[JList[Row]] =
    +      new ListTypeInfo[Row](inputType).asInstanceOf[TypeInformation[JList[Row]]]
    +
    +    val mapStateDescriptor: MapStateDescriptor[Long, JList[Row]] =
    +      new MapStateDescriptor[Long, JList[Row]]("windowBufferMapState",
    +        BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], rowListTypeInfo)
    +    rowMapState = getRuntimeContext.getMapState(mapStateDescriptor)
    +
    +    val aggregationStateDescriptor: ValueStateDescriptor[Row] =
    +      new ValueStateDescriptor[Row]("aggregationState", aggregatesTypeInfo)
    +    accumulatorState = getRuntimeContext.getState(aggregationStateDescriptor)
    +
    +    val processedCountDescriptor : ValueStateDescriptor[Long] =
    +       new ValueStateDescriptor[Long]("processedCountState", classOf[Long])
    +    counterState = getRuntimeContext.getState(processedCountDescriptor)
    +
    +    val smallestTimestampDescriptor : ValueStateDescriptor[Long] =
    +       new ValueStateDescriptor[Long]("smallestTSState", classOf[Long])
    +    smallestTsState = getRuntimeContext.getState(smallestTimestampDescriptor)
    +    
    +    val distinctValDescriptor : MapStateDescriptor[Any, Row] =
    +      new MapStateDescriptor[Any, Row]("distinctValuesBufferMapState", classOf[Any], classOf[Row])
    +    distinctValueState = getRuntimeContext.getMapState(distinctValDescriptor)
    +  }
    +
    +  override def processElement(
    +    input: Row,
    +    ctx: ProcessFunction[Row, Row]#Context,
    +    out: Collector[Row]): Unit = {
    +
    +    val currentTime = ctx.timerService.currentProcessingTime
    +    var i = 0
    +
    +    // initialize state for the processed element
    +    var accumulators = accumulatorState.value
    +    if (accumulators == null) {
    +      accumulators = new Row(aggregates.length)
    +      while (i < aggregates.length) {
    +        accumulators.setField(i, aggregates(i).createAccumulator())
    +        i += 1
    +      }
    +    }
    +
    +    // get smallest timestamp
    +    var smallestTs = smallestTsState.value
    +    if (smallestTs == 0L) {
    +      smallestTs = currentTime
    +      smallestTsState.update(smallestTs)
    +    }
    +    // get previous counter value
    +    var counter = counterState.value
    +
    +    if (counter == precedingOffset) {
    +      val retractList = rowMapState.get(smallestTs)
    +
    +      // get oldest element beyond buffer size
    +      // and if oldest element exist, retract value
    +      var removeCounter :Integer = 0
    +      var distinctCounter : Integer = 0
    +      var retractVal : Object = null
    +      i = 0
    +      while (i < aggregates.length) {
    +        val accumulator = accumulators.getField(i).asInstanceOf[Accumulator]
    +        retractVal = retractList.get(0).getField(aggFields(i)(0))
    --- End diff --
    
    Why use a two-dimensional array? It seems enough to use a one-dimensional array to record the aggregate field index.


---

[GitHub] flink pull request #3732: [FLINK-6250] Distinct procTime with Rows boundarie...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3732#discussion_r112690943
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/UnsupportedOperatorsIndicatorFunctions.scala ---
    @@ -0,0 +1,46 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.table.functions
    +
    +import java.nio.charset.Charset
    +import java.util.List
    +
    +import org.apache.calcite.rel.`type`._
    --- End diff --
    
    Many unused imports


---

[GitHub] flink pull request #3732: [FLINK-6250] Distinct procTime with Rows boundarie...

Posted by huawei-flink <gi...@git.apache.org>.
Github user huawei-flink commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3732#discussion_r112210097
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedDistinctRowsOver.scala ---
    @@ -0,0 +1,238 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.table.runtime.aggregate
    +
    +import java.util
    +
    +import org.apache.flink.configuration.Configuration
    +import org.apache.flink.streaming.api.functions.ProcessFunction
    +import org.apache.flink.types.Row
    +import org.apache.flink.util.{Collector, Preconditions}
    +import org.apache.flink.api.common.state.ValueStateDescriptor
    +import org.apache.flink.api.java.typeutils.RowTypeInfo
    +import org.apache.flink.api.common.state.ValueState
    +import org.apache.flink.table.functions.{Accumulator, AggregateFunction}
    +import org.apache.flink.api.common.state.MapState
    +import org.apache.flink.api.common.state.MapStateDescriptor
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.api.java.typeutils.ListTypeInfo
    +import java.util.{List => JList}
    +
    +import org.apache.flink.api.common.typeinfo.BasicTypeInfo
    +
    +class ProcTimeBoundedDistinctRowsOver(
    +  private val aggregates: Array[AggregateFunction[_]],
    +  private val aggFields: Array[Array[Int]],
    +  private val distinctAggsFlag: Array[Boolean],
    +  private val precedingOffset: Long,
    +  private val forwardedFieldCount: Int,
    +  private val aggregatesTypeInfo: RowTypeInfo,
    +  private val inputType: TypeInformation[Row])
    +    extends ProcessFunction[Row, Row] {
    +
    +  Preconditions.checkNotNull(aggregates)
    +  Preconditions.checkNotNull(aggFields)
    +  Preconditions.checkNotNull(distinctAggsFlag)
    +  Preconditions.checkNotNull(distinctAggsFlag.length == aggregates.length)
    +  Preconditions.checkArgument(aggregates.length == aggFields.length)
    +  Preconditions.checkArgument(precedingOffset > 0)
    +
    +  private var accumulatorState: ValueState[Row] = _
    +  private var rowMapState: MapState[Long, JList[Row]] = _
    +  private var output: Row = _
    +  private var counterState: ValueState[Long] = _
    +  private var smallestTsState: ValueState[Long] = _
    +  private var distinctValueState: MapState[Any, Row] = _
    +
    +  override def open(config: Configuration) {
    +
    +    output = new Row(forwardedFieldCount + aggregates.length)
    +    // We keep the elements received in a Map state keyed
    +    // by the ingestion time in the operator.
    +    // we also keep counter of processed elements
    +    // and timestamp of oldest element
    +    val rowListTypeInfo: TypeInformation[JList[Row]] =
    +      new ListTypeInfo[Row](inputType).asInstanceOf[TypeInformation[JList[Row]]]
    +
    +    val mapStateDescriptor: MapStateDescriptor[Long, JList[Row]] =
    +      new MapStateDescriptor[Long, JList[Row]]("windowBufferMapState",
    +        BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], rowListTypeInfo)
    +    rowMapState = getRuntimeContext.getMapState(mapStateDescriptor)
    +
    +    val aggregationStateDescriptor: ValueStateDescriptor[Row] =
    +      new ValueStateDescriptor[Row]("aggregationState", aggregatesTypeInfo)
    +    accumulatorState = getRuntimeContext.getState(aggregationStateDescriptor)
    +
    +    val processedCountDescriptor : ValueStateDescriptor[Long] =
    +       new ValueStateDescriptor[Long]("processedCountState", classOf[Long])
    +    counterState = getRuntimeContext.getState(processedCountDescriptor)
    +
    +    val smallestTimestampDescriptor : ValueStateDescriptor[Long] =
    +       new ValueStateDescriptor[Long]("smallestTSState", classOf[Long])
    +    smallestTsState = getRuntimeContext.getState(smallestTimestampDescriptor)
    +    
    +    val distinctValDescriptor : MapStateDescriptor[Any, Row] =
    +      new MapStateDescriptor[Any, Row]("distinctValuesBufferMapState", classOf[Any], classOf[Row])
    --- End diff --
    
    OK, I have bypassed the problem by using a separate distinct MapState per aggregation ([value, long]). I don't think it is necessary to preserve the type in the serialization, as the aggregations work with numbers, and these do not have problems with serialization. Or am I missing something?
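    A minimal sketch of that layout (illustrative and matching the updated
    patch quoted below; note that classOf[Any] as the key type still falls
    back to generic serialization for non-numeric distinct keys):

        // one MapState[value -> reference count] per distinct aggregate
        distinctValueStateList = new Array(aggregates.size)
        for (i <- 0 until aggregates.size if distinctAggsFlag(i)) {
          val descriptor = new MapStateDescriptor[Any, Long](
            "distinctValuesBufferMapState" + i, classOf[Any], classOf[Long])
          distinctValueStateList(i) = getRuntimeContext.getMapState(descriptor)
        }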


---

[GitHub] flink pull request #3732: [FLINK-6250] Distinct procTime with Rows boundarie...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3732#discussion_r112701470
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedDistinctRowsOver.scala ---
    @@ -0,0 +1,230 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.table.runtime.aggregate
    +
    +import java.util
    +
    +import org.apache.flink.configuration.Configuration
    +import org.apache.flink.streaming.api.functions.ProcessFunction
    +import org.apache.flink.types.Row
    +import org.apache.flink.util.{Collector, Preconditions}
    +import org.apache.flink.api.common.state.ValueStateDescriptor
    +import org.apache.flink.api.java.typeutils.RowTypeInfo
    +import org.apache.flink.api.common.state.ValueState
    +import org.apache.flink.table.functions.{Accumulator, AggregateFunction}
    +import org.apache.flink.api.common.state.MapState
    +import org.apache.flink.api.common.state.MapStateDescriptor
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.api.java.typeutils.ListTypeInfo
    +import java.util.{List => JList}
    +
    +import org.apache.flink.api.common.typeinfo.BasicTypeInfo
    +import org.apache.flink.api.common.state.ListState
    +
    +class ProcTimeBoundedDistinctRowsOver(
    +  private val aggregates: Array[AggregateFunction[_]],
    +  private val aggFields: Array[Array[Int]],
    +  private val distinctAggsFlag: Array[Boolean],
    +  private val precedingOffset: Long,
    +  private val forwardedFieldCount: Int,
    +  private val aggregatesTypeInfo: RowTypeInfo,
    +  private val inputType: TypeInformation[Row])
    +    extends ProcessFunction[Row, Row] {
    +
    +  Preconditions.checkNotNull(aggregates)
    +  Preconditions.checkNotNull(aggFields)
    +  Preconditions.checkNotNull(distinctAggsFlag)
    +  Preconditions.checkNotNull(distinctAggsFlag.length == aggregates.length)
    +  Preconditions.checkArgument(aggregates.length == aggFields.length)
    +  Preconditions.checkArgument(precedingOffset > 0)
    +
    +  private var accumulatorState: ValueState[Row] = _
    +  private var rowMapState: MapState[Long, JList[Row]] = _
    +  private var output: Row = _
    +  private var counterState: ValueState[Long] = _
    +  private var smallestTsState: ValueState[Long] = _
    +  private var distinctValueStateList: Array[MapState[Any, Long]] = _
    +
    +  override def open(config: Configuration) {
    +
    +    output = new Row(forwardedFieldCount + aggregates.length)
    +    // We keep the elements received in a Map state keyed
    +    // by the ingestion time in the operator.
    +    // we also keep counter of processed elements
    +    // and timestamp of oldest element
    +    val rowListTypeInfo: TypeInformation[JList[Row]] =
    +      new ListTypeInfo[Row](inputType).asInstanceOf[TypeInformation[JList[Row]]]
    +
    +    val mapStateDescriptor: MapStateDescriptor[Long, JList[Row]] =
    +      new MapStateDescriptor[Long, JList[Row]]("windowBufferMapState",
    +        BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], rowListTypeInfo)
    +    rowMapState = getRuntimeContext.getMapState(mapStateDescriptor)
    +
    +    val aggregationStateDescriptor: ValueStateDescriptor[Row] =
    +      new ValueStateDescriptor[Row]("aggregationState", aggregatesTypeInfo)
    +    accumulatorState = getRuntimeContext.getState(aggregationStateDescriptor)
    +
    +    val processedCountDescriptor : ValueStateDescriptor[Long] =
    +       new ValueStateDescriptor[Long]("processedCountState", classOf[Long])
    +    counterState = getRuntimeContext.getState(processedCountDescriptor)
    +
    +    val smallestTimestampDescriptor : ValueStateDescriptor[Long] =
    +       new ValueStateDescriptor[Long]("smallestTSState", classOf[Long])
    +    smallestTsState = getRuntimeContext.getState(smallestTimestampDescriptor)
    +    distinctValueStateList = new Array(aggregates.size)
    +    for(i <- 0 until aggregates.size){
    +      if(distinctAggsFlag(i)){
    +        val distinctValDescriptor =  new MapStateDescriptor[Any, Long](
    +                                             "distinctValuesBufferMapState" + i,
    +                                              classOf[Any],
    +                                              classOf[Long])
    +        distinctValueStateList(i)=getRuntimeContext.getMapState(distinctValDescriptor)
    --- End diff --
    
    +spaces


---

[GitHub] flink pull request #3732: [FLINK-6250] Distinct procTime with Rows boundarie...

Posted by hongyuhong <gi...@git.apache.org>.
Github user hongyuhong commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3732#discussion_r112105507
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedDistinctRowsOver.scala ---
    @@ -0,0 +1,238 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.table.runtime.aggregate
    +
    +import java.util
    +
    +import org.apache.flink.configuration.Configuration
    +import org.apache.flink.streaming.api.functions.ProcessFunction
    +import org.apache.flink.types.Row
    +import org.apache.flink.util.{Collector, Preconditions}
    +import org.apache.flink.api.common.state.ValueStateDescriptor
    +import org.apache.flink.api.java.typeutils.RowTypeInfo
    +import org.apache.flink.api.common.state.ValueState
    +import org.apache.flink.table.functions.{Accumulator, AggregateFunction}
    +import org.apache.flink.api.common.state.MapState
    +import org.apache.flink.api.common.state.MapStateDescriptor
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.api.java.typeutils.ListTypeInfo
    +import java.util.{List => JList}
    +
    +import org.apache.flink.api.common.typeinfo.BasicTypeInfo
    +
    +class ProcTimeBoundedDistinctRowsOver(
    +  private val aggregates: Array[AggregateFunction[_]],
    +  private val aggFields: Array[Array[Int]],
    +  private val distinctAggsFlag: Array[Boolean],
    +  private val precedingOffset: Long,
    +  private val forwardedFieldCount: Int,
    +  private val aggregatesTypeInfo: RowTypeInfo,
    +  private val inputType: TypeInformation[Row])
    +    extends ProcessFunction[Row, Row] {
    +
    +  Preconditions.checkNotNull(aggregates)
    +  Preconditions.checkNotNull(aggFields)
    +  Preconditions.checkNotNull(distinctAggsFlag)
    +  Preconditions.checkNotNull(distinctAggsFlag.length == aggregates.length)
    +  Preconditions.checkArgument(aggregates.length == aggFields.length)
    +  Preconditions.checkArgument(precedingOffset > 0)
    +
    +  private var accumulatorState: ValueState[Row] = _
    +  private var rowMapState: MapState[Long, JList[Row]] = _
    +  private var output: Row = _
    +  private var counterState: ValueState[Long] = _
    +  private var smallestTsState: ValueState[Long] = _
    +  private var distinctValueState: MapState[Any, Row] = _
    +
    +  override def open(config: Configuration) {
    +
    +    output = new Row(forwardedFieldCount + aggregates.length)
    +    // We keep the elements received in a Map state keyed
    +    // by the ingestion time in the operator.
    +    // we also keep counter of processed elements
    +    // and timestamp of oldest element
    +    val rowListTypeInfo: TypeInformation[JList[Row]] =
    +      new ListTypeInfo[Row](inputType).asInstanceOf[TypeInformation[JList[Row]]]
    +
    +    val mapStateDescriptor: MapStateDescriptor[Long, JList[Row]] =
    +      new MapStateDescriptor[Long, JList[Row]]("windowBufferMapState",
    +        BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], rowListTypeInfo)
    +    rowMapState = getRuntimeContext.getMapState(mapStateDescriptor)
    +
    +    val aggregationStateDescriptor: ValueStateDescriptor[Row] =
    +      new ValueStateDescriptor[Row]("aggregationState", aggregatesTypeInfo)
    +    accumulatorState = getRuntimeContext.getState(aggregationStateDescriptor)
    +
    +    val processedCountDescriptor : ValueStateDescriptor[Long] =
    +       new ValueStateDescriptor[Long]("processedCountState", classOf[Long])
    +    counterState = getRuntimeContext.getState(processedCountDescriptor)
    +
    +    val smallestTimestampDescriptor : ValueStateDescriptor[Long] =
    +       new ValueStateDescriptor[Long]("smallestTSState", classOf[Long])
    +    smallestTsState = getRuntimeContext.getState(smallestTimestampDescriptor)
    +    
    +    val distinctValDescriptor : MapStateDescriptor[Any, Row] =
    +      new MapStateDescriptor[Any, Row]("distinctValuesBufferMapState", classOf[Any], classOf[Row])
    +    distinctValueState = getRuntimeContext.getMapState(distinctValDescriptor)
    +  }
    +
    +  override def processElement(
    +    input: Row,
    +    ctx: ProcessFunction[Row, Row]#Context,
    +    out: Collector[Row]): Unit = {
    +
    +    val currentTime = ctx.timerService.currentProcessingTime
    +    var i = 0
    +
    +    // initialize state for the processed element
    +    var accumulators = accumulatorState.value
    +    if (accumulators == null) {
    +      accumulators = new Row(aggregates.length)
    +      while (i < aggregates.length) {
    +        accumulators.setField(i, aggregates(i).createAccumulator())
    +        i += 1
    +      }
    +    }
    +
    +    // get smallest timestamp
    +    var smallestTs = smallestTsState.value
    +    if (smallestTs == 0L) {
    +      smallestTs = currentTime
    +      smallestTsState.update(smallestTs)
    +    }
    +    // get previous counter value
    +    var counter = counterState.value
    +
    +    if (counter == precedingOffset) {
    +      val retractList = rowMapState.get(smallestTs)
    +
    +      // get oldest element beyond buffer size
    +      // and if oldest element exist, retract value
    +      var removeCounter :Integer = 0
    +      var distinctCounter : Integer = 0
    +      var retractVal : Object = null
    +      i = 0
    +      while (i < aggregates.length) {
    +        val accumulator = accumulators.getField(i).asInstanceOf[Accumulator]
    +        retractVal = retractList.get(0).getField(aggFields(i)(0))
    +        if(distinctAggsFlag(i)){
    +          distinctCounter += 1
    +          val counterRow = distinctValueState.get(retractVal)
    --- End diff --
    
    Different aggregate parameters have different types; I think we cannot use one MapState to store both e1 and e2 in, e.g., SUM(DISTINCT e1), SUM(DISTINCT e2).
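    A tiny illustration of the collision (hypothetical values, a plain Scala
    map standing in for the shared MapState):

        val shared = scala.collection.mutable.Map[Any, Long]()
        // one input row with e1 = 5 and e2 = 5
        shared(5) = shared.getOrElse(5, 0L) + 1  // reference count for e1
        shared(5) = shared.getOrElse(5, 0L) + 1  // e2 bumps the same entry
        // shared(5) is now 2, although each aggregate saw the value once,
        // so a later retraction for e1 would be masked by activity on e2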


---

[GitHub] flink issue #3732: [FLINK-6250] Distinct procTime with Rows boundaries

Posted by rtudoran <gi...@git.apache.org>.
Github user rtudoran commented on the issue:

    https://github.com/apache/flink/pull/3732
  
    @fhueske @stefanobortoli  I suggest we merge this temporary solution into Flink (using a special marker for distinct) until the Flink module is upgraded to the next Calcite release. I have fixed the issue in Calcite.
    However, the advantages of pushing this already are that:
    1) we can reuse the code
    2) once we have the distinct marker, we can simply modify the check for distinct aggregates in the DataStreamOver


---

[GitHub] flink pull request #3732: [FLINK-6250] Distinct procTime with Rows boundarie...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3732#discussion_r112708098
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedDistinctRowsOver.scala ---
    @@ -0,0 +1,230 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.table.runtime.aggregate
    +
    +import java.util
    +
    +import org.apache.flink.configuration.Configuration
    +import org.apache.flink.streaming.api.functions.ProcessFunction
    +import org.apache.flink.types.Row
    +import org.apache.flink.util.{Collector, Preconditions}
    +import org.apache.flink.api.common.state.ValueStateDescriptor
    +import org.apache.flink.api.java.typeutils.RowTypeInfo
    +import org.apache.flink.api.common.state.ValueState
    +import org.apache.flink.table.functions.{Accumulator, AggregateFunction}
    +import org.apache.flink.api.common.state.MapState
    +import org.apache.flink.api.common.state.MapStateDescriptor
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.api.java.typeutils.ListTypeInfo
    +import java.util.{List => JList}
    +
    +import org.apache.flink.api.common.typeinfo.BasicTypeInfo
    +import org.apache.flink.api.common.state.ListState
    +
    +class ProcTimeBoundedDistinctRowsOver(
    +  private val aggregates: Array[AggregateFunction[_]],
    +  private val aggFields: Array[Array[Int]],
    +  private val distinctAggsFlag: Array[Boolean],
    +  private val precedingOffset: Long,
    +  private val forwardedFieldCount: Int,
    +  private val aggregatesTypeInfo: RowTypeInfo,
    +  private val inputType: TypeInformation[Row])
    +    extends ProcessFunction[Row, Row] {
    +
    +  Preconditions.checkNotNull(aggregates)
    +  Preconditions.checkNotNull(aggFields)
    +  Preconditions.checkNotNull(distinctAggsFlag)
    +  Preconditions.checkNotNull(distinctAggsFlag.length == aggregates.length)
    +  Preconditions.checkArgument(aggregates.length == aggFields.length)
    +  Preconditions.checkArgument(precedingOffset > 0)
    +
    +  private var accumulatorState: ValueState[Row] = _
    +  private var rowMapState: MapState[Long, JList[Row]] = _
    +  private var output: Row = _
    +  private var counterState: ValueState[Long] = _
    +  private var smallestTsState: ValueState[Long] = _
    +  private var distinctValueStateList: Array[MapState[Any, Long]] = _
    +
    +  override def open(config: Configuration) {
    +
    +    output = new Row(forwardedFieldCount + aggregates.length)
    +    // We keep the elements received in a Map state keyed
    +    // by the ingestion time in the operator.
    +    // we also keep counter of processed elements
    +    // and timestamp of oldest element
    +    val rowListTypeInfo: TypeInformation[JList[Row]] =
    +      new ListTypeInfo[Row](inputType).asInstanceOf[TypeInformation[JList[Row]]]
    +
    +    val mapStateDescriptor: MapStateDescriptor[Long, JList[Row]] =
    +      new MapStateDescriptor[Long, JList[Row]]("windowBufferMapState",
    +        BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], rowListTypeInfo)
    +    rowMapState = getRuntimeContext.getMapState(mapStateDescriptor)
    +
    +    val aggregationStateDescriptor: ValueStateDescriptor[Row] =
    +      new ValueStateDescriptor[Row]("aggregationState", aggregatesTypeInfo)
    +    accumulatorState = getRuntimeContext.getState(aggregationStateDescriptor)
    +
    +    val processedCountDescriptor : ValueStateDescriptor[Long] =
    +       new ValueStateDescriptor[Long]("processedCountState", classOf[Long])
    +    counterState = getRuntimeContext.getState(processedCountDescriptor)
    +
    +    val smallestTimestampDescriptor : ValueStateDescriptor[Long] =
    +       new ValueStateDescriptor[Long]("smallestTSState", classOf[Long])
    +    smallestTsState = getRuntimeContext.getState(smallestTimestampDescriptor)
    +    distinctValueStateList = new Array(aggregates.size)
    +    for(i <- 0 until aggregates.size){
    +      if(distinctAggsFlag(i)){
    +        val distinctValDescriptor =  new MapStateDescriptor[Any, Long](
    +                                             "distinctValuesBufferMapState" + i,
    +                                              classOf[Any],
    +                                              classOf[Long])
    +        distinctValueStateList(i)=getRuntimeContext.getMapState(distinctValDescriptor)
    +      }
    +    }
    +  }
    +
    +  override def processElement(
    +    input: Row,
    +    ctx: ProcessFunction[Row, Row]#Context,
    +    out: Collector[Row]): Unit = {
    +
    +    val currentTime = ctx.timerService.currentProcessingTime
    +    var i = 0
    +
    +    // initialize state for the processed element
    +    var accumulators = accumulatorState.value
    +    if (accumulators == null) {
    +      accumulators = new Row(aggregates.length)
    +      while (i < aggregates.length) {
    +        accumulators.setField(i, aggregates(i).createAccumulator())
    +        i += 1
    +      }
    +    }
    +
    +    // get smallest timestamp
    +    var smallestTs = smallestTsState.value
    +    if (smallestTs == 0L) {
    +      smallestTs = currentTime
    +      smallestTsState.update(smallestTs)
    +    }
    +    // get previous counter value
    +    var counter = counterState.value
    +
    +    if (counter == precedingOffset) {
    +      val retractList = rowMapState.get(smallestTs)
    +
    +      // get oldest element beyond buffer size
    +      // and if oldest element exist, retract value
    +      var removeCounter :Integer = 0
    +      var distinctCounter : Integer = 0
    +      var retractVal : Object = null
    +      i = 0
    +      while (i < aggregates.length) {
    +        val accumulator = accumulators.getField(i).asInstanceOf[Accumulator]
    +        retractVal = retractList.get(0).getField(aggFields(i)(0))
    +        if(distinctAggsFlag(i)){
    +          var distinctValCounter: Long = distinctValueStateList(i).get(retractVal)
    +          // if the value to be retracted is the last one added
    +          // then remove it and retract the value
    +          if(distinctValCounter == 1L){
    +            aggregates(i).retract(accumulator, retractVal)
    +            distinctValueStateList(i).remove(retractVal)
    +          } // if there are other values in the buffer
    +            // decrease the counter and continue
    +          else {
    +            distinctValCounter -= 1
    +            distinctValueStateList(i).put(retractVal,distinctValCounter)
    +          }
    +        }else {
    +          aggregates(i).retract(accumulator, retractVal)
    +        }
    +        i += 1
    +      }
    +      retractList.remove(0)
    +      // if reference timestamp list not empty, keep the list
    +      if (!retractList.isEmpty) {
    +        rowMapState.put(smallestTs, retractList)
    +      } // if smallest timestamp list is empty, remove and find new smallest
    +      else {
    +        rowMapState.remove(smallestTs)
    +        val iter = rowMapState.keys.iterator
    +        var currentTs: Long = 0L
    +        var newSmallestTs: Long = Long.MaxValue
    +        while (iter.hasNext) {
    +          currentTs = iter.next
    +          if (currentTs < newSmallestTs) {
    +            newSmallestTs = currentTs
    +          }
    +        }
    +        smallestTsState.update(newSmallestTs)
    +      }
    +    } // we update the counter only while buffer is getting filled
    +    else {
    +      counter += 1
    +      counterState.update(counter)
    +    }
    +
    +    // copy forwarded fields in output row
    +    i = 0
    +    while (i < forwardedFieldCount) {
    +      output.setField(i, input.getField(i))
    +      i += 1
    +    }
    +
    +    // accumulate current row and set aggregate in output row
    +    i = 0
    +    while (i < aggregates.length) {
    +      val index = forwardedFieldCount + i
    +      val accumulator = accumulators.getField(i).asInstanceOf[Accumulator]
    +      val inputValue = input.getField(aggFields(i)(0))
    +      // check if distinct aggregation
    +      if(distinctAggsFlag(i)){
    +        // if first time we see value, set counter and aggregate
    +          var distinctValCounter: Long = distinctValueStateList(i).get(inputValue)
    +          // if counter is 0L first time we aggregate
    +          // for a seen value but never accumulated
    +          if(distinctValCounter == 0L){
    --- End diff --
    
    doesn't `MapState.get` return `null` when the key is not contained?


---

[GitHub] flink pull request #3732: [FLINK-6250] Distinct procTime with Rows boundarie...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3732#discussion_r112706609
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedDistinctRowsOver.scala ---
    @@ -0,0 +1,230 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.table.runtime.aggregate
    +
    +import java.util
    +
    +import org.apache.flink.configuration.Configuration
    +import org.apache.flink.streaming.api.functions.ProcessFunction
    +import org.apache.flink.types.Row
    +import org.apache.flink.util.{Collector, Preconditions}
    +import org.apache.flink.api.common.state.ValueStateDescriptor
    +import org.apache.flink.api.java.typeutils.RowTypeInfo
    +import org.apache.flink.api.common.state.ValueState
    +import org.apache.flink.table.functions.{Accumulator, AggregateFunction}
    +import org.apache.flink.api.common.state.MapState
    +import org.apache.flink.api.common.state.MapStateDescriptor
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.api.java.typeutils.ListTypeInfo
    +import java.util.{List => JList}
    +
    +import org.apache.flink.api.common.typeinfo.BasicTypeInfo
    +import org.apache.flink.api.common.state.ListState
    +
    +class ProcTimeBoundedDistinctRowsOver(
    +  private val aggregates: Array[AggregateFunction[_]],
    +  private val aggFields: Array[Array[Int]],
    +  private val distinctAggsFlag: Array[Boolean],
    +  private val precedingOffset: Long,
    +  private val forwardedFieldCount: Int,
    +  private val aggregatesTypeInfo: RowTypeInfo,
    +  private val inputType: TypeInformation[Row])
    +    extends ProcessFunction[Row, Row] {
    +
    +  Preconditions.checkNotNull(aggregates)
    +  Preconditions.checkNotNull(aggFields)
    +  Preconditions.checkNotNull(distinctAggsFlag)
    +  Preconditions.checkNotNull(distinctAggsFlag.length == aggregates.length)
    +  Preconditions.checkArgument(aggregates.length == aggFields.length)
    +  Preconditions.checkArgument(precedingOffset > 0)
    +
    +  private var accumulatorState: ValueState[Row] = _
    +  private var rowMapState: MapState[Long, JList[Row]] = _
    +  private var output: Row = _
    +  private var counterState: ValueState[Long] = _
    +  private var smallestTsState: ValueState[Long] = _
    +  private var distinctValueStateList: Array[MapState[Any, Long]] = _
    +
    +  override def open(config: Configuration) {
    +
    +    output = new Row(forwardedFieldCount + aggregates.length)
    +    // We keep the elements received in a Map state keyed
    +    // by the ingestion time in the operator.
    +    // we also keep counter of processed elements
    +    // and timestamp of oldest element
    +    val rowListTypeInfo: TypeInformation[JList[Row]] =
    +      new ListTypeInfo[Row](inputType).asInstanceOf[TypeInformation[JList[Row]]]
    +
    +    val mapStateDescriptor: MapStateDescriptor[Long, JList[Row]] =
    +      new MapStateDescriptor[Long, JList[Row]]("windowBufferMapState",
    +        BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], rowListTypeInfo)
    +    rowMapState = getRuntimeContext.getMapState(mapStateDescriptor)
    +
    +    val aggregationStateDescriptor: ValueStateDescriptor[Row] =
    +      new ValueStateDescriptor[Row]("aggregationState", aggregatesTypeInfo)
    +    accumulatorState = getRuntimeContext.getState(aggregationStateDescriptor)
    +
    +    val processedCountDescriptor : ValueStateDescriptor[Long] =
    +       new ValueStateDescriptor[Long]("processedCountState", classOf[Long])
    +    counterState = getRuntimeContext.getState(processedCountDescriptor)
    +
    +    val smallestTimestampDescriptor : ValueStateDescriptor[Long] =
    +       new ValueStateDescriptor[Long]("smallestTSState", classOf[Long])
    +    smallestTsState = getRuntimeContext.getState(smallestTimestampDescriptor)
    +    distinctValueStateList = new Array(aggregates.size)
    +    for(i <- 0 until aggregates.size){
    +      if(distinctAggsFlag(i)){
    +        val distinctValDescriptor =  new MapStateDescriptor[Any, Long](
    +                                             "distinctValuesBufferMapState" + i,
    +                                              classOf[Any],
    +                                              classOf[Long])
    +        distinctValueStateList(i)=getRuntimeContext.getMapState(distinctValDescriptor)
    +      }
    +    }
    +  }
    +
    +  override def processElement(
    +    input: Row,
    +    ctx: ProcessFunction[Row, Row]#Context,
    +    out: Collector[Row]): Unit = {
    +
    +    val currentTime = ctx.timerService.currentProcessingTime
    +    var i = 0
    +
    +    // initialize state for the processed element
    +    var accumulators = accumulatorState.value
    +    if (accumulators == null) {
    +      accumulators = new Row(aggregates.length)
    +      while (i < aggregates.length) {
    +        accumulators.setField(i, aggregates(i).createAccumulator())
    +        i += 1
    +      }
    +    }
    +
    +    // get smallest timestamp
    +    var smallestTs = smallestTsState.value
    +    if (smallestTs == 0L) {
    +      smallestTs = currentTime
    +      smallestTsState.update(smallestTs)
    +    }
    +    // get previous counter value
    +    var counter = counterState.value
    +
    +    if (counter == precedingOffset) {
    +      val retractList = rowMapState.get(smallestTs)
    +
    +      // get oldest element beyond buffer size
    +      // and if oldest element exist, retract value
    +      var removeCounter :Integer = 0
    +      var distinctCounter : Integer = 0
    --- End diff --
    
    not used


---

[GitHub] flink pull request #3732: [FLINK-6250] Distinct procTime with Rows boundarie...

Posted by huawei-flink <gi...@git.apache.org>.
Github user huawei-flink closed the pull request at:

    https://github.com/apache/flink/pull/3732


---

[GitHub] flink issue #3732: [FLINK-6250] Distinct procTime with Rows boundaries

Posted by stefanobortoli <gi...@git.apache.org>.
Github user stefanobortoli commented on the issue:

    https://github.com/apache/flink/pull/3732
  
    @fhueske I have just pushed a version working with code generation (without modifying the code generation). Some refactoring will be needed in the AggregateUtil function, but if the overall concept is sound, I will fix things.
    
    @hongyuhong, @shijinkui you could also have a look if you have time.


---

[GitHub] flink issue #3732: [FLINK-6250] Distinct procTime with Rows boundaries

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on the issue:

    https://github.com/apache/flink/pull/3732
  
    Thanks @stefanobortoli! I'll have a look at #3771


---

[GitHub] flink pull request #3732: [FLINK-6250] Distinct procTime with Rows boundarie...

Posted by stefanobortoli <gi...@git.apache.org>.
Github user stefanobortoli commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3732#discussion_r112712717
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedDistinctRowsOver.scala ---
    @@ -0,0 +1,230 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.table.runtime.aggregate
    +
    +import java.util
    +
    +import org.apache.flink.configuration.Configuration
    +import org.apache.flink.streaming.api.functions.ProcessFunction
    +import org.apache.flink.types.Row
    +import org.apache.flink.util.{Collector, Preconditions}
    +import org.apache.flink.api.common.state.ValueStateDescriptor
    +import org.apache.flink.api.java.typeutils.RowTypeInfo
    +import org.apache.flink.api.common.state.ValueState
    +import org.apache.flink.table.functions.{Accumulator, AggregateFunction}
    +import org.apache.flink.api.common.state.MapState
    +import org.apache.flink.api.common.state.MapStateDescriptor
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.api.java.typeutils.ListTypeInfo
    +import java.util.{List => JList}
    +
    +import org.apache.flink.api.common.typeinfo.BasicTypeInfo
    +import org.apache.flink.api.common.state.ListState
    +
    +class ProcTimeBoundedDistinctRowsOver(
    +  private val aggregates: Array[AggregateFunction[_]],
    +  private val aggFields: Array[Array[Int]],
    +  private val distinctAggsFlag: Array[Boolean],
    +  private val precedingOffset: Long,
    +  private val forwardedFieldCount: Int,
    +  private val aggregatesTypeInfo: RowTypeInfo,
    +  private val inputType: TypeInformation[Row])
    +    extends ProcessFunction[Row, Row] {
    +
    +  Preconditions.checkNotNull(aggregates)
    +  Preconditions.checkNotNull(aggFields)
    +  Preconditions.checkNotNull(distinctAggsFlag)
    +  Preconditions.checkNotNull(distinctAggsFlag.length == aggregates.length)
    +  Preconditions.checkArgument(aggregates.length == aggFields.length)
    +  Preconditions.checkArgument(precedingOffset > 0)
    +
    +  private var accumulatorState: ValueState[Row] = _
    +  private var rowMapState: MapState[Long, JList[Row]] = _
    +  private var output: Row = _
    +  private var counterState: ValueState[Long] = _
    +  private var smallestTsState: ValueState[Long] = _
    +  private var distinctValueStateList: Array[MapState[Any, Long]] = _
    +
    +  override def open(config: Configuration) {
    +
    +    output = new Row(forwardedFieldCount + aggregates.length)
    +    // We keep the elements received in a Map state keyed
    +    // by the ingestion time in the operator.
    +    // we also keep counter of processed elements
    +    // and timestamp of oldest element
    +    val rowListTypeInfo: TypeInformation[JList[Row]] =
    +      new ListTypeInfo[Row](inputType).asInstanceOf[TypeInformation[JList[Row]]]
    +
    +    val mapStateDescriptor: MapStateDescriptor[Long, JList[Row]] =
    +      new MapStateDescriptor[Long, JList[Row]]("windowBufferMapState",
    +        BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], rowListTypeInfo)
    +    rowMapState = getRuntimeContext.getMapState(mapStateDescriptor)
    +
    +    val aggregationStateDescriptor: ValueStateDescriptor[Row] =
    +      new ValueStateDescriptor[Row]("aggregationState", aggregatesTypeInfo)
    +    accumulatorState = getRuntimeContext.getState(aggregationStateDescriptor)
    +
    +    val processedCountDescriptor : ValueStateDescriptor[Long] =
    +       new ValueStateDescriptor[Long]("processedCountState", classOf[Long])
    +    counterState = getRuntimeContext.getState(processedCountDescriptor)
    +
    +    val smallestTimestampDescriptor : ValueStateDescriptor[Long] =
    +       new ValueStateDescriptor[Long]("smallestTSState", classOf[Long])
    +    smallestTsState = getRuntimeContext.getState(smallestTimestampDescriptor)
    +    distinctValueStateList = new Array(aggregates.size)
    +    for(i <- 0 until aggregates.size){
    +      if(distinctAggsFlag(i)){
    +        val distinctValDescriptor =  new MapStateDescriptor[Any, Long](
    +                                             "distinctValuesBufferMapState" + i,
    +                                              classOf[Any],
    +                                              classOf[Long])
    +        distinctValueStateList(i)=getRuntimeContext.getMapState(distinctValDescriptor)
    +      }
    +    }
    +  }
    +
    +  override def processElement(
    +    input: Row,
    +    ctx: ProcessFunction[Row, Row]#Context,
    +    out: Collector[Row]): Unit = {
    +
    +    val currentTime = ctx.timerService.currentProcessingTime
    +    var i = 0
    +
    +    // initialize state for the processed element
    +    var accumulators = accumulatorState.value
    +    if (accumulators == null) {
    +      accumulators = new Row(aggregates.length)
    +      while (i < aggregates.length) {
    +        accumulators.setField(i, aggregates(i).createAccumulator())
    +        i += 1
    +      }
    +    }
    +
    +    // get smallest timestamp
    +    var smallestTs = smallestTsState.value
    +    if (smallestTs == 0L) {
    +      smallestTs = currentTime
    +      smallestTsState.update(smallestTs)
    +    }
    +    // get previous counter value
    +    var counter = counterState.value
    +
    +    if (counter == precedingOffset) {
    +      val retractList = rowMapState.get(smallestTs)
    +
    +      // get oldest element beyond buffer size
    +      // and if oldest element exist, retract value
    +      var removeCounter :Integer = 0
    +      var distinctCounter : Integer = 0
    +      var retractVal : Object = null
    +      i = 0
    +      while (i < aggregates.length) {
    +        val accumulator = accumulators.getField(i).asInstanceOf[Accumulator]
    +        retractVal = retractList.get(0).getField(aggFields(i)(0))
    +        if(distinctAggsFlag(i)){
    +          var distinctValCounter: Long = distinctValueStateList(i).get(retractVal)
    +          // if the value to be retracted is the last one added
    +          // then remove it and retract the value
    +          if(distinctValCounter == 1L){
    +            aggregates(i).retract(accumulator, retractVal)
    +            distinctValueStateList(i).remove(retractVal)
    +          } // if there are other values in the buffer
    +            // decrease the counter and continue
    +          else {
    +            distinctValCounter -= 1
    +            distinctValueStateList(i).put(retractVal,distinctValCounter)
    +          }
    +        }else {
    +          aggregates(i).retract(accumulator, retractVal)
    +        }
    +        i += 1
    +      }
    +      retractList.remove(0)
    +      // if reference timestamp list not empty, keep the list
    +      if (!retractList.isEmpty) {
    +        rowMapState.put(smallestTs, retractList)
    +      } // if smallest timestamp list is empty, remove and find new smallest
    +      else {
    +        rowMapState.remove(smallestTs)
    +        val iter = rowMapState.keys.iterator
    +        var currentTs: Long = 0L
    +        var newSmallestTs: Long = Long.MaxValue
    +        while (iter.hasNext) {
    +          currentTs = iter.next
    +          if (currentTs < newSmallestTs) {
    +            newSmallestTs = currentTs
    +          }
    +        }
    +        smallestTsState.update(newSmallestTs)
    +      }
    +    } // we update the counter only while buffer is getting filled
    +    else {
    +      counter += 1
    +      counterState.update(counter)
    +    }
    +
    +    // copy forwarded fields in output row
    +    i = 0
    +    while (i < forwardedFieldCount) {
    +      output.setField(i, input.getField(i))
    +      i += 1
    +    }
    +
    +    // accumulate current row and set aggregate in output row
    +    i = 0
    +    while (i < aggregates.length) {
    +      val index = forwardedFieldCount + i
    +      val accumulator = accumulators.getField(i).asInstanceOf[Accumulator]
    +      val inputValue = input.getField(aggFields(i)(0))
    +      // check if distinct aggregation
    +      if(distinctAggsFlag(i)){
    +        // if first time we see value, set counter and aggregate
    +          var distinctValCounter: Long = distinctValueStateList(i).get(inputValue)
    +          // if counter is 0L first time we aggregate
    +          // for a seen value but never accumulated
    +          if(distinctValCounter == 0L){
    --- End diff --
    
    if it is a Long, it returns 0L, don't ask me why. :-D
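    Presumably this is Scala's generic unboxing rather than anything
    MapState-specific: the map's value type parameter is scala.Long, the
    Java call returns null for a missing key, and unboxing null to a
    primitive Long yields 0L. A minimal sketch with a plain
    java.util.HashMap standing in for the MapState (illustrative only):

        val m = new java.util.HashMap[Any, Long]()  // Long is boxed in the map
        val v: Long = m.get("missing")              // null is unboxed to 0L
        println(v)                                  // prints 0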


---

[GitHub] flink pull request #3732: [FLINK-6250] Distinct procTime with Rows boundarie...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3732#discussion_r112693618
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala ---
    @@ -91,6 +93,22 @@ class DataStreamOverAggregate(
     
         val overWindow: org.apache.calcite.rel.core.Window.Group = logicWindow.groups.get(0)
     
    +    val distinctVarMap: Map[String,Boolean] = new HashMap[String, Boolean]
    --- End diff --
    
    I would do the extraction in the DataStreamOverAggregateRule. There we have proper access to the input `Calc` and the `RexProgram`. Extracting the function call as a String is quite fragile; the calc could for instance contain an attribute called "DISTRIBUTION".
    
    The rule would unnest the expression from the `DIST()` RexNode and remove `DIST`. The distinct information would need to be added to the DataStreamOverAggregate.
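    A hedged sketch of the check such a rule could run on the Calc's
    RexProgram (hypothetical helper, not part of this PR):

        import org.apache.calcite.rex.{RexCall, RexNode}

        // true if the projection wraps its operand in the DIST() marker call
        def isDistMarker(node: RexNode): Boolean = node match {
          case call: RexCall => call.getOperator.getName.equalsIgnoreCase("DIST")
          case _ => false
        }
        // the wrapped aggregate expression is then the RexCall's first operand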


---

[GitHub] flink issue #3732: [FLINK-6250] Distinct procTime with Rows boundaries

Posted by stefanobortoli <gi...@git.apache.org>.
Github user stefanobortoli commented on the issue:

    https://github.com/apache/flink/pull/3732
  
    @fhueske @sunjincheng121 @shijinkui @hongyuhong  I have created a PR against the latest master with the code-generated distinct, #3771. Please have a look. If it is fine, we can basically support distinct for all the window types.


---

[GitHub] flink pull request #3732: [FLINK-6250] Distinct procTime with Rows boundarie...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3732#discussion_r112706580
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedDistinctRowsOver.scala ---
    @@ -0,0 +1,230 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.table.runtime.aggregate
    +
    +import java.util
    +
    +import org.apache.flink.configuration.Configuration
    +import org.apache.flink.streaming.api.functions.ProcessFunction
    +import org.apache.flink.types.Row
    +import org.apache.flink.util.{Collector, Preconditions}
    +import org.apache.flink.api.common.state.ValueStateDescriptor
    +import org.apache.flink.api.java.typeutils.RowTypeInfo
    +import org.apache.flink.api.common.state.ValueState
    +import org.apache.flink.table.functions.{Accumulator, AggregateFunction}
    +import org.apache.flink.api.common.state.MapState
    +import org.apache.flink.api.common.state.MapStateDescriptor
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.api.java.typeutils.ListTypeInfo
    +import java.util.{List => JList}
    +
    +import org.apache.flink.api.common.typeinfo.BasicTypeInfo
    +import org.apache.flink.api.common.state.ListState
    +
    +class ProcTimeBoundedDistinctRowsOver(
    +  private val aggregates: Array[AggregateFunction[_]],
    +  private val aggFields: Array[Array[Int]],
    +  private val distinctAggsFlag: Array[Boolean],
    +  private val precedingOffset: Long,
    +  private val forwardedFieldCount: Int,
    +  private val aggregatesTypeInfo: RowTypeInfo,
    +  private val inputType: TypeInformation[Row])
    +    extends ProcessFunction[Row, Row] {
    +
    +  Preconditions.checkNotNull(aggregates)
    +  Preconditions.checkNotNull(aggFields)
    +  Preconditions.checkNotNull(distinctAggsFlag)
    +  Preconditions.checkNotNull(distinctAggsFlag.length == aggregates.length)
    +  Preconditions.checkArgument(aggregates.length == aggFields.length)
    +  Preconditions.checkArgument(precedingOffset > 0)
    +
    +  private var accumulatorState: ValueState[Row] = _
    +  private var rowMapState: MapState[Long, JList[Row]] = _
    +  private var output: Row = _
    +  private var counterState: ValueState[Long] = _
    +  private var smallestTsState: ValueState[Long] = _
    +  private var distinctValueStateList: Array[MapState[Any, Long]] = _
    +
    +  override def open(config: Configuration) {
    +
    +    output = new Row(forwardedFieldCount + aggregates.length)
    +    // We keep the elements received in a Map state keyed
    +    // by the ingestion time in the operator.
    +    // we also keep counter of processed elements
    +    // and timestamp of oldest element
    +    val rowListTypeInfo: TypeInformation[JList[Row]] =
    +      new ListTypeInfo[Row](inputType).asInstanceOf[TypeInformation[JList[Row]]]
    +
    +    val mapStateDescriptor: MapStateDescriptor[Long, JList[Row]] =
    +      new MapStateDescriptor[Long, JList[Row]]("windowBufferMapState",
    +        BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], rowListTypeInfo)
    +    rowMapState = getRuntimeContext.getMapState(mapStateDescriptor)
    +
    +    val aggregationStateDescriptor: ValueStateDescriptor[Row] =
    +      new ValueStateDescriptor[Row]("aggregationState", aggregatesTypeInfo)
    +    accumulatorState = getRuntimeContext.getState(aggregationStateDescriptor)
    +
    +    val processedCountDescriptor : ValueStateDescriptor[Long] =
    +       new ValueStateDescriptor[Long]("processedCountState", classOf[Long])
    +    counterState = getRuntimeContext.getState(processedCountDescriptor)
    +
    +    val smallestTimestampDescriptor : ValueStateDescriptor[Long] =
    +       new ValueStateDescriptor[Long]("smallestTSState", classOf[Long])
    +    smallestTsState = getRuntimeContext.getState(smallestTimestampDescriptor)
    +    distinctValueStateList = new Array(aggregates.size)
    +    for(i <- 0 until aggregates.size){
    +      if(distinctAggsFlag(i)){
    +        val distinctValDescriptor =  new MapStateDescriptor[Any, Long](
    +                                             "distinctValuesBufferMapState" + i,
    +                                              classOf[Any],
    +                                              classOf[Long])
    +        distinctValueStateList(i)=getRuntimeContext.getMapState(distinctValDescriptor)
    +      }
    +    }
    +  }
    +
    +  override def processElement(
    +    input: Row,
    +    ctx: ProcessFunction[Row, Row]#Context,
    +    out: Collector[Row]): Unit = {
    +
    +    val currentTime = ctx.timerService.currentProcessingTime
    +    var i = 0
    +
    +    // initialize state for the processed element
    +    var accumulators = accumulatorState.value
    +    if (accumulators == null) {
    +      accumulators = new Row(aggregates.length)
    +      while (i < aggregates.length) {
    +        accumulators.setField(i, aggregates(i).createAccumulator())
    +        i += 1
    +      }
    +    }
    +
    +    // get smallest timestamp
    +    var smallestTs = smallestTsState.value
    +    if (smallestTs == 0L) {
    +      smallestTs = currentTime
    +      smallestTsState.update(smallestTs)
    +    }
    +    // get previous counter value
    +    var counter = counterState.value
    +
    +    if (counter == precedingOffset) {
    +      val retractList = rowMapState.get(smallestTs)
    +
    +      // get oldest element beyond buffer size
    +      // and if oldest element exist, retract value
    +      var removeCounter :Integer = 0
    --- End diff --
    
    not used


---

[GitHub] flink issue #3732: [FLINK-6250] Distinct procTime with Rows boundaries

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on the issue:

    https://github.com/apache/flink/pull/3732
  
    sounds good to me. IMO, we can also add the runtime code to the code base even if there is no API support yet, as long as we cover it with test cases. Then we could quickly enable it once DISTINCT in OVER becomes available.
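
    For illustration, the kind of query this would eventually enable, e.g. in a
    future ITCase (hypothetical: neither the parser nor the Table API accept
    DISTINCT in OVER yet, and the exact time-indicator syntax may differ):

    ```
    val sqlQuery =
      "SELECT a, " +
      "  COUNT(DISTINCT b) OVER (" +
      "    PARTITION BY a ORDER BY proctime " +
      "    ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) " +
      "FROM MyTable"
    ```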


---

[GitHub] flink issue #3732: [FLINK-6250] Distinct procTime with Rows boundaries

Posted by stefanobortoli <gi...@git.apache.org>.
Github user stefanobortoli commented on the issue:

    https://github.com/apache/flink/pull/3732
  
    @fhueske, I agree with you about the risk of temporarily introducing DIST(). Perhaps we could meanwhile just work on the "ProcessFunction + code generation" approach, keeping the DIST function for test purposes. My concern is that the code may change again and all the work would just be wasted. To be honest, code generation is quite new to me, and I will have to learn to work on that. Meanwhile, I have almost completed a version that relies on the current code generation, nesting the distinct logic. As it is almost done, I will share this one as well and then, if necessary, move to code generation. What do you think?
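
    To make the "nesting" concrete, here is a minimal sketch of the idea in
    plain Scala, with a heap map instead of Flink state, so it is illustrative
    only and not fault tolerant; `distinctCounts` and `accumulateDistinct` are
    made-up names:

    ```
    import scala.collection.mutable

    // one value-counting map per aggregate that is marked as distinct
    val distinctCounts: Array[mutable.Map[Any, Long]] =
      Array.fill(aggregates.length)(mutable.Map.empty[Any, Long].withDefaultValue(0L))

    def accumulateDistinct(i: Int, acc: Accumulator, value: Any): Unit = {
      if (distinctCounts(i)(value) == 0L) {
        // first occurrence within the window: forward to the wrapped function
        aggregates(i).accumulate(acc, value)
      }
      distinctCounts(i)(value) += 1L
    }
    ```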


---

[GitHub] flink pull request #3732: [FLINK-6250] Distinct procTime with Rows boundarie...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3732#discussion_r112702342
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedDistinctRowsOver.scala ---
    @@ -0,0 +1,230 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.table.runtime.aggregate
    +
    +import java.util
    +
    +import org.apache.flink.configuration.Configuration
    +import org.apache.flink.streaming.api.functions.ProcessFunction
    +import org.apache.flink.types.Row
    +import org.apache.flink.util.{Collector, Preconditions}
    +import org.apache.flink.api.common.state.ValueStateDescriptor
    +import org.apache.flink.api.java.typeutils.RowTypeInfo
    +import org.apache.flink.api.common.state.ValueState
    +import org.apache.flink.table.functions.{Accumulator, AggregateFunction}
    +import org.apache.flink.api.common.state.MapState
    +import org.apache.flink.api.common.state.MapStateDescriptor
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.api.java.typeutils.ListTypeInfo
    +import java.util.{List => JList}
    +
    +import org.apache.flink.api.common.typeinfo.BasicTypeInfo
    +import org.apache.flink.api.common.state.ListState
    +
    +class ProcTimeBoundedDistinctRowsOver(
    +  private val aggregates: Array[AggregateFunction[_]],
    +  private val aggFields: Array[Array[Int]],
    +  private val distinctAggsFlag: Array[Boolean],
    +  private val precedingOffset: Long,
    +  private val forwardedFieldCount: Int,
    +  private val aggregatesTypeInfo: RowTypeInfo,
    +  private val inputType: TypeInformation[Row])
    +    extends ProcessFunction[Row, Row] {
    +
    +  Preconditions.checkNotNull(aggregates)
    +  Preconditions.checkNotNull(aggFields)
    +  Preconditions.checkNotNull(distinctAggsFlag)
    +  Preconditions.checkNotNull(distinctAggsFlag.length == aggregates.length)
    +  Preconditions.checkArgument(aggregates.length == aggFields.length)
    +  Preconditions.checkArgument(precedingOffset > 0)
    +
    +  private var accumulatorState: ValueState[Row] = _
    +  private var rowMapState: MapState[Long, JList[Row]] = _
    +  private var output: Row = _
    +  private var counterState: ValueState[Long] = _
    +  private var smallestTsState: ValueState[Long] = _
    +  private var distinctValueStateList: Array[MapState[Any, Long]] = _
    +
    +  override def open(config: Configuration) {
    +
    +    output = new Row(forwardedFieldCount + aggregates.length)
    +    // We keep the elements received in a Map state keyed
    +    // by the ingestion time in the operator.
    +    // we also keep counter of processed elements
    +    // and timestamp of oldest element
    +    val rowListTypeInfo: TypeInformation[JList[Row]] =
    +      new ListTypeInfo[Row](inputType).asInstanceOf[TypeInformation[JList[Row]]]
    +
    +    val mapStateDescriptor: MapStateDescriptor[Long, JList[Row]] =
    +      new MapStateDescriptor[Long, JList[Row]]("windowBufferMapState",
    +        BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], rowListTypeInfo)
    +    rowMapState = getRuntimeContext.getMapState(mapStateDescriptor)
    +
    +    val aggregationStateDescriptor: ValueStateDescriptor[Row] =
    +      new ValueStateDescriptor[Row]("aggregationState", aggregatesTypeInfo)
    +    accumulatorState = getRuntimeContext.getState(aggregationStateDescriptor)
    +
    +    val processedCountDescriptor : ValueStateDescriptor[Long] =
    +       new ValueStateDescriptor[Long]("processedCountState", classOf[Long])
    +    counterState = getRuntimeContext.getState(processedCountDescriptor)
    +
    +    val smallestTimestampDescriptor : ValueStateDescriptor[Long] =
    +       new ValueStateDescriptor[Long]("smallestTSState", classOf[Long])
    +    smallestTsState = getRuntimeContext.getState(smallestTimestampDescriptor)
    +    distinctValueStateList = new Array(aggregates.size)
    +    for(i <- 0 until aggregates.size){
    +      if(distinctAggsFlag(i)){
    +        val distinctValDescriptor =  new MapStateDescriptor[Any, Long](
    +                                             "distinctValuesBufferMapState" + i,
    +                                              classOf[Any],
    +                                              classOf[Long])
    +        distinctValueStateList(i)=getRuntimeContext.getMapState(distinctValDescriptor)
    +      }
    +    }
    +  }
    +
    +  override def processElement(
    +    input: Row,
    +    ctx: ProcessFunction[Row, Row]#Context,
    +    out: Collector[Row]): Unit = {
    +
    +    val currentTime = ctx.timerService.currentProcessingTime
    +    var i = 0
    +
    +    // initialize state for the processed element
    +    var accumulators = accumulatorState.value
    +    if (accumulators == null) {
    +      accumulators = new Row(aggregates.length)
    +      while (i < aggregates.length) {
    +        accumulators.setField(i, aggregates(i).createAccumulator())
    +        i += 1
    +      }
    +    }
    +
    +    // get smallest timestamp
    +    var smallestTs = smallestTsState.value
    +    if (smallestTs == 0L) {
    +      smallestTs = currentTime
    +      smallestTsState.update(smallestTs)
    +    }
    +    // get previous counter value
    +    var counter = counterState.value
    +
    +    if (counter == precedingOffset) {
    +      val retractList = rowMapState.get(smallestTs)
    +
    +      // get oldest element beyond buffer size
    +      // and if oldest element exist, retract value
    +      var removeCounter :Integer = 0
    +      var distinctCounter : Integer = 0
    +      var retractVal : Object = null
    +      i = 0
    +      while (i < aggregates.length) {
    +        val accumulator = accumulators.getField(i).asInstanceOf[Accumulator]
    +        retractVal = retractList.get(0).getField(aggFields(i)(0))
    +        if(distinctAggsFlag(i)){
    +          var distinctValCounter: Long = distinctValueStateList(i).get(retractVal)
    +          // if the value to be retract is the last one added
    +          // the remove it and retract the value
    +          if(distinctValCounter == 1L){
    +            aggregates(i).retract(accumulator, retractVal)
    +            distinctValueStateList(i).remove(retractVal)
    +          } // if the are other values in the buffer 
    +            // decrease the counter and continue
    +          else {
    +            distinctValCounter -= 1
    +            distinctValueStateList(i).put(retractVal,distinctValCounter)
    +          }
    +        }else {
    --- End diff --
    
    +space


---

[GitHub] flink pull request #3732: [FLINK-6250] Distinct procTime with Rows boundarie...

Posted by stefanobortoli <gi...@git.apache.org>.
Github user stefanobortoli commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3732#discussion_r112725820
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala ---
    @@ -91,6 +93,22 @@ class DataStreamOverAggregate(
     
         val overWindow: org.apache.calcite.rel.core.Window.Group = logicWindow.groups.get(0)
     
    +    val distinctVarMap: Map[String,Boolean] = new HashMap[String, Boolean]
    --- End diff --
    
    This is a good point. The string trick is anyway a temporary workaround.


---

[GitHub] flink pull request #3732: [FLINK-6250] Distinct procTime with Rows boundarie...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3732#discussion_r112694144
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala ---
    @@ -224,13 +244,21 @@ class DataStreamOverAggregate(
       def createBoundedAndCurrentRowOverWindow(
         generator: CodeGenerator,
         inputDS: DataStream[Row],
    +    distinctVarMap : Map[String,Boolean],
         isRowTimeType: Boolean,
         isRowsClause: Boolean): DataStream[Row] = {
     
         val overWindow: Group = logicWindow.groups.get(0)
         val partitionKeys: Array[Int] = overWindow.keys.toArray
         val namedAggregates: Seq[CalcitePair[AggregateCall, String]] = generateNamedAggregates
     
    +    val aggregateCalls = overWindow.getAggregateCalls(logicWindow)
    +    val distinctAggFlags: Array[Boolean] =  new Array[Boolean](aggregateCalls.size)
    +    for (i <- 0 until aggregateCalls.size()){
    --- End diff --
    
    +space `.size()) {`


---

[GitHub] flink pull request #3732: [FLINK-6250] Distinct procTime with Rows boundarie...

Posted by hongyuhong <gi...@git.apache.org>.
Github user hongyuhong commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3732#discussion_r112103318
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedDistinctRowsOver.scala ---
    @@ -0,0 +1,238 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.table.runtime.aggregate
    +
    +import java.util
    +
    +import org.apache.flink.configuration.Configuration
    +import org.apache.flink.streaming.api.functions.ProcessFunction
    +import org.apache.flink.types.Row
    +import org.apache.flink.util.{Collector, Preconditions}
    +import org.apache.flink.api.common.state.ValueStateDescriptor
    +import org.apache.flink.api.java.typeutils.RowTypeInfo
    +import org.apache.flink.api.common.state.ValueState
    +import org.apache.flink.table.functions.{Accumulator, AggregateFunction}
    +import org.apache.flink.api.common.state.MapState
    +import org.apache.flink.api.common.state.MapStateDescriptor
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.api.java.typeutils.ListTypeInfo
    +import java.util.{List => JList}
    +
    +import org.apache.flink.api.common.typeinfo.BasicTypeInfo
    +
    +class ProcTimeBoundedDistinctRowsOver(
    +  private val aggregates: Array[AggregateFunction[_]],
    +  private val aggFields: Array[Array[Int]],
    +  private val distinctAggsFlag: Array[Boolean],
    +  private val precedingOffset: Long,
    +  private val forwardedFieldCount: Int,
    +  private val aggregatesTypeInfo: RowTypeInfo,
    +  private val inputType: TypeInformation[Row])
    +    extends ProcessFunction[Row, Row] {
    +
    +  Preconditions.checkNotNull(aggregates)
    +  Preconditions.checkNotNull(aggFields)
    +  Preconditions.checkNotNull(distinctAggsFlag)
    +  Preconditions.checkNotNull(distinctAggsFlag.length == aggregates.length)
    +  Preconditions.checkArgument(aggregates.length == aggFields.length)
    +  Preconditions.checkArgument(precedingOffset > 0)
    +
    +  private var accumulatorState: ValueState[Row] = _
    +  private var rowMapState: MapState[Long, JList[Row]] = _
    +  private var output: Row = _
    +  private var counterState: ValueState[Long] = _
    +  private var smallestTsState: ValueState[Long] = _
    +  private var distinctValueState: MapState[Any, Row] = _
    +
    +  override def open(config: Configuration) {
    +
    +    output = new Row(forwardedFieldCount + aggregates.length)
    +    // We keep the elements received in a Map state keyed
    +    // by the ingestion time in the operator.
    +    // we also keep counter of processed elements
    +    // and timestamp of oldest element
    +    val rowListTypeInfo: TypeInformation[JList[Row]] =
    +      new ListTypeInfo[Row](inputType).asInstanceOf[TypeInformation[JList[Row]]]
    +
    +    val mapStateDescriptor: MapStateDescriptor[Long, JList[Row]] =
    +      new MapStateDescriptor[Long, JList[Row]]("windowBufferMapState",
    +        BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], rowListTypeInfo)
    +    rowMapState = getRuntimeContext.getMapState(mapStateDescriptor)
    +
    +    val aggregationStateDescriptor: ValueStateDescriptor[Row] =
    +      new ValueStateDescriptor[Row]("aggregationState", aggregatesTypeInfo)
    +    accumulatorState = getRuntimeContext.getState(aggregationStateDescriptor)
    +
    +    val processedCountDescriptor : ValueStateDescriptor[Long] =
    +       new ValueStateDescriptor[Long]("processedCountState", classOf[Long])
    +    counterState = getRuntimeContext.getState(processedCountDescriptor)
    +
    +    val smallestTimestampDescriptor : ValueStateDescriptor[Long] =
    +       new ValueStateDescriptor[Long]("smallestTSState", classOf[Long])
    +    smallestTsState = getRuntimeContext.getState(smallestTimestampDescriptor)
    +    
    +    val distinctValDescriptor : MapStateDescriptor[Any, Row] =
    +      new MapStateDescriptor[Any, Row]("distinctValuesBufferMapState", classOf[Any], classOf[Row])
    --- End diff --
    
    It seems we should use the concrete key type and Row type information here, otherwise the state cannot be serialized.
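
    A sketch of that direction, replacing the classOf arguments with concrete
    TypeInformation (`fieldTypeInfo` is assumed to be passed in, which the
    current constructor does not provide):

    ```
    // e.g. a counter row with a single Long field
    val counterRowTypeInfo = new RowTypeInfo(BasicTypeInfo.LONG_TYPE_INFO)

    val distinctValDescriptor = new MapStateDescriptor[Any, Row](
      "distinctValuesBufferMapState",
      fieldTypeInfo.asInstanceOf[TypeInformation[Any]],  // concrete key type
      counterRowTypeInfo)                                // concrete Row type
    distinctValueState = getRuntimeContext.getMapState(distinctValDescriptor)
    ```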


---

[GitHub] flink issue #3732: [FLINK-6250] Distinct procTime with Rows boundaries

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on the issue:

    https://github.com/apache/flink/pull/3732
  
    Btw, the code generation is not so fancy. The best way to learn it would be to debug a simple batch GROUP BY query (once batch aggregations are code-gen'd as well).


---

[GitHub] flink pull request #3732: [FLINK-6250] Distinct procTime with Rows boundarie...

Posted by huawei-flink <gi...@git.apache.org>.
Github user huawei-flink commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3732#discussion_r112210389
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedDistinctRowsOver.scala ---
    @@ -0,0 +1,238 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.table.runtime.aggregate
    +
    +import java.util
    +
    +import org.apache.flink.configuration.Configuration
    +import org.apache.flink.streaming.api.functions.ProcessFunction
    +import org.apache.flink.types.Row
    +import org.apache.flink.util.{Collector, Preconditions}
    +import org.apache.flink.api.common.state.ValueStateDescriptor
    +import org.apache.flink.api.java.typeutils.RowTypeInfo
    +import org.apache.flink.api.common.state.ValueState
    +import org.apache.flink.table.functions.{Accumulator, AggregateFunction}
    +import org.apache.flink.api.common.state.MapState
    +import org.apache.flink.api.common.state.MapStateDescriptor
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.api.java.typeutils.ListTypeInfo
    +import java.util.{List => JList}
    +
    +import org.apache.flink.api.common.typeinfo.BasicTypeInfo
    +
    +class ProcTimeBoundedDistinctRowsOver(
    +  private val aggregates: Array[AggregateFunction[_]],
    +  private val aggFields: Array[Array[Int]],
    +  private val distinctAggsFlag: Array[Boolean],
    +  private val precedingOffset: Long,
    +  private val forwardedFieldCount: Int,
    +  private val aggregatesTypeInfo: RowTypeInfo,
    +  private val inputType: TypeInformation[Row])
    +    extends ProcessFunction[Row, Row] {
    +
    +  Preconditions.checkNotNull(aggregates)
    +  Preconditions.checkNotNull(aggFields)
    +  Preconditions.checkNotNull(distinctAggsFlag)
    +  Preconditions.checkNotNull(distinctAggsFlag.length == aggregates.length)
    +  Preconditions.checkArgument(aggregates.length == aggFields.length)
    +  Preconditions.checkArgument(precedingOffset > 0)
    +
    +  private var accumulatorState: ValueState[Row] = _
    +  private var rowMapState: MapState[Long, JList[Row]] = _
    +  private var output: Row = _
    +  private var counterState: ValueState[Long] = _
    +  private var smallestTsState: ValueState[Long] = _
    +  private var distinctValueState: MapState[Any, Row] = _
    +
    +  override def open(config: Configuration) {
    +
    +    output = new Row(forwardedFieldCount + aggregates.length)
    +    // We keep the elements received in a Map state keyed
    +    // by the ingestion time in the operator.
    +    // we also keep counter of processed elements
    +    // and timestamp of oldest element
    +    val rowListTypeInfo: TypeInformation[JList[Row]] =
    +      new ListTypeInfo[Row](inputType).asInstanceOf[TypeInformation[JList[Row]]]
    +
    +    val mapStateDescriptor: MapStateDescriptor[Long, JList[Row]] =
    +      new MapStateDescriptor[Long, JList[Row]]("windowBufferMapState",
    +        BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], rowListTypeInfo)
    +    rowMapState = getRuntimeContext.getMapState(mapStateDescriptor)
    +
    +    val aggregationStateDescriptor: ValueStateDescriptor[Row] =
    +      new ValueStateDescriptor[Row]("aggregationState", aggregatesTypeInfo)
    +    accumulatorState = getRuntimeContext.getState(aggregationStateDescriptor)
    +
    +    val processedCountDescriptor : ValueStateDescriptor[Long] =
    +       new ValueStateDescriptor[Long]("processedCountState", classOf[Long])
    +    counterState = getRuntimeContext.getState(processedCountDescriptor)
    +
    +    val smallestTimestampDescriptor : ValueStateDescriptor[Long] =
    +       new ValueStateDescriptor[Long]("smallestTSState", classOf[Long])
    +    smallestTsState = getRuntimeContext.getState(smallestTimestampDescriptor)
    +    
    +    val distinctValDescriptor : MapStateDescriptor[Any, Row] =
    +      new MapStateDescriptor[Any, Row]("distinctValuesBufferMapState", classOf[Any], classOf[Row])
    +    distinctValueState = getRuntimeContext.getMapState(distinctValDescriptor)
    +  }
    +
    +  override def processElement(
    +    input: Row,
    +    ctx: ProcessFunction[Row, Row]#Context,
    +    out: Collector[Row]): Unit = {
    +
    +    val currentTime = ctx.timerService.currentProcessingTime
    +    var i = 0
    +
    +    // initialize state for the processed element
    +    var accumulators = accumulatorState.value
    +    if (accumulators == null) {
    +      accumulators = new Row(aggregates.length)
    +      while (i < aggregates.length) {
    +        accumulators.setField(i, aggregates(i).createAccumulator())
    +        i += 1
    +      }
    +    }
    +
    +    // get smallest timestamp
    +    var smallestTs = smallestTsState.value
    +    if (smallestTs == 0L) {
    +      smallestTs = currentTime
    +      smallestTsState.update(smallestTs)
    +    }
    +    // get previous counter value
    +    var counter = counterState.value
    +
    +    if (counter == precedingOffset) {
    +      val retractList = rowMapState.get(smallestTs)
    +
    +      // get oldest element beyond buffer size
    +      // and if oldest element exist, retract value
    +      var removeCounter :Integer = 0
    +      var distinctCounter : Integer = 0
    +      var retractVal : Object = null
    +      i = 0
    +      while (i < aggregates.length) {
    +        val accumulator = accumulators.getField(i).asInstanceOf[Accumulator]
    +        retractVal = retractList.get(0).getField(aggFields(i)(0))
    +        if(distinctAggsFlag(i)){
    +          distinctCounter += 1
    +          val counterRow = distinctValueState.get(retractVal)
    --- End diff --
    
    The test you suggested indeed failed, great point! The new version fixes the problem.
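
    For reference, the fix boils down to reference counting per distinct value.
    A minimal self-contained sketch of that logic (plain Scala, not the
    state-backed version in this PR):

    ```
    import scala.collection.mutable

    class DistinctRefCount[T] {
      private val counts = mutable.Map.empty[T, Long].withDefaultValue(0L)

      /** Returns true iff the value is new and must be accumulated. */
      def add(value: T): Boolean = {
        counts(value) += 1L
        counts(value) == 1L
      }

      /** Returns true iff this was the last occurrence and must be retracted. */
      def remove(value: T): Boolean = {
        counts(value) -= 1L
        if (counts(value) == 0L) { counts.remove(value); true } else false
      }
    }
    ```

    With a 2-row window and inputs b=1, b=1, b=2, retracting the first b=1
    returns false (the count just drops to 1), so the aggregate keeps the
    value; that is exactly the duplicate-value behaviour the counting map
    guarantees.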


---

[GitHub] flink pull request #3732: [FLINK-6250] Distinct procTime with Rows boundarie...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3732#discussion_r112702313
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedDistinctRowsOver.scala ---
    @@ -0,0 +1,230 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.table.runtime.aggregate
    +
    +import java.util
    +
    +import org.apache.flink.configuration.Configuration
    +import org.apache.flink.streaming.api.functions.ProcessFunction
    +import org.apache.flink.types.Row
    +import org.apache.flink.util.{Collector, Preconditions}
    +import org.apache.flink.api.common.state.ValueStateDescriptor
    +import org.apache.flink.api.java.typeutils.RowTypeInfo
    +import org.apache.flink.api.common.state.ValueState
    +import org.apache.flink.table.functions.{Accumulator, AggregateFunction}
    +import org.apache.flink.api.common.state.MapState
    +import org.apache.flink.api.common.state.MapStateDescriptor
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.api.java.typeutils.ListTypeInfo
    +import java.util.{List => JList}
    +
    +import org.apache.flink.api.common.typeinfo.BasicTypeInfo
    +import org.apache.flink.api.common.state.ListState
    +
    +class ProcTimeBoundedDistinctRowsOver(
    +  private val aggregates: Array[AggregateFunction[_]],
    +  private val aggFields: Array[Array[Int]],
    +  private val distinctAggsFlag: Array[Boolean],
    +  private val precedingOffset: Long,
    +  private val forwardedFieldCount: Int,
    +  private val aggregatesTypeInfo: RowTypeInfo,
    +  private val inputType: TypeInformation[Row])
    +    extends ProcessFunction[Row, Row] {
    +
    +  Preconditions.checkNotNull(aggregates)
    +  Preconditions.checkNotNull(aggFields)
    +  Preconditions.checkNotNull(distinctAggsFlag)
    +  Preconditions.checkNotNull(distinctAggsFlag.length == aggregates.length)
    +  Preconditions.checkArgument(aggregates.length == aggFields.length)
    +  Preconditions.checkArgument(precedingOffset > 0)
    +
    +  private var accumulatorState: ValueState[Row] = _
    +  private var rowMapState: MapState[Long, JList[Row]] = _
    +  private var output: Row = _
    +  private var counterState: ValueState[Long] = _
    +  private var smallestTsState: ValueState[Long] = _
    +  private var distinctValueStateList: Array[MapState[Any, Long]] = _
    +
    +  override def open(config: Configuration) {
    +
    +    output = new Row(forwardedFieldCount + aggregates.length)
    +    // We keep the elements received in a Map state keyed
    +    // by the ingestion time in the operator.
    +    // we also keep counter of processed elements
    +    // and timestamp of oldest element
    +    val rowListTypeInfo: TypeInformation[JList[Row]] =
    +      new ListTypeInfo[Row](inputType).asInstanceOf[TypeInformation[JList[Row]]]
    +
    +    val mapStateDescriptor: MapStateDescriptor[Long, JList[Row]] =
    +      new MapStateDescriptor[Long, JList[Row]]("windowBufferMapState",
    +        BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], rowListTypeInfo)
    +    rowMapState = getRuntimeContext.getMapState(mapStateDescriptor)
    +
    +    val aggregationStateDescriptor: ValueStateDescriptor[Row] =
    +      new ValueStateDescriptor[Row]("aggregationState", aggregatesTypeInfo)
    +    accumulatorState = getRuntimeContext.getState(aggregationStateDescriptor)
    +
    +    val processedCountDescriptor : ValueStateDescriptor[Long] =
    +       new ValueStateDescriptor[Long]("processedCountState", classOf[Long])
    +    counterState = getRuntimeContext.getState(processedCountDescriptor)
    +
    +    val smallestTimestampDescriptor : ValueStateDescriptor[Long] =
    +       new ValueStateDescriptor[Long]("smallestTSState", classOf[Long])
    +    smallestTsState = getRuntimeContext.getState(smallestTimestampDescriptor)
    +    distinctValueStateList = new Array(aggregates.size)
    +    for(i <- 0 until aggregates.size){
    +      if(distinctAggsFlag(i)){
    +        val distinctValDescriptor =  new MapStateDescriptor[Any, Long](
    +                                             "distinctValuesBufferMapState" + i,
    +                                              classOf[Any],
    +                                              classOf[Long])
    +        distinctValueStateList(i)=getRuntimeContext.getMapState(distinctValDescriptor)
    +      }
    +    }
    +  }
    +
    +  override def processElement(
    +    input: Row,
    +    ctx: ProcessFunction[Row, Row]#Context,
    +    out: Collector[Row]): Unit = {
    +
    +    val currentTime = ctx.timerService.currentProcessingTime
    +    var i = 0
    +
    +    // initialize state for the processed element
    +    var accumulators = accumulatorState.value
    +    if (accumulators == null) {
    +      accumulators = new Row(aggregates.length)
    +      while (i < aggregates.length) {
    +        accumulators.setField(i, aggregates(i).createAccumulator())
    +        i += 1
    +      }
    +    }
    +
    +    // get smallest timestamp
    +    var smallestTs = smallestTsState.value
    +    if (smallestTs == 0L) {
    +      smallestTs = currentTime
    +      smallestTsState.update(smallestTs)
    +    }
    +    // get previous counter value
    +    var counter = counterState.value
    +
    +    if (counter == precedingOffset) {
    +      val retractList = rowMapState.get(smallestTs)
    +
    +      // get oldest element beyond buffer size
    +      // and if oldest element exist, retract value
    +      var removeCounter :Integer = 0
    +      var distinctCounter : Integer = 0
    +      var retractVal : Object = null
    +      i = 0
    +      while (i < aggregates.length) {
    +        val accumulator = accumulators.getField(i).asInstanceOf[Accumulator]
    +        retractVal = retractList.get(0).getField(aggFields(i)(0))
    +        if(distinctAggsFlag(i)){
    +          var distinctValCounter: Long = distinctValueStateList(i).get(retractVal)
    +          // if the value to be retract is the last one added
    +          // the remove it and retract the value
    +          if(distinctValCounter == 1L){
    +            aggregates(i).retract(accumulator, retractVal)
    +            distinctValueStateList(i).remove(retractVal)
    +          } // if the are other values in the buffer 
    +            // decrease the counter and continue
    +          else {
    +            distinctValCounter -= 1
    +            distinctValueStateList(i).put(retractVal,distinctValCounter)
    --- End diff --
    
    +space


---

[GitHub] flink pull request #3732: [FLINK-6250] Distinct procTime with Rows boundarie...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3732#discussion_r112716483
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedDistinctRowsOver.scala ---
    @@ -0,0 +1,230 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.table.runtime.aggregate
    +
    +import java.util
    +
    +import org.apache.flink.configuration.Configuration
    +import org.apache.flink.streaming.api.functions.ProcessFunction
    +import org.apache.flink.types.Row
    +import org.apache.flink.util.{Collector, Preconditions}
    +import org.apache.flink.api.common.state.ValueStateDescriptor
    +import org.apache.flink.api.java.typeutils.RowTypeInfo
    +import org.apache.flink.api.common.state.ValueState
    +import org.apache.flink.table.functions.{Accumulator, AggregateFunction}
    +import org.apache.flink.api.common.state.MapState
    +import org.apache.flink.api.common.state.MapStateDescriptor
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.api.java.typeutils.ListTypeInfo
    +import java.util.{List => JList}
    +
    +import org.apache.flink.api.common.typeinfo.BasicTypeInfo
    +import org.apache.flink.api.common.state.ListState
    +
    +class ProcTimeBoundedDistinctRowsOver(
    +  private val aggregates: Array[AggregateFunction[_]],
    +  private val aggFields: Array[Array[Int]],
    +  private val distinctAggsFlag: Array[Boolean],
    +  private val precedingOffset: Long,
    +  private val forwardedFieldCount: Int,
    +  private val aggregatesTypeInfo: RowTypeInfo,
    +  private val inputType: TypeInformation[Row])
    +    extends ProcessFunction[Row, Row] {
    +
    +  Preconditions.checkNotNull(aggregates)
    +  Preconditions.checkNotNull(aggFields)
    +  Preconditions.checkNotNull(distinctAggsFlag)
    +  Preconditions.checkNotNull(distinctAggsFlag.length == aggregates.length)
    +  Preconditions.checkArgument(aggregates.length == aggFields.length)
    +  Preconditions.checkArgument(precedingOffset > 0)
    +
    +  private var accumulatorState: ValueState[Row] = _
    +  private var rowMapState: MapState[Long, JList[Row]] = _
    +  private var output: Row = _
    +  private var counterState: ValueState[Long] = _
    +  private var smallestTsState: ValueState[Long] = _
    +  private var distinctValueStateList: Array[MapState[Any, Long]] = _
    +
    +  override def open(config: Configuration) {
    +
    +    output = new Row(forwardedFieldCount + aggregates.length)
    +    // We keep the elements received in a Map state keyed
    +    // by the ingestion time in the operator.
    +    // we also keep counter of processed elements
    +    // and timestamp of oldest element
    +    val rowListTypeInfo: TypeInformation[JList[Row]] =
    +      new ListTypeInfo[Row](inputType).asInstanceOf[TypeInformation[JList[Row]]]
    +
    +    val mapStateDescriptor: MapStateDescriptor[Long, JList[Row]] =
    +      new MapStateDescriptor[Long, JList[Row]]("windowBufferMapState",
    +        BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], rowListTypeInfo)
    +    rowMapState = getRuntimeContext.getMapState(mapStateDescriptor)
    +
    +    val aggregationStateDescriptor: ValueStateDescriptor[Row] =
    +      new ValueStateDescriptor[Row]("aggregationState", aggregatesTypeInfo)
    +    accumulatorState = getRuntimeContext.getState(aggregationStateDescriptor)
    +
    +    val processedCountDescriptor : ValueStateDescriptor[Long] =
    +       new ValueStateDescriptor[Long]("processedCountState", classOf[Long])
    +    counterState = getRuntimeContext.getState(processedCountDescriptor)
    +
    +    val smallestTimestampDescriptor : ValueStateDescriptor[Long] =
    +       new ValueStateDescriptor[Long]("smallestTSState", classOf[Long])
    +    smallestTsState = getRuntimeContext.getState(smallestTimestampDescriptor)
    +    distinctValueStateList = new Array(aggregates.size)
    +    for(i <- 0 until aggregates.size){
    +      if(distinctAggsFlag(i)){
    +        val distinctValDescriptor =  new MapStateDescriptor[Any, Long](
    +                                             "distinctValuesBufferMapState" + i,
    +                                              classOf[Any],
    +                                              classOf[Long])
    +        distinctValueStateList(i)=getRuntimeContext.getMapState(distinctValDescriptor)
    +      }
    +    }
    +  }
    +
    +  override def processElement(
    +    input: Row,
    +    ctx: ProcessFunction[Row, Row]#Context,
    +    out: Collector[Row]): Unit = {
    +
    +    val currentTime = ctx.timerService.currentProcessingTime
    +    var i = 0
    +
    +    // initialize state for the processed element
    +    var accumulators = accumulatorState.value
    +    if (accumulators == null) {
    +      accumulators = new Row(aggregates.length)
    +      while (i < aggregates.length) {
    +        accumulators.setField(i, aggregates(i).createAccumulator())
    +        i += 1
    +      }
    +    }
    +
    +    // get smallest timestamp
    +    var smallestTs = smallestTsState.value
    +    if (smallestTs == 0L) {
    +      smallestTs = currentTime
    +      smallestTsState.update(smallestTs)
    +    }
    +    // get previous counter value
    +    var counter = counterState.value
    +
    +    if (counter == precedingOffset) {
    +      val retractList = rowMapState.get(smallestTs)
    +
    +      // get oldest element beyond buffer size
    +      // and if oldest element exist, retract value
    +      var removeCounter :Integer = 0
    +      var distinctCounter : Integer = 0
    +      var retractVal : Object = null
    +      i = 0
    +      while (i < aggregates.length) {
    +        val accumulator = accumulators.getField(i).asInstanceOf[Accumulator]
    +        retractVal = retractList.get(0).getField(aggFields(i)(0))
    +        if(distinctAggsFlag(i)){
    +          var distinctValCounter: Long = distinctValueStateList(i).get(retractVal)
    +          // if the value to be retract is the last one added
    +          // the remove it and retract the value
    +          if(distinctValCounter == 1L){
    +            aggregates(i).retract(accumulator, retractVal)
    +            distinctValueStateList(i).remove(retractVal)
    +          } // if the are other values in the buffer 
    +            // decrease the counter and continue
    +          else {
    +            distinctValCounter -= 1
    +            distinctValueStateList(i).put(retractVal,distinctValCounter)
    +          }
    +        }else {
    +          aggregates(i).retract(accumulator, retractVal)
    +        }
    +        i += 1
    +      }
    +      retractList.remove(0)
    +      // if reference timestamp list not empty, keep the list
    +      if (!retractList.isEmpty) {
    +        rowMapState.put(smallestTs, retractList)
    +      } // if smallest timestamp list is empty, remove and find new smallest
    +      else {
    +        rowMapState.remove(smallestTs)
    +        val iter = rowMapState.keys.iterator
    +        var currentTs: Long = 0L
    +        var newSmallestTs: Long = Long.MaxValue
    +        while (iter.hasNext) {
    +          currentTs = iter.next
    +          if (currentTs < newSmallestTs) {
    +            newSmallestTs = currentTs
    +          }
    +        }
    +        smallestTsState.update(newSmallestTs)
    +      }
    +    } // we update the counter only while buffer is getting filled
    +    else {
    +      counter += 1
    +      counterState.update(counter)
    +    }
    +
    +    // copy forwarded fields in output row
    +    i = 0
    +    while (i < forwardedFieldCount) {
    +      output.setField(i, input.getField(i))
    +      i += 1
    +    }
    +
    +    // accumulate current row and set aggregate in output row
    +    i = 0
    +    while (i < aggregates.length) {
    +      val index = forwardedFieldCount + i
    +      val accumulator = accumulators.getField(i).asInstanceOf[Accumulator]
    +      val inputValue = input.getField(aggFields(i)(0))
    +      // check if distinct aggregation
    +      if(distinctAggsFlag(i)){
    +        // if first time we see value, set counter and aggregate
    +          var distinctValCounter: Long = distinctValueStateList(i).get(inputValue)
    +          // if counter is 0L first time we aggregate
    +          // for a seen value but never accumulated
    +          if(distinctValCounter == 0L){
    --- End diff --
    
    OK, good to know :-)
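
    For readers following along: the `distinctValCounter == 0L` check covers
    the "never seen" case because `MapState.get` returns null for a missing
    key, and Scala's generic unboxing turns a null boxed Long into 0L. A
    sketch that makes the same thing explicit with a boxed value type (the PR
    does not do this; `boxedDistinctValueState` is a hypothetical
    MapState[Any, JLong]):

    ```
    import java.lang.{Long => JLong}

    val boxed: JLong = boxedDistinctValueState.get(inputValue)  // null if absent
    val distinctValCounter: Long = if (boxed == null) 0L else boxed.longValue()
    ```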


---

[GitHub] flink pull request #3732: [FLINK-6250] Distinct procTime with Rows boundarie...

Posted by huawei-flink <gi...@git.apache.org>.
Github user huawei-flink commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3732#discussion_r112151222
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedDistinctRowsOver.scala ---
    @@ -0,0 +1,238 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.table.runtime.aggregate
    +
    +import java.util
    +
    +import org.apache.flink.configuration.Configuration
    +import org.apache.flink.streaming.api.functions.ProcessFunction
    +import org.apache.flink.types.Row
    +import org.apache.flink.util.{Collector, Preconditions}
    +import org.apache.flink.api.common.state.ValueStateDescriptor
    +import org.apache.flink.api.java.typeutils.RowTypeInfo
    +import org.apache.flink.api.common.state.ValueState
    +import org.apache.flink.table.functions.{Accumulator, AggregateFunction}
    +import org.apache.flink.api.common.state.MapState
    +import org.apache.flink.api.common.state.MapStateDescriptor
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.api.java.typeutils.ListTypeInfo
    +import java.util.{List => JList}
    +
    +import org.apache.flink.api.common.typeinfo.BasicTypeInfo
    +
    +class ProcTimeBoundedDistinctRowsOver(
    +  private val aggregates: Array[AggregateFunction[_]],
    +  private val aggFields: Array[Array[Int]],
    +  private val distinctAggsFlag: Array[Boolean],
    +  private val precedingOffset: Long,
    +  private val forwardedFieldCount: Int,
    +  private val aggregatesTypeInfo: RowTypeInfo,
    +  private val inputType: TypeInformation[Row])
    +    extends ProcessFunction[Row, Row] {
    +
    +  Preconditions.checkNotNull(aggregates)
    +  Preconditions.checkNotNull(aggFields)
    +  Preconditions.checkNotNull(distinctAggsFlag)
    +  Preconditions.checkNotNull(distinctAggsFlag.length == aggregates.length)
    +  Preconditions.checkArgument(aggregates.length == aggFields.length)
    +  Preconditions.checkArgument(precedingOffset > 0)
    +
    +  private var accumulatorState: ValueState[Row] = _
    +  private var rowMapState: MapState[Long, JList[Row]] = _
    +  private var output: Row = _
    +  private var counterState: ValueState[Long] = _
    +  private var smallestTsState: ValueState[Long] = _
    +  private var distinctValueState: MapState[Any, Row] = _
    +
    +  override def open(config: Configuration) {
    +
    +    output = new Row(forwardedFieldCount + aggregates.length)
    +    // We keep the elements received in a Map state keyed
    +    // by the ingestion time in the operator.
    +    // we also keep counter of processed elements
    +    // and timestamp of oldest element
    +    val rowListTypeInfo: TypeInformation[JList[Row]] =
    +      new ListTypeInfo[Row](inputType).asInstanceOf[TypeInformation[JList[Row]]]
    +
    +    val mapStateDescriptor: MapStateDescriptor[Long, JList[Row]] =
    +      new MapStateDescriptor[Long, JList[Row]]("windowBufferMapState",
    +        BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], rowListTypeInfo)
    +    rowMapState = getRuntimeContext.getMapState(mapStateDescriptor)
    +
    +    val aggregationStateDescriptor: ValueStateDescriptor[Row] =
    +      new ValueStateDescriptor[Row]("aggregationState", aggregatesTypeInfo)
    +    accumulatorState = getRuntimeContext.getState(aggregationStateDescriptor)
    +
    +    val processedCountDescriptor : ValueStateDescriptor[Long] =
    +       new ValueStateDescriptor[Long]("processedCountState", classOf[Long])
    +    counterState = getRuntimeContext.getState(processedCountDescriptor)
    +
    +    val smallestTimestampDescriptor : ValueStateDescriptor[Long] =
    +       new ValueStateDescriptor[Long]("smallestTSState", classOf[Long])
    +    smallestTsState = getRuntimeContext.getState(smallestTimestampDescriptor)
    +    
    +    val distinctValDescriptor : MapStateDescriptor[Any, Row] =
    +      new MapStateDescriptor[Any, Row]("distinctValuesBufferMapState", classOf[Any], classOf[Row])
    +    distinctValueState = getRuntimeContext.getMapState(distinctValDescriptor)
    +  }
    +
    +  override def processElement(
    +    input: Row,
    +    ctx: ProcessFunction[Row, Row]#Context,
    +    out: Collector[Row]): Unit = {
    +
    +    val currentTime = ctx.timerService.currentProcessingTime
    +    var i = 0
    +
    +    // initialize state for the processed element
    +    var accumulators = accumulatorState.value
    +    if (accumulators == null) {
    +      accumulators = new Row(aggregates.length)
    +      while (i < aggregates.length) {
    +        accumulators.setField(i, aggregates(i).createAccumulator())
    +        i += 1
    +      }
    +    }
    +
    +    // get smallest timestamp
    +    var smallestTs = smallestTsState.value
    +    if (smallestTs == 0L) {
    +      smallestTs = currentTime
    +      smallestTsState.update(smallestTs)
    +    }
    +    // get previous counter value
    +    var counter = counterState.value
    +
    +    if (counter == precedingOffset) {
    +      val retractList = rowMapState.get(smallestTs)
    +
    +      // get oldest element beyond buffer size
    +      // and if oldest element exist, retract value
    +      var removeCounter :Integer = 0
    +      var distinctCounter : Integer = 0
    +      var retractVal : Object = null
    +      i = 0
    +      while (i < aggregates.length) {
    +        val accumulator = accumulators.getField(i).asInstanceOf[Accumulator]
    +        retractVal = retractList.get(0).getField(aggFields(i)(0))
    --- End diff --
    
    If I understand your question correctly, the list is necessary to handle co-occurring events (i.e. rows processed in the same millisecond). However, this part was already included in the originally merged code.
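
    The insert path shows why the list is needed; a sketch of how co-occurring
    rows end up in one map entry (mirroring the already-merged non-distinct
    over-window code):

    ```
    // all rows arriving in the same processing-time millisecond share one
    // map entry, so eviction later removes from the head of that list
    var rowList = rowMapState.get(currentTime)
    if (rowList == null) {
      rowList = new util.ArrayList[Row]()
    }
    rowList.add(input)
    rowMapState.put(currentTime, rowList)
    ```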


---

[GitHub] flink issue #3732: [FLINK-6250] Distinct procTime with Rows boundaries

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on the issue:

    https://github.com/apache/flink/pull/3732
  
    Since PR #3771 is the follow up of this PR, could you close this one? Thanks


---