Posted to issues@flink.apache.org by twalthr <gi...@git.apache.org> on 2017/09/27 15:31:50 UTC

[GitHub] flink pull request #4736: [FLINK-7371] [table] Add support for constant para...

GitHub user twalthr opened a pull request:

    https://github.com/apache/flink/pull/4736

    [FLINK-7371] [table] Add support for constant parameters in OVER aggregate

    ## What is the purpose of the change
    
    This PR allows passing constants to OVER window aggregates, e.g. `.select('c, weightAvgFun('a, 42, 'b, "2") over 'w as 'wAvg)`.
    
    
    ## Brief change log
    
    Until now, the constants were simply ignored. I added code generation for the literals in `AggregationCodeGenerator`.
    
    
    ## Verifying this change
    
    I added an ITCase for it. I might add more tests if I have time. In general, we need to rework the logic there a little, because I think we also do not support DATE, TIME, etc. right now.
    
    ## Does this pull request potentially affect one of the following parts:
    
      - Dependencies (does it add or upgrade a dependency): no
      - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no
      - The serializers: no
      - The runtime per-record code paths (performance sensitive): no
      - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: no
    
    ## Documentation
    
      - Does this pull request introduce a new feature? no
      - If yes, how is the feature documented? not applicable
    


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/twalthr/flink FLINK-7371

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/4736.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #4736
    
----
commit 19e056e038009e22e2b607b38931f575d5c948df
Author: twalthr <tw...@apache.org>
Date:   2017-09-27T15:11:28Z

    [FLINK-7371] [table] Add support for constant parameters in OVER aggregate

----


---

[GitHub] flink pull request #4736: [FLINK-7371] [table] Add support for constant para...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4736#discussion_r142254410
  
    --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/OverWindowITCase.scala ---
    @@ -85,6 +85,46 @@ class OverWindowITCase extends StreamingWithStateTestBase {
       }
     
       @Test
    +  def testOverWindowWithConstant(): Unit = {
    +
    +    val data = List(
    +      (1L, 1, "Hello"),
    +      (2L, 2, "Hello"),
    +      (3L, 3, "Hello"),
    +      (4L, 4, "Hello"),
    +      (5L, 5, "Hello"),
    +      (6L, 6, "Hello"),
    +      (7L, 7, "Hello World"),
    +      (8L, 8, "Hello World"),
    +      (8L, 8, "Hello World"),
    +      (20L, 20, "Hello World"))
    +
    +    val env = StreamExecutionEnvironment.getExecutionEnvironment
    +    env.setParallelism(1)
    +    val tEnv = TableEnvironment.getTableEnvironment(env)
    +    StreamITCase.testResults = mutable.MutableList()
    +    StreamITCase.clear
    +    val stream = env.fromCollection(data)
    +    val table = stream.toTable(tEnv, 'a, 'b, 'c, 'proctime.proctime)
    +    val weightAvgFun = new WeightedAvg
    +
    +    val windowedTable = table
    +      .window(
    +        Over partitionBy 'c orderBy 'proctime preceding UNBOUNDED_ROW as 'w)
    +      .select('c, weightAvgFun('a, 42, 'b, "2") over 'w as 'wAvg)
    +      .select('c, 'wAvg)
    --- End diff --
    
    can be removed


---

[GitHub] flink pull request #4736: [FLINK-7371] [table] Add support for constant para...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4736#discussion_r142247401
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala ---
    @@ -1670,4 +1670,34 @@ abstract class CodeGenerator(
     
         fieldTerm
       }
    +
    +  /**
    +    * Adds a reusable constant to the member area of the generated [[Function]].
    +    *
    +    * @param constant constant expression
    +    * @return member variable term
    +    */
    +  def addReusableBoxedConstant(constant: GeneratedExpression): String = {
    +    require(constant.literal, "Literal expected")
    +
    +    val fieldTerm = newName("constant")
    +
    +    val boxed = generateOutputFieldBoxing(constant)
    +    val boxedType = boxedTypeTermForTypeInfo(boxed.resultType)
    +
    +    val field =
    +      s"""
    +        |transient $boxedType $fieldTerm;
    --- End diff --
    
    why `transient`? Couldn't this be `final`?
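
For readers unfamiliar with the two modifiers, here is a minimal sketch of the general Java behaviour behind this question. It is not Flink's generated code: the class names `WithTransient` and `WithFinal` and the helper `roundTrip` are hypothetical, and the thread does not say whether Java serialization is involved for the generated function at all. The sketch only shows that a `transient` field is skipped by Java serialization and must be re-initialized afterwards, while a `final` field is assigned once and survives a round trip.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class TransientVsFinal {

    // Hypothetical stand-in for a class holding a boxed constant in a transient field.
    static class WithTransient implements Serializable {
        private static final long serialVersionUID = 1L;
        transient Integer constant;

        WithTransient() {
            constant = 42; // assigned once here, but not written by Java serialization
        }
    }

    // Same class with the field declared final instead.
    static class WithFinal implements Serializable {
        private static final long serialVersionUID = 1L;
        final Integer constant;

        WithFinal() {
            constant = 42; // must be assigned exactly once; the compiler rejects reassignment
        }
    }

    // Serializes the value to a byte array and reads it back.
    @SuppressWarnings("unchecked")
    static <T extends Serializable> T roundTrip(T value) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(value);
            }
            try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (T) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(roundTrip(new WithTransient()).constant); // prints "null"
        System.out.println(roundTrip(new WithFinal()).constant);     // prints "42"
    }
}
```

If instances of the class are never serialized, the two variants behave the same at runtime, and `final` additionally documents that the constant is never reassigned.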


---

[GitHub] flink issue #4736: [FLINK-7371] [table] Add support for constant parameters ...

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on the issue:

    https://github.com/apache/flink/pull/4736
  
    Thanks @fhueske. I addressed your feedback. Will merge this now...


---

[GitHub] flink pull request #4736: [FLINK-7371] [table] Add support for constant para...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/4736


---

[GitHub] flink pull request #4736: [FLINK-7371] [table] Add support for constant para...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4736#discussion_r142256742
  
    --- Diff: flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/utils/JavaUserDefinedAggFunctions.java ---
    @@ -86,6 +86,13 @@ public Long getValue(WeightedAvgAccum accumulator) {
     		}
     
     		// overloaded accumulate method
    +		// dummy to test constants
    +		public void accumulate(WeightedAvgAccum accumulator, long iValue, int iWeight, int x, String string) {
    +			accumulator.sum += iWeight + Integer.parseInt(string);
    --- End diff --
    
    change the method to 
    ```
    accumulator.sum += (iValue + Integer.parseInt(string)) * iWeight;
    accumulator.count += iWeight;
    ``` 
    
    to have some influence of the value of `string` in the result?
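
A minimal standalone sketch of the suggested method body, to show how the `string` constant then reaches the result. The accumulator fields `sum` and `count` come from the diff; their types and the `getValue` logic (sum divided by count, `null` for an empty accumulator) are assumptions, and the class name `WeightedAvgSketch` is hypothetical.

```java
public class WeightedAvgSketch {

    // Accumulator with the two fields referenced in the review comment (types assumed).
    static class WeightedAvgAccum {
        long sum = 0;
        int count = 0;
    }

    // Method body as suggested in the review: the parsed constant shifts every value.
    static void accumulate(
            WeightedAvgAccum accumulator, long iValue, int iWeight, int x, String string) {
        accumulator.sum += (iValue + Integer.parseInt(string)) * iWeight;
        accumulator.count += iWeight;
    }

    // Assumed result logic: integer division of sum by count.
    static Long getValue(WeightedAvgAccum accumulator) {
        return accumulator.count == 0 ? null : accumulator.sum / accumulator.count;
    }

    public static void main(String[] args) {
        WeightedAvgAccum acc = new WeightedAvgAccum();

        // Mirrors weightAvgFun('a, 42, 'b, "2") for the row (1L, 1, "Hello").
        accumulate(acc, 1L, 42, 1, "2");
        System.out.println(getValue(acc)); // prints "3": (1 + 2) * 42 / 42

        // Second row (2L, 2, "Hello"): sum = 126 + 168 = 294, count = 84.
        accumulate(acc, 2L, 42, 2, "2");
        System.out.println(getValue(acc)); // prints "3": 294 / 84 truncated
    }
}
```

With a constant weight, the result reduces to the running average of `iValue` plus the parsed constant (truncated by integer division), so a wrong or ignored `string` argument would change the expected output.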


---