Posted to issues@flink.apache.org by stefanobortoli <gi...@git.apache.org> on 2017/04/25 16:56:39 UTC

[GitHub] flink pull request #3771: [FLINK-6250] Distinct procTime with Rows boundarie...

GitHub user stefanobortoli opened a pull request:

    https://github.com/apache/flink/pull/3771

    [FLINK-6250] Distinct procTime with Rows boundaries

    Thanks for contributing to Apache Flink. Before you open your pull request, please take the following check list into consideration.
    If your changes take all of the items into account, feel free to open your pull request. For more information and/or questions please refer to the [How To Contribute guide](http://flink.apache.org/how-to-contribute.html).
    In addition to going through the list, please provide a meaningful description of your changes.
    
    - [X] General
      - The pull request references the related JIRA issue ("[FLINK-XXX] Jira title text")
      - The pull request addresses only one issue
      - Each commit in the PR has a meaningful commit message (including the JIRA id)
    
    - [X] Documentation
      - Documentation has been added for new functionality
      - Old documentation affected by the pull request has been updated
      - JavaDoc for public methods has been added
    
    - [X] Tests & Build
      - Functionality added by the pull request is covered by tests
      - `mvn clean verify` has been executed successfully locally or a Travis build has passed


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/stefanobortoli/flink FLINK-6250b

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/3771.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #3771
    


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #3771: [FLINK-6250] Distinct procTime with Rows boundarie...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3771#discussion_r113483437
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala ---
    @@ -335,14 +371,28 @@ class CodeGenerator(
             j"""
                 |  public final void accumulate(
                 |    org.apache.flink.types.Row accs,
    -            |    org.apache.flink.types.Row input)""".stripMargin
    +            |    org.apache.flink.types.Row input) throws Exception""".stripMargin
     
           val accumulate: String = {
             for (i <- aggs.indices) yield
    -          j"""
    +         if(distinctAggsFlags(i)){
    +           j"""
    +              |  Long distValCount$i = (Long) distStateList[$i].get(${parameters(i)});
    +              |  if( distValCount$i == null){
    --- End diff --
    
    Ah, that makes sense. Scala `Long` is backed by Java `long`, i.e., it does not support `null`.



[GitHub] flink pull request #3771: [FLINK-6250] Distinct procTime with Rows boundarie...

Posted by stefanobortoli <gi...@git.apache.org>.
Github user stefanobortoli closed the pull request at:

    https://github.com/apache/flink/pull/3771



[GitHub] flink issue #3771: [FLINK-6250] Distinct procTime with Rows boundaries

Posted by huawei-flink <gi...@git.apache.org>.
Github user huawei-flink commented on the issue:

    https://github.com/apache/flink/pull/3771
  
    @fhueske @haohui I have no problem removing the DIST() part; it is just not possible to test the change without it. Shall I push just the code generation and aggregates util changes?



[GitHub] flink pull request #3771: [FLINK-6250] Distinct procTime with Rows boundarie...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3771#discussion_r113446347
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala ---
    @@ -290,6 +295,10 @@ object AggregateUtil {
         val aggMapping = aggregates.indices.toArray.map(_ + groupings.length)
         val outputArity = aggregates.length + groupings.length + 1
     
    +    // remove when distinct is supported
    +    val distinctAggregatesFlags = new Array[Boolean](aggregates.size)
    --- End diff --
    
    The same applies for all DataSet aggregations.



[GitHub] flink pull request #3771: [FLINK-6250] Distinct procTime with Rows boundarie...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3771#discussion_r113446943
  
    --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggregate/BoundedProcessingOverRangeProcessFunctionTest.scala ---
    @@ -88,6 +88,12 @@ class BoundedProcessingOverRangeProcessFunctionTest {
             |    "mluZyRMb25nJOda0iCPo2ukAgAAeHA");
             |  }
             |
    +        |  public void initialize(
    --- End diff --
    
    can you make this a one-line change?



[GitHub] flink pull request #3771: [FLINK-6250] Distinct procTime with Rows boundarie...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3771#discussion_r113442315
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala ---
    @@ -296,6 +297,41 @@ class CodeGenerator(
           fields.mkString(", ")
         }
     
    +def genInitialize(existDistinct : Boolean): String = {
    +      
    +      val sig: String = 
    +        j"""
    +           | org.apache.flink.api.common.state.MapState[] distStateList =
    --- End diff --
    
    Reusable fields should be added with `reusableMemberStatements.add()`.
    (Same should be done for the Iterables in `genMergeList()`)
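
A minimal sketch of the idea behind this comment, in plain Java. It assumes that `reusableMemberStatements` behaves like an insertion-ordered set of declaration strings that the generator later prints into the class body; `MemberCollector`, `addReusableMember`, and `reuseMemberCode` are hypothetical names for illustration, not the actual `CodeGenerator` API.

```java
import java.util.LinkedHashSet;
import java.util.Set;

// Hypothetical stand-in for a code generator that collects member declarations.
final class MemberCollector {
    // Insertion-ordered set: registering the same declaration twice emits it once.
    private final Set<String> reusableMemberStatements = new LinkedHashSet<>();

    void addReusableMember(String declaration) {
        reusableMemberStatements.add(declaration);
    }

    // Renders all collected declarations, one per line, in registration order.
    String reuseMemberCode() {
        return String.join("\n", reusableMemberStatements);
    }

    public static void main(String[] args) {
        MemberCollector gen = new MemberCollector();
        String decl = "org.apache.flink.api.common.state.MapState[] distStateList = "
            + "new org.apache.flink.api.common.state.MapState[2];";
        gen.addReusableMember(decl);
        gen.addReusableMember(decl); // duplicate registration is ignored
        System.out.println(gen.reuseMemberCode());
    }
}
```

Collecting fields this way keeps the declaration out of the method signature string, so `genInitialize()` would only have to produce the method itself.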



[GitHub] flink pull request #3771: [FLINK-6250] Distinct procTime with Rows boundarie...

Posted by huawei-flink <gi...@git.apache.org>.
Github user huawei-flink commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3771#discussion_r113471045
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala ---
    @@ -296,6 +297,41 @@ class CodeGenerator(
           fields.mkString(", ")
         }
     
    +def genInitialize(existDistinct : Boolean): String = {
    +      
    +      val sig: String = 
    +        j"""
    +           | org.apache.flink.api.common.state.MapState[] distStateList =
    +           |   new org.apache.flink.api.common.state.MapState[ ${distinctAggsFlags.size} ];
    +           | 
    +           |  public void initialize(
    +           |    org.apache.flink.api.common.functions.RuntimeContext ctx
    +           |  )""".stripMargin
    +      if(existDistinct){
    --- End diff --
    
    you are right



[GitHub] flink issue #3771: [FLINK-6250] Distinct procTime with Rows boundaries

Posted by huawei-flink <gi...@git.apache.org>.
Github user huawei-flink commented on the issue:

    https://github.com/apache/flink/pull/3771
  
    So, what do you want me to keep for this PR? Just the code generation and its test?



[GitHub] flink issue #3771: [FLINK-6250] Distinct procTime with Rows boundaries

Posted by huawei-flink <gi...@git.apache.org>.
Github user huawei-flink commented on the issue:

    https://github.com/apache/flink/pull/3771
  
    @fhueske I have created #3783 with just the code generation part. At least the GROUP BY distinct can move ahead. I will close this PR and wait for the merging of the Calcite fix.



[GitHub] flink pull request #3771: [FLINK-6250] Distinct procTime with Rows boundarie...

Posted by huawei-flink <gi...@git.apache.org>.
Github user huawei-flink commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3771#discussion_r113472166
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedRowsOver.scala ---
    @@ -72,7 +73,15 @@ class ProcTimeBoundedRowsOver(
           genAggregations.code)
         LOG.debug("Instantiating AggregateHelper.")
         function = clazz.newInstance()
    -
    +    
    +    var initialized = false
    +    for(i <- distinctAggFlags.indices){
    +      if(distinctAggFlags(i) && !initialized){
    +        function.initialize(getRuntimeContext())
    --- End diff --
    
    right!



[GitHub] flink pull request #3771: [FLINK-6250] Distinct procTime with Rows boundarie...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3771#discussion_r113442825
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala ---
    @@ -335,14 +371,28 @@ class CodeGenerator(
             j"""
                 |  public final void accumulate(
                 |    org.apache.flink.types.Row accs,
    -            |    org.apache.flink.types.Row input)""".stripMargin
    +            |    org.apache.flink.types.Row input) throws Exception""".stripMargin
     
           val accumulate: String = {
             for (i <- aggs.indices) yield
    -          j"""
    +         if(distinctAggsFlags(i)){
    +           j"""
    +              |  Long distValCount$i = (Long) distStateList[$i].get(${parameters(i)});
    +              |  if( distValCount$i == null){
    +              |    ${aggs(i)}.accumulate(
    +              |      ((${accTypes(i)}) accs.getField($i)),
    +              |      ${parameters(i)});
    +              |    distValCount$i = 0L;
    +              |  }
    +              |  distValCount$i += 1;
    +              |  distStateList[$i].put(${parameters(i)}, distValCount$i);
    +           """.stripMargin
    +         }else {
    --- End diff --
    
    +space `} else {`
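
For readers following the diff above, here is a rough plain-Java sketch of what the generated distinct branch appears to do: keep a per-value occurrence count and feed the underlying aggregate only on the first occurrence. A `HashMap` stands in for the `MapState`, and a running sum stands in for `aggs(i).accumulate(...)`; both substitutions, and the class name `DistinctSum`, are assumptions made for illustration.

```java
import java.util.HashMap;
import java.util.Map;

final class DistinctSum {
    // Stand-in for the per-aggregate MapState: value -> number of occurrences.
    private final Map<Object, Long> distState = new HashMap<>();
    private long sum = 0L;

    void accumulate(long value) {
        Long count = distState.get(value); // null when the value was never seen
        if (count == null) {
            sum += value; // first occurrence: feed the underlying aggregate
            count = 0L;
        }
        count += 1;
        distState.put(value, count);
    }

    long getSum() {
        return sum;
    }

    long occurrences(long value) {
        return distState.getOrDefault(value, 0L);
    }

    public static void main(String[] args) {
        DistinctSum agg = new DistinctSum();
        agg.accumulate(5);
        agg.accumulate(5); // duplicate: counted, but not added to the sum again
        agg.accumulate(3);
        System.out.println(agg.getSum());
    }
}
```

Storing a count rather than a boolean presumably lets a bounded window tell when the last copy of a value leaves it; the retraction side is not shown in this diff, so it is left out of the sketch.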



[GitHub] flink issue #3771: [FLINK-6250] Distinct procTime with Rows boundaries

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on the issue:

    https://github.com/apache/flink/pull/3771
  
    If we merge this change together with #3764, we would check it with tests that use distinct grouped window aggregates. 
    But you are right, it might in fact make sense to test the `GeneratedAggregations` class generated by the `CodeGenerator` individually in a unit test. I wouldn't make this part of this PR, though.



[GitHub] flink pull request #3771: [FLINK-6250] Distinct procTime with Rows boundarie...

Posted by huawei-flink <gi...@git.apache.org>.
Github user huawei-flink commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3771#discussion_r113471605
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala ---
    @@ -335,14 +371,28 @@ class CodeGenerator(
             j"""
                 |  public final void accumulate(
                 |    org.apache.flink.types.Row accs,
    -            |    org.apache.flink.types.Row input)""".stripMargin
    +            |    org.apache.flink.types.Row input) throws Exception""".stripMargin
     
           val accumulate: String = {
             for (i <- aggs.indices) yield
    -          j"""
    +         if(distinctAggsFlags(i)){
    +           j"""
    +              |  Long distValCount$i = (Long) distStateList[$i].get(${parameters(i)});
    +              |  if( distValCount$i == null){
    --- End diff --
    
    In Scala it is 0L, in Java it is null. :-/
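
To illustrate the distinction, a small plain-Java sketch. A `java.util.Map` stands in for the `MapState`, on the assumption (taken from the comment above) that a lookup of a missing key yields `null` on the Java side. The class and method names are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

final class MissingKeyDemo {
    // Boxed lookup: a missing key yields null, so generated Java code can test for it.
    static boolean isFirstOccurrence(Map<Object, Long> state, Object key) {
        Long count = state.get(key);
        return count == null;
    }

    // Primitive view of the same lookup: null has to be mapped to some default,
    // after which "missing" and "stored 0" can no longer be told apart.
    static long countOrZero(Map<Object, Long> state, Object key) {
        Long count = state.get(key);
        return count == null ? 0L : count;
    }

    public static void main(String[] args) {
        Map<Object, Long> state = new HashMap<>();
        System.out.println(isFirstOccurrence(state, "a"));
        System.out.println(countOrZero(state, "a"));
    }
}
```

Because the generated aggregation code is Java and declares the variable as a boxed `Long`, the `== null` check in the diff is meaningful there, whereas a Scala `Long` variable would only ever see `0L`.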



[GitHub] flink pull request #3771: [FLINK-6250] Distinct procTime with Rows boundarie...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3771#discussion_r113443466
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala ---
    @@ -296,6 +297,41 @@ class CodeGenerator(
           fields.mkString(", ")
         }
     
    +def genInitialize(existDistinct : Boolean): String = {
    +      
    +      val sig: String = 
    +        j"""
    +           | org.apache.flink.api.common.state.MapState[] distStateList =
    +           |   new org.apache.flink.api.common.state.MapState[ ${distinctAggsFlags.size} ];
    +           | 
    +           |  public void initialize(
    +           |    org.apache.flink.api.common.functions.RuntimeContext ctx
    +           |  )""".stripMargin
    +      if(existDistinct){
    --- End diff --
    
    I think this check and the `existDistinct` can be removed. If all fields are `false`, we won't generate anything.
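
A sketch of why the flag becomes redundant, written in plain Java rather than the Scala of the generator. The method names `genInitBody` and `genInitialize` are used loosely here and the emitted statement is simplified; the point is only that a loop over the flags already yields an empty body when none is set.

```java
final class InitGen {
    // Emits one state-initialization statement per distinct aggregate.
    static String genInitBody(boolean[] distinctAggsFlags) {
        StringBuilder body = new StringBuilder();
        for (int i = 0; i < distinctAggsFlags.length; i++) {
            if (distinctAggsFlags[i]) {
                body.append("  distStateList[").append(i)
                    .append("] = ctx.getMapState(distDesc").append(i).append(");\n");
            }
        }
        return body.toString();
    }

    // Wraps the body in the method signature; no separate existDistinct switch needed.
    static String genInitialize(boolean[] distinctAggsFlags) {
        return "public void initialize(\n"
            + "    org.apache.flink.api.common.functions.RuntimeContext ctx) {\n"
            + genInitBody(distinctAggsFlags)
            + "}";
    }

    public static void main(String[] args) {
        System.out.println(genInitialize(new boolean[] {false, true}));
        System.out.println(genInitialize(new boolean[] {false, false}));
    }
}
```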



[GitHub] flink pull request #3771: [FLINK-6250] Distinct procTime with Rows boundarie...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3771#discussion_r113446584
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GeneratedAggregations.scala ---
    @@ -20,16 +20,22 @@ package org.apache.flink.table.runtime.aggregate
     
     import org.apache.flink.api.common.functions.Function
     import org.apache.flink.types.Row
    +import org.apache.flink.api.common.functions.RuntimeContext
     
     /**
       * Base class for code-generated aggregations.
       */
     abstract class GeneratedAggregations extends Function {
    +  
    +  /**
    +    * Initialize the state for the distinct aggregation check
    +    *
    +    * @param ctx the runtime context to retrieve and initialize the distinct states
    +    */
    +  def initialize(ctx: RuntimeContext)
     
       /**
    -    * Sets the results of the aggregations (partial or final) to the output row.
    --- End diff --
    
    Revert this change?



[GitHub] flink pull request #3771: [FLINK-6250] Distinct procTime with Rows boundarie...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3771#discussion_r113442944
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala ---
    @@ -335,14 +371,28 @@ class CodeGenerator(
             j"""
                 |  public final void accumulate(
                 |    org.apache.flink.types.Row accs,
    -            |    org.apache.flink.types.Row input)""".stripMargin
    +            |    org.apache.flink.types.Row input) throws Exception""".stripMargin
     
           val accumulate: String = {
             for (i <- aggs.indices) yield
    -          j"""
    +         if(distinctAggsFlags(i)){
    --- End diff --
    
    +spaces `if (distinctAggsFlags(i)) {`



[GitHub] flink pull request #3771: [FLINK-6250] Distinct procTime with Rows boundarie...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3771#discussion_r113446823
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedRowsOver.scala ---
    @@ -72,7 +73,15 @@ class ProcTimeBoundedRowsOver(
           genAggregations.code)
         LOG.debug("Instantiating AggregateHelper.")
         function = clazz.newInstance()
    -
    +    
    +    var initialized = false
    +    for(i <- distinctAggFlags.indices){
    +      if(distinctAggFlags(i) && !initialized){
    +        function.initialize(getRuntimeContext())
    --- End diff --
    
    I would always call `function.initialize()`. If there are no distinct aggregates, nothing will happen.
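
The suggestion can be sketched in plain Java as follows. `Aggregations`, `NoDistinct`, `WithDistinct`, and `open` are hypothetical stand-ins for the generated helper class and the operator's setup method, and the context is reduced to a plain `Object`; none of this is the actual Flink code.

```java
final class AlwaysInitialize {
    // Hypothetical stand-in for the generated aggregation helper.
    abstract static class Aggregations {
        abstract void initialize(Object ctx);
    }

    // What a generated class might look like without distinct aggregates: empty body.
    static final class NoDistinct extends Aggregations {
        @Override
        void initialize(Object ctx) {
            // nothing to set up
        }
    }

    // With distinct aggregates, initialize() would register the states.
    static final class WithDistinct extends Aggregations {
        int statesCreated = 0;

        @Override
        void initialize(Object ctx) {
            statesCreated++;
        }
    }

    // The operator's setup: one unconditional call, no flag array required.
    static void open(Aggregations function, Object ctx) {
        function.initialize(ctx);
    }

    public static void main(String[] args) {
        open(new NoDistinct(), new Object());
        WithDistinct w = new WithDistinct();
        open(w, new Object());
        System.out.println(w.statesCreated);
    }
}
```

With the call made unconditionally, the operator no longer needs to know which aggregates are distinct, which is what makes the extra constructor parameter unnecessary.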


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #3771: [FLINK-6250] Distinct procTime with Rows boundarie...

Posted by huawei-flink <gi...@git.apache.org>.
Github user huawei-flink commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3771#discussion_r113472029
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GeneratedAggregations.scala ---
    @@ -20,16 +20,22 @@ package org.apache.flink.table.runtime.aggregate
     
     import org.apache.flink.api.common.functions.Function
     import org.apache.flink.types.Row
    +import org.apache.flink.api.common.functions.RuntimeContext
     
     /**
       * Base class for code-generated aggregations.
       */
     abstract class GeneratedAggregations extends Function {
    +  
    +  /**
    +    * Initialize the state for the distinct aggregation check
    +    *
    +    * @param ctx the runtime context to retrieve and initialize the distinct states
    +    */
    +  def initialize(ctx: RuntimeContext)
     
       /**
    -    * Sets the results of the aggregations (partial or final) to the output row.
    --- End diff --
    
    probably an error in the merging. sorry about that



[GitHub] flink pull request #3771: [FLINK-6250] Distinct procTime with Rows boundarie...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3771#discussion_r113443027
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala ---
    @@ -296,6 +297,41 @@ class CodeGenerator(
           fields.mkString(", ")
         }
     
    +def genInitialize(existDistinct : Boolean): String = {
    +      
    +      val sig: String = 
    +        j"""
    +           | org.apache.flink.api.common.state.MapState[] distStateList =
    +           |   new org.apache.flink.api.common.state.MapState[ ${distinctAggsFlags.size} ];
    +           | 
    +           |  public void initialize(
    +           |    org.apache.flink.api.common.functions.RuntimeContext ctx
    +           |  )""".stripMargin
    +      if(existDistinct){
    +      val initDist: String = {
    +          for(i <- distinctAggsFlags.indices) yield
    +            if( distinctAggsFlags(i)) {
    +              j"""
    +                 | 
    +                 | org.apache.flink.api.common.state.MapStateDescriptor<Object, Long> distDesc$i =
    +                 |   new org.apache.flink.api.common.state.MapStateDescriptor<Object, Long>(
    +                 |     "distinctValuesBufferMapState" + $i,
    +                 |     Object.class, Long.class);
    +                 | distStateList[$i] = ctx.getMapState( distDesc$i );
    +              """.stripMargin
    +            } else {
    +              ""
    +            }
    +      }.mkString("\n")
    +      
    +      j"""$sig {
    +         |  $initDist
    +         |  }""".stripMargin
    +      }else {
    --- End diff --
    
    +space



[GitHub] flink issue #3771: [FLINK-6250] Distinct procTime with Rows boundaries

Posted by stefanobortoli <gi...@git.apache.org>.
Github user stefanobortoli commented on the issue:

    https://github.com/apache/flink/pull/3771
  
    @fhueske @rtudoran @shijinkui @sunjincheng121 I have created a new PR for distinct in the code generator. Please have a look and let me know. I have implemented and tested it only for the OverProcTimeRowBounded window, but if you like it I can quickly implement and test the others as well.



[GitHub] flink pull request #3771: [FLINK-6250] Distinct procTime with Rows boundarie...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3771#discussion_r113446166
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala ---
    @@ -290,6 +295,10 @@ object AggregateUtil {
         val aggMapping = aggregates.indices.toArray.map(_ + groupings.length)
         val outputArity = aggregates.length + groupings.length + 1
     
    +    // remove when distinct is supported
    +    val distinctAggregatesFlags = new Array[Boolean](aggregates.size)
    --- End diff --
    
    `DataSet` has its own way to deal with distinct aggregates. They could also not initialize the state. So it is safe to call `generateAggregations()` with an empty array: `Array[Boolean]()` or we make the parameter an `Option[Array[Boolean]]` and pass `None`.
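
One way to read both options, sketched in Java with `Optional<boolean[]>` as an analogue of `Option[Array[Boolean]]`. The helper `isDistinct` is hypothetical; it assumes the generator would look flags up through a guard like this, since indexing an empty array directly for each aggregate would go out of bounds.

```java
import java.util.Optional;

final class DistinctFlags {
    // Treats an absent or too-short flag array as "not distinct".
    static boolean isDistinct(Optional<boolean[]> flags, int i) {
        return flags.map(f -> i < f.length && f[i]).orElse(false);
    }

    public static void main(String[] args) {
        // DataSet-style callers: no flags at all, or an empty array.
        System.out.println(isDistinct(Optional.empty(), 0));
        System.out.println(isDistinct(Optional.of(new boolean[0]), 0));
        // Streaming caller with the second aggregate marked distinct.
        System.out.println(isDistinct(Optional.of(new boolean[] {false, true}), 1));
    }
}
```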



[GitHub] flink pull request #3771: [FLINK-6250] Distinct procTime with Rows boundarie...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3771#discussion_r113442648
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala ---
    @@ -296,6 +297,41 @@ class CodeGenerator(
           fields.mkString(", ")
         }
     
    +def genInitialize(existDistinct : Boolean): String = {
    +      
    +      val sig: String = 
    +        j"""
    +           | org.apache.flink.api.common.state.MapState[] distStateList =
    +           |   new org.apache.flink.api.common.state.MapState[ ${distinctAggsFlags.size} ];
    +           | 
    +           |  public void initialize(
    +           |    org.apache.flink.api.common.functions.RuntimeContext ctx
    +           |  )""".stripMargin
    +      if(existDistinct){
    +      val initDist: String = {
    +          for(i <- distinctAggsFlags.indices) yield
    +            if( distinctAggsFlags(i)) {
    +              j"""
    +                 | 
    +                 | org.apache.flink.api.common.state.MapStateDescriptor<Object, Long> distDesc$i =
    --- End diff --
    
    We should use the right type of the field instead of `Object`. With `Object`, we will use the GenericTypeSerializer instead of Flink's type-specific serializers.
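
A rough sketch of what emitting a typed descriptor could look like, again in plain Java. It assumes the generator can obtain the Java type name of the aggregated field as a string (the `keyTypeName` parameter is hypothetical) and keeps the class-based constructor from the diff; the real fix might pass type information differently.

```java
final class TypedDescriptorGen {
    // Emits the descriptor declaration for aggregate i, keyed by the field's Java type.
    static String genDescriptor(int i, String keyTypeName) {
        String desc = "org.apache.flink.api.common.state.MapStateDescriptor";
        return desc + "<" + keyTypeName + ", Long> distDesc" + i + " =\n"
            + "  new " + desc + "<" + keyTypeName + ", Long>(\n"
            + "    \"distinctValuesBufferMapState\" + " + i + ",\n"
            + "    " + keyTypeName + ".class, Long.class);";
    }

    public static void main(String[] args) {
        // Example: the first aggregate works on a String field.
        System.out.println(genDescriptor(0, "java.lang.String"));
    }
}
```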



[GitHub] flink pull request #3771: [FLINK-6250] Distinct procTime with Rows boundarie...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3771#discussion_r113443910
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala ---
    @@ -335,14 +371,28 @@ class CodeGenerator(
             j"""
                 |  public final void accumulate(
                 |    org.apache.flink.types.Row accs,
    -            |    org.apache.flink.types.Row input)""".stripMargin
    +            |    org.apache.flink.types.Row input) throws Exception""".stripMargin
     
           val accumulate: String = {
             for (i <- aggs.indices) yield
    -          j"""
    +         if(distinctAggsFlags(i)){
    +           j"""
    +              |  Long distValCount$i = (Long) distStateList[$i].get(${parameters(i)});
    +              |  if( distValCount$i == null){
    --- End diff --
    
    Didn't you say that the return value is never `null` but `0` if not set?



[GitHub] flink pull request #3771: [FLINK-6250] Distinct procTime with Rows boundarie...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3771#discussion_r113446851
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedRowsOver.scala ---
    @@ -46,6 +46,7 @@ import org.slf4j.LoggerFactory
       */
     class ProcTimeBoundedRowsOver(
         genAggregations: GeneratedAggregationsFunction,
    +    distinctAggFlags: Array[Boolean],
    --- End diff --
    
    remove this parameter.

