Posted to issues@flink.apache.org by huawei-flink <gi...@git.apache.org> on 2017/04/26 17:24:57 UTC

[GitHub] flink pull request #3783: [FLINK-6338] Add support for DISTINCT into Code Ge...

GitHub user huawei-flink opened a pull request:

    https://github.com/apache/flink/pull/3783

    [FLINK-6338] Add support for DISTINCT into Code Generated Aggregations

    Thanks for contributing to Apache Flink. Before you open your pull request, please take the following checklist into consideration.
    If your changes take all of the items into account, feel free to open your pull request. For more information and/or questions please refer to the [How To Contribute guide](http://flink.apache.org/how-to-contribute.html).
    In addition to going through the list, please provide a meaningful description of your changes.
    
    - [ ] General
      - The pull request references the related JIRA issue ("[FLINK-XXX] Jira title text")
      - The pull request addresses only one issue
      - Each commit in the PR has a meaningful commit message (including the JIRA id)
    
    - [ ] Documentation
      - Documentation has been added for new functionality
      - Old documentation affected by the pull request has been updated
      - JavaDoc for public methods has been added
    
    - [ ] Tests & Build
      - Functionality added by the pull request is covered by tests
      - `mvn clean verify` has been executed successfully locally or a Travis build has passed


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/stefanobortoli/flink FLINK-6338

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/3783.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #3783
    
----
commit 55e0a8d38187bef22b6135db7f4a5c1cc8f15811
Author: Stefano Bortoli <s....@gmail.com>
Date:   2017-04-26T17:22:04Z

    Added code generation distinct aggregation logic

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #3783: [FLINK-6388] Add support for DISTINCT into Code Ge...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3783#discussion_r124030194
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala ---
    @@ -296,6 +299,39 @@ class CodeGenerator(
           fields.mkString(", ")
         }
     
    +    def genInitialize(): String = {
    +      
    +      val sig: String = 
    +        j"""
    +           |  public void initialize(
    +           |    org.apache.flink.api.common.functions.RuntimeContext ctx
    +           |  )""".stripMargin
    +        
    +      val initDist: String = if( distinctAggsFlags.isDefined ) {
    +        val statePackage = "org.apache.flink.api.common.state"
    +        val distAggsFlags = distinctAggsFlags.get
    +          for(i <- distAggsFlags.indices) yield
    +            if(distAggsFlags(i)) {
    +              val typeString = javaTypes(aggFields(i)(0))
    --- End diff --
    
    UDAGGs can have more than a single parameter.
    
    Since the key can only be a single object, we have to put all arguments into a `TupleX` (I think the limitation of 25 fields is reasonable and we can throw an exception if we observe a DISTINCT UDAGG with more than 25 fields).
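A `MapState` key is a single object, so the arguments of a multi-argument DISTINCT aggregate have to be packed into one composite value. The following is a minimal plain-Java sketch of that idea. `DistinctKey` is a hypothetical stand-in for the `TupleX` (or `Row`) that would actually be used, and the 25-argument cap only mirrors the limit suggested in the comment above. It is not Flink API.

```java
import java.util.Arrays;

/**
 * Hypothetical composite key for a DISTINCT aggregate with several arguments.
 * Stands in for a TupleX/Row so the sketch runs without Flink.
 */
final class DistinctKey {
    // Mirrors the Tuple25 upper bound discussed in the review.
    static final int MAX_ARITY = 25;

    private final Object[] fields;

    private DistinctKey(Object[] fields) {
        this.fields = fields;
    }

    /** Packs all aggregate arguments into one key; rejects more than MAX_ARITY arguments. */
    static DistinctKey of(Object... args) {
        if (args.length > MAX_ARITY) {
            throw new IllegalArgumentException(
                "DISTINCT aggregate with " + args.length
                    + " arguments exceeds the limit of " + MAX_ARITY);
        }
        // Defensive copy: later changes to the caller's array must not alter the key.
        return new DistinctKey(args.clone());
    }

    int arity() {
        return fields.length;
    }

    // Arrays.equals / Arrays.hashCode are null-safe, so NULL arguments are valid key fields.
    @Override
    public boolean equals(Object o) {
        return o instanceof DistinctKey && Arrays.equals(fields, ((DistinctKey) o).fields);
    }

    @Override
    public int hashCode() {
        return Arrays.hashCode(fields);
    }
}
```

For example, `DistinctKey.of(1, "a")` and `DistinctKey.of(1, "a")` are equal keys, while `DistinctKey.of(new Object[26])` is rejected.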


---

[GitHub] flink issue #3783: [FLINK-6388] Add support for DISTINCT into Code Generated...

Posted by stefanobortoli <gi...@git.apache.org>.
Github user stefanobortoli commented on the issue:

    https://github.com/apache/flink/pull/3783
  
    @fhueske thanks for the comments. Did we include the latest Calcite version already?


---

[GitHub] flink pull request #3783: [FLINK-6388] Add support for DISTINCT into Code Ge...

Posted by stefanobortoli <gi...@git.apache.org>.
Github user stefanobortoli commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3783#discussion_r125291702
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala ---
    @@ -296,6 +299,39 @@ class CodeGenerator(
           fields.mkString(", ")
         }
     
    +    def genInitialize(): String = {
    +      
    +      val sig: String = 
    +        j"""
    +           |  public void initialize(
    +           |    org.apache.flink.api.common.functions.RuntimeContext ctx
    +           |  )""".stripMargin
    +        
    +      val initDist: String = if( distinctAggsFlags.isDefined ) {
    +        val statePackage = "org.apache.flink.api.common.state"
    +        val distAggsFlags = distinctAggsFlags.get
    +          for(i <- distAggsFlags.indices) yield
    +            if(distAggsFlags(i)) {
    +              val typeString = javaTypes(aggFields(i)(0))
    --- End diff --
    
    sounds good


---

[GitHub] flink issue #3783: [FLINK-6388] Add support for DISTINCT into Code Generated...

Posted by rtudoran <gi...@git.apache.org>.
Github user rtudoran commented on the issue:

    https://github.com/apache/flink/pull/3783
  
    @fhueske thanks for the clarification. I think it is also good to have the solution for OVER windows :)
    
    I also wanted to ask you about Calcite and the DISTINCT/DIST syntax. What do you think is the right plan to proceed? Do we push it with DIST, sync with the Calcite community on when they will have the next release, and then create a pull request to upgrade the Calcite version used?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #3783: [FLINK-6388] Add support for DISTINCT into Code Ge...

Posted by stefanobortoli <gi...@git.apache.org>.
Github user stefanobortoli commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3783#discussion_r125291954
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GeneratedAggregations.scala ---
    @@ -20,16 +20,22 @@ package org.apache.flink.table.runtime.aggregate
     
     import org.apache.flink.api.common.functions.Function
     import org.apache.flink.types.Row
    +import org.apache.flink.api.common.functions.RuntimeContext
     
     /**
       * Base class for code-generated aggregations.
       */
     abstract class GeneratedAggregations extends Function {
    +  
    +  /**
    +    * Initialize the state for the distinct aggregation check
    +    *
    +    * @param ctx the runtime context to retrieve and initialize the distinct states
    +    */
    +  def initialize(ctx: RuntimeContext)
     
       /**
    -    * Sets the results of the aggregations (partial or final) to the output row.
    --- End diff --
    
    I think it was some formatting issue


---

[GitHub] flink pull request #3783: [FLINK-6388] Add support for DISTINCT into Code Ge...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3783#discussion_r126536553
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala ---
    @@ -296,6 +299,39 @@ class CodeGenerator(
           fields.mkString(", ")
         }
     
    +    def genInitialize(): String = {
    +      
    +      val sig: String = 
    +        j"""
    +           |  public void initialize(
    +           |    org.apache.flink.api.common.functions.RuntimeContext ctx
    +           |  )""".stripMargin
    +        
    +      val initDist: String = if( distinctAggsFlags.isDefined ) {
    +        val statePackage = "org.apache.flink.api.common.state"
    +        val distAggsFlags = distinctAggsFlags.get
    +          for(i <- distAggsFlags.indices) yield
    +            if(distAggsFlags(i)) {
    +              val typeString = javaTypes(aggFields(i)(0))
    --- End diff --
    
    That's actually a very good point! We have to use `Row` because `Tuple` doesn't support `null` values. 


---

[GitHub] flink issue #3783: [FLINK-6388] Add support for DISTINCT into Code Generated...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on the issue:

    https://github.com/apache/flink/pull/3783
  
    Hi @huawei-flink, I did not say that we need to move the state out of the code-gen'd function. We can and should leave the PR as it is. 
    
    However, we cannot use this code for distinct group window aggregates but only for distinct over aggregates once they are supported in the API.


---

[GitHub] flink issue #3783: [FLINK-6388] Add support for DISTINCT into Code Generated...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on the issue:

    https://github.com/apache/flink/pull/3783
  
    Thanks for this PR @huawei-flink! 
    
    I think I made a mistake when I suggested using the code-gen'd functions with registered `MapState` to compute distinct window aggregations. Originally, I thought it would be possible to register state (i.e., the `MapState` for the distinct values) in an `AggregateFunction` (which is used for the grouped window aggregates). However, as I learned today, that's unfortunately not possible: all state of an `AggregateFunction` must be contained in the accumulator.
    
    What does this mean? We cannot use the current approach of registering `MapState` in the code-gen'd function for group windowed aggregates. So we would need another approach for that.
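The constraint that all state lives in the accumulator can be illustrated with a minimal sketch. It assumes a SUM(DISTINCT x) over `long` values and shows an accumulator that carries its own de-duplication counts instead of registering `MapState`. `DistinctSumAccumulator` and its methods are hypothetical and are not Flink's `AggregateFunction` API.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical accumulator for SUM(DISTINCT x) over long values. All state,
 * including the per-value counts used for de-duplication, lives in the accumulator.
 */
final class DistinctSumAccumulator {
    // value -> how often it has been accumulated
    private final Map<Long, Long> counts = new HashMap<>();
    private long sum = 0L;

    void accumulate(long value) {
        // merge() returns the updated count; 1 means the value is new
        if (counts.merge(value, 1L, Long::sum) == 1L) {
            sum += value;
        }
    }

    /** Combines another accumulator into this one, e.g. when windows are merged. */
    void merge(DistinctSumAccumulator other) {
        for (Map.Entry<Long, Long> e : other.counts.entrySet()) {
            Long previous = counts.get(e.getKey());
            if (previous == null) {
                // value unknown to this accumulator: it contributes to the sum once
                sum += e.getKey();
                counts.put(e.getKey(), e.getValue());
            } else {
                counts.put(e.getKey(), previous + e.getValue());
            }
        }
    }

    long countOf(long value) {
        return counts.getOrDefault(value, 0L);
    }

    long getValue() {
        return sum;
    }
}
```

In this sketch the count map is an ordinary in-memory `HashMap` inside the accumulator. It therefore grows with the number of distinct values in a window.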
    
    However, we can still use your code for distinct over windows (`ProcessFunction` can obviously register state) once the API supports defining DISTINCT aggregates.
    
    I'll try to have a closer look at this PR soon.
    
    Best, Fabian


---

[GitHub] flink issue #3783: [FLINK-6388] Add support for DISTINCT into Code Generated...

Posted by huawei-flink <gi...@git.apache.org>.
Github user huawei-flink commented on the issue:

    https://github.com/apache/flink/pull/3783
  
    @rtudoran @fhueske The first implementation I made kept the state in the ProcessFunction, without a code-generated aggregation function. Second, I pushed a branch with the state in the process function using the code-generated process function. Third, I moved the state into the code-generated function. 
    
    It is not clear to me why the state cannot be within the code-generated function. Could you please clarify, so that we can understand whether it is worth working around it? This feature is quite important for us.
    
    Anyway, you could have a look at the branch that keeps the state in the process function and uses the code-generated aggregation functions. Basically, rather than generating one code-generated function for all the aggregations, I create one class per aggregation, and the process function calls the corresponding accumulate/retract, applying the distinct logic to the aggregations marked as distinct. 


---

[GitHub] flink pull request #3783: [FLINK-6388] Add support for DISTINCT into Code Ge...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3783#discussion_r124033513
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala ---
    @@ -335,15 +371,30 @@ class CodeGenerator(
             j"""
                 |  public final void accumulate(
                 |    org.apache.flink.types.Row accs,
    -            |    org.apache.flink.types.Row input)""".stripMargin
    +            |    org.apache.flink.types.Row input) throws Exception""".stripMargin
     
    +      val distAggsFlags: Array[Boolean] = distinctAggsFlags.getOrElse(new Array[Boolean](0))    
           val accumulate: String = {
             for (i <- aggs.indices) yield
    -          j"""
    -             |    ${aggs(i)}.accumulate(
    -             |      ((${accTypes(i)}) accs.getField($i)),
    -             |      ${parameters(i)});""".stripMargin
    -      }.mkString("\n")
    +          if (distinctAggsFlags.isDefined && distAggsFlags(i)){
    +            j"""
    +               |  Long distValCount$i = (Long) distStateList[$i].get(${parameters(i)});
    --- End diff --
    
    We need to put the parameters into a tuple before accessing the map state. The tuple should be reused.
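Here is a minimal sketch of packing the parameters into a reused key, with a plain `HashMap` standing in for `MapState`. `ReusableKey` and `DistinctCounts` are hypothetical names. The reused object serves only for lookups. When a new entry is inserted, the sketch stores a copy, because a map that keeps references to its keys (as `HashMap` does) would otherwise see a stored key change when the lookup object is overwritten. Whether a copy is needed with a particular Flink state backend depends on whether that backend serializes keys, and this sketch does not assume either way.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical mutable key, reused for lookups so no key object is allocated per record. */
final class ReusableKey {
    private final Object[] fields;

    ReusableKey(int arity) {
        this.fields = new Object[arity];
    }

    private ReusableKey(Object[] fields) {
        this.fields = fields;
    }

    /** Overwrites the fields in place. */
    ReusableKey set(Object... values) {
        if (values.length != fields.length) {
            throw new IllegalArgumentException("expected " + fields.length + " arguments");
        }
        System.arraycopy(values, 0, fields, 0, fields.length);
        return this;
    }

    /** Independent copy that is safe to store as a map key. */
    ReusableKey copy() {
        return new ReusableKey(fields.clone());
    }

    @Override
    public boolean equals(Object o) {
        return o instanceof ReusableKey && Arrays.equals(fields, ((ReusableKey) o).fields);
    }

    @Override
    public int hashCode() {
        return Arrays.hashCode(fields);
    }
}

/** Distinct-value counts keyed by the packed aggregate arguments (HashMap stands in for MapState). */
final class DistinctCounts {
    // long[1] is a mutable counter, so existing entries are updated without another put()
    private final Map<ReusableKey, long[]> counts = new HashMap<>();
    private final ReusableKey lookup;

    DistinctCounts(int arity) {
        this.lookup = new ReusableKey(arity);
    }

    /** Returns true if this argument combination is new and should be accumulated. */
    boolean add(Object... args) {
        long[] count = counts.get(lookup.set(args));
        if (count == null) {
            // HashMap keeps a reference to the key: insert a copy, never the reused object.
            counts.put(lookup.copy(), new long[] {1L});
            return true;
        }
        count[0]++;
        return false;
    }

    long count(Object... args) {
        long[] count = counts.get(lookup.set(args));
        return count == null ? 0L : count[0];
    }

    int size() {
        return counts.size();
    }
}
```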


---

[GitHub] flink pull request #3783: [FLINK-6388] Add support for DISTINCT into Code Ge...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3783#discussion_r124037152
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala ---
    @@ -533,7 +600,25 @@ class CodeGenerator(
              |$reset
              |  }""".stripMargin
         }
    -
    +    
    +    var existDistinct = false
    +    if (distinctAggsFlags.isDefined){
    +      val distAggsFlags = distinctAggsFlags.get
    +      for(i <- distAggsFlags.indices){
    +        if(distAggsFlags(i)){ existDistinct = true }
    +      }
    +    }
    +    if(existDistinct){
    +     val initReusMember = {
    --- End diff --
    
    The code for the distinct deduplication and the tuple reuse for multi-argument aggregate functions could go here.


---

[GitHub] flink issue #3783: [FLINK-6388] Add support for DISTINCT into Code Generated...

Posted by rtudoran <gi...@git.apache.org>.
Github user rtudoran commented on the issue:

    https://github.com/apache/flink/pull/3783
  
    @fhueske @stefanobortoli 
    I recently fixed the problem of propagating the DISTINCT flag to OVER aggregates in Calcite. The fix was merged into master, so it is a matter of when Flink gets the new Calcite version. IMHO, we could also consider the temporary solution until then.


---

[GitHub] flink issue #3783: [FLINK-6388] Add support for DISTINCT into Code Generated...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on the issue:

    https://github.com/apache/flink/pull/3783
  
    Not yet, but AFAIK @twalthr is working on that.


---

[GitHub] flink issue #3783: [FLINK-6388] Add support for DISTINCT into Code Generated...

Posted by huawei-flink <gi...@git.apache.org>.
Github user huawei-flink commented on the issue:

    https://github.com/apache/flink/pull/3783
  
    @fhueske , this is the PR with the code generated distinct aggregation for OVER. You mentioned that the value of the aggregation should be a Row, but what is kept in the distinct state is just the event value, not its "aggregation value state". Perhaps you can try to explain it better to me so that I can complete this PR and we can move on. What do you think?


---

[GitHub] flink issue #3783: [FLINK-6388] Add support for DISTINCT into Code Generated...

Posted by rtudoran <gi...@git.apache.org>.
Github user rtudoran commented on the issue:

    https://github.com/apache/flink/pull/3783
  
    @fhueske @stefanobortoli 
    Regarding the options for solving DISTINCT: from my point of view, the previous approach worked:
    - we keep a state in the ProcessFunction for each field that is used in a distinct aggregate
    - we count the occurrences of each value (meant for a distinct aggregate) that we observe
    - when a value is seen for the first time, we accumulate it
    - when a value is retracted, we decrease the corresponding count
    - if the count reaches 0, we retract the value from the accumulator
    
    Based on how things are implemented now, this would involve having a separate list of aggregate functions for the distinct aggregates, in order to control when values are accumulated into them.
    What do you think? Do you see any disadvantage to this? 
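The counting scheme listed above can be sketched as follows, assuming `long` values and a SUM aggregate. `LongAgg`, `SumAgg`, and `DistinctWrapper` are hypothetical names. In the proposal, the count map would live in keyed state of the `ProcessFunction` rather than in a `HashMap`.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical minimal aggregate interface standing in for a generated aggregate function. */
interface LongAgg {
    void accumulate(long v);
    void retract(long v);
    long getValue();
}

/** Plain SUM without any distinct handling of its own. */
final class SumAgg implements LongAgg {
    private long sum;
    public void accumulate(long v) { sum += v; }
    public void retract(long v) { sum -= v; }
    public long getValue() { return sum; }
}

/** Forwards only the first occurrence and the last retraction of each value to the wrapped aggregate. */
final class DistinctWrapper {
    // value -> number of occurrences currently in the window
    private final Map<Long, Long> counts = new HashMap<>();
    private final LongAgg agg;

    DistinctWrapper(LongAgg agg) {
        this.agg = agg;
    }

    void accumulate(long v) {
        long c = counts.merge(v, 1L, Long::sum);
        if (c == 1L) {
            agg.accumulate(v); // value seen for the first time
        }
    }

    void retract(long v) {
        Long c = counts.get(v);
        if (c == null) {
            return; // unknown value: nothing to retract in this sketch
        }
        if (c == 1L) {
            counts.remove(v);
            agg.retract(v); // last occurrence left the window
        } else {
            counts.put(v, c - 1);
        }
    }

    long getValue() {
        return agg.getValue();
    }
}
```

Keeping the distinct logic in a wrapper leaves the wrapped aggregate function unchanged. This roughly corresponds to the idea of keeping a separate list of aggregate functions for the distinct aggregates.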


---

[GitHub] flink pull request #3783: [FLINK-6388] Add support for DISTINCT into Code Ge...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3783#discussion_r124037411
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GeneratedAggregations.scala ---
    @@ -20,16 +20,22 @@ package org.apache.flink.table.runtime.aggregate
     
     import org.apache.flink.api.common.functions.Function
     import org.apache.flink.types.Row
    +import org.apache.flink.api.common.functions.RuntimeContext
     
     /**
       * Base class for code-generated aggregations.
       */
     abstract class GeneratedAggregations extends Function {
    +  
    +  /**
    +    * Initialize the state for the distinct aggregation check
    +    *
    +    * @param ctx the runtime context to retrieve and initialize the distinct states
    +    */
    +  def initialize(ctx: RuntimeContext)
     
       /**
    -    * Sets the results of the aggregations (partial or final) to the output row.
    --- End diff --
    
    Why would you like to change this comment?


---

[GitHub] flink issue #3783: [FLINK-6388] Add support for DISTINCT into Code Generated...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on the issue:

    https://github.com/apache/flink/pull/3783
  
    Hi @huawei-flink ,
    
    I'll close this PR later today when merging #5555.
    #5555 adds runtime support for distinct aggregation and follows an approach similar to this PR's. However, it leverages the `MapView` feature and is therefore a bit more generic. 
    Thanks for working on this. This PR led us in the right direction.
    
    Best, Fabian


---

[GitHub] flink pull request #3783: [FLINK-6388] Add support for DISTINCT into Code Ge...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3783#discussion_r124029025
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala ---
    @@ -296,6 +299,39 @@ class CodeGenerator(
           fields.mkString(", ")
         }
     
    +    def genInitialize(): String = {
    +      
    +      val sig: String = 
    +        j"""
    +           |  public void initialize(
    +           |    org.apache.flink.api.common.functions.RuntimeContext ctx
    +           |  )""".stripMargin
    +        
    +      val initDist: String = if( distinctAggsFlags.isDefined ) {
    +        val statePackage = "org.apache.flink.api.common.state"
    +        val distAggsFlags = distinctAggsFlags.get
    +          for(i <- distAggsFlags.indices) yield
    --- End diff --
    
    It would be good to share distinct state across aggregations if they are on the same fields, i.e., make distinct state distinct ;-). I would do this in a preprocessing step in the `generateAggregations` method.
    
    It would also change the way we access the state, because we may increment / decrement a key only once per record (call of `accumulate`) if the state is shared.
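Here is a minimal sketch of sharing distinct state between DISTINCT aggregates on the same field. It assumes single-argument aggregates over `long` fields of a record modeled as `long[]`. `SharedDistinctState` and its methods are hypothetical. `register` plays the role of the suggested preprocessing step, and `accumulate` shows that a shared key is incremented once per record, however many aggregates use it. Retraction is omitted for brevity.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.LongConsumer;

/** Hypothetical sketch: one distinct-count map per argument field, shared by all DISTINCT aggregates on it. */
final class SharedDistinctState {
    // argument field index -> (value -> count)
    private final Map<Integer, Map<Long, Long>> countsByField = new HashMap<>();
    // argument field index -> accumulate callbacks of the aggregates sharing that state
    private final Map<Integer, List<LongConsumer>> aggsByField = new HashMap<>();

    /** Preprocessing step: register a DISTINCT aggregate on the given field. */
    void register(int field, LongConsumer accumulate) {
        countsByField.computeIfAbsent(field, f -> new HashMap<>());
        aggsByField.computeIfAbsent(field, f -> new ArrayList<>()).add(accumulate);
    }

    /** Processes one record: each shared key is incremented exactly once. */
    void accumulate(long[] row) {
        for (Map.Entry<Integer, Map<Long, Long>> e : countsByField.entrySet()) {
            long v = row[e.getKey()];
            if (e.getValue().merge(v, 1L, Long::sum) == 1L) {
                // first occurrence: forward to every aggregate sharing this state
                for (LongConsumer agg : aggsByField.get(e.getKey())) {
                    agg.accept(v);
                }
            }
        }
    }

    long count(int field, long value) {
        Map<Long, Long> m = countsByField.get(field);
        return m == null ? 0L : m.getOrDefault(value, 0L);
    }

    int stateCount() {
        return countsByField.size();
    }
}
```

For example, `SUM(DISTINCT a)` and `COUNT(DISTINCT a)` would both be registered on the field of `a` and would use a single count map.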


---

[GitHub] flink pull request #3783: [FLINK-6388] Add support for DISTINCT into Code Ge...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/3783


---

[GitHub] flink pull request #3783: [FLINK-6388] Add support for DISTINCT into Code Ge...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3783#discussion_r124036796
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala ---
    @@ -533,7 +600,25 @@ class CodeGenerator(
              |$reset
              |  }""".stripMargin
         }
    -
    +    
    +    var existDistinct = false
    +    if (distinctAggsFlags.isDefined){
    +      val distAggsFlags = distinctAggsFlags.get
    +      for(i <- distAggsFlags.indices){
    +        if(distAggsFlags(i)){ existDistinct = true }
    +      }
    +    }
    +    if(existDistinct){
    +     val initReusMember = {
    --- End diff --
    
    `initReusMember` -> `initReuseMember`


---

[GitHub] flink pull request #3783: [FLINK-6388] Add support for DISTINCT into Code Ge...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3783#discussion_r124028061
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala ---
    @@ -296,6 +299,39 @@ class CodeGenerator(
           fields.mkString(", ")
         }
     
    +    def genInitialize(): String = {
    +      
    +      val sig: String = 
    +        j"""
    +           |  public void initialize(
    +           |    org.apache.flink.api.common.functions.RuntimeContext ctx
    +           |  )""".stripMargin
    +        
    +      val initDist: String = if( distinctAggsFlags.isDefined ) {
    +        val statePackage = "org.apache.flink.api.common.state"
    +        val distAggsFlags = distinctAggsFlags.get
    +          for(i <- distAggsFlags.indices) yield
    --- End diff --
    
    please add a space after `for`, `if`, `while`, etc.


---

[GitHub] flink issue #3783: [FLINK-6338] Add support for DISTINCT into Code Generated...

Posted by huawei-flink <gi...@git.apache.org>.
Github user huawei-flink commented on the issue:

    https://github.com/apache/flink/pull/3783
  
    @fhueske please have a look at this PR; it contains just the code generation part, with optional DISTINCT.


---

[GitHub] flink issue #3783: [FLINK-6338] Add support for DISTINCT into Code Generated...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on the issue:

    https://github.com/apache/flink/pull/3783
  
    Please fix the PR title, you are referencing the wrong JIRA.


---

[GitHub] flink pull request #3783: [FLINK-6388] Add support for DISTINCT into Code Ge...

Posted by stefanobortoli <gi...@git.apache.org>.
Github user stefanobortoli commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3783#discussion_r125677038
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala ---
    @@ -296,6 +299,39 @@ class CodeGenerator(
           fields.mkString(", ")
         }
     
    +    def genInitialize(): String = {
    +      
    +      val sig: String = 
    +        j"""
    +           |  public void initialize(
    +           |    org.apache.flink.api.common.functions.RuntimeContext ctx
    +           |  )""".stripMargin
    +        
    +      val initDist: String = if( distinctAggsFlags.isDefined ) {
    +        val statePackage = "org.apache.flink.api.common.state"
    +        val distAggsFlags = distinctAggsFlags.get
    +          for(i <- distAggsFlags.indices) yield
    +            if(distAggsFlags(i)) {
    +              val typeString = javaTypes(aggFields(i)(0))
    --- End diff --
    
    Actually, why shouldn't we use rows directly? Is there any specific reason to prefer tuples?


---