You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by twalthr <gi...@git.apache.org> on 2017/09/27 15:31:50 UTC
[GitHub] flink pull request #4736: [FLINK-7371] [table] Add support for constant para...
GitHub user twalthr opened a pull request:
https://github.com/apache/flink/pull/4736
[FLINK-7371] [table] Add support for constant parameters in OVER aggregate
## What is the purpose of the change
This PR allows to pass constants to OVER window aggregates. E.g. `.select('c, weightAvgFun('a, 42, 'b, "2") over 'w as 'wAvg)`.
## Brief change log
Until now the constants where simply ignored. I added code generation for the literals in `AggregationCodeGenerator`.
## Verifying this change
I add a ITCase for it. I might add more tests if I have time. In general, we need to rework the logic there a little bit, because I think we also do not support DATE, TIME etc. right now.
## Does this pull request potentially affect one of the following parts:
- Dependencies (does it add or upgrade a dependency): no
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no
- The serializers: no
- The runtime per-record code paths (performance sensitive): no
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: no
## Documentation
- Does this pull request introduce a new feature? no
- If yes, how is the feature documented? not applicable
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/twalthr/flink FLINK-7371
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/flink/pull/4736.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #4736
----
commit 19e056e038009e22e2b607b38931f575d5c948df
Author: twalthr <tw...@apache.org>
Date: 2017-09-27T15:11:28Z
[FLINK-7371] [table] Add support for constant parameters in OVER aggregate
----
---
[GitHub] flink pull request #4736: [FLINK-7371] [table] Add support for constant para...
Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:
https://github.com/apache/flink/pull/4736#discussion_r142254410
--- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/OverWindowITCase.scala ---
@@ -85,6 +85,46 @@ class OverWindowITCase extends StreamingWithStateTestBase {
}
@Test
+ def testOverWindowWithConstant(): Unit = {
+
+ val data = List(
+ (1L, 1, "Hello"),
+ (2L, 2, "Hello"),
+ (3L, 3, "Hello"),
+ (4L, 4, "Hello"),
+ (5L, 5, "Hello"),
+ (6L, 6, "Hello"),
+ (7L, 7, "Hello World"),
+ (8L, 8, "Hello World"),
+ (8L, 8, "Hello World"),
+ (20L, 20, "Hello World"))
+
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ env.setParallelism(1)
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+ StreamITCase.testResults = mutable.MutableList()
+ StreamITCase.clear
+ val stream = env.fromCollection(data)
+ val table = stream.toTable(tEnv, 'a, 'b, 'c, 'proctime.proctime)
+ val weightAvgFun = new WeightedAvg
+
+ val windowedTable = table
+ .window(
+ Over partitionBy 'c orderBy 'proctime preceding UNBOUNDED_ROW as 'w)
+ .select('c, weightAvgFun('a, 42, 'b, "2") over 'w as 'wAvg)
+ .select('c, 'wAvg)
--- End diff --
can be removed
---
[GitHub] flink pull request #4736: [FLINK-7371] [table] Add support for constant para...
Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:
https://github.com/apache/flink/pull/4736#discussion_r142247401
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala ---
@@ -1670,4 +1670,34 @@ abstract class CodeGenerator(
fieldTerm
}
+
+ /**
+ * Adds a reusable constant to the member area of the generated [[Function]].
+ *
+ * @param constant constant expression
+ * @return member variable term
+ */
+ def addReusableBoxedConstant(constant: GeneratedExpression): String = {
+ require(constant.literal, "Literal expected")
+
+ val fieldTerm = newName("constant")
+
+ val boxed = generateOutputFieldBoxing(constant)
+ val boxedType = boxedTypeTermForTypeInfo(boxed.resultType)
+
+ val field =
+ s"""
+ |transient $boxedType $fieldTerm;
--- End diff --
why `transient`? Couldn't this be `final`?
---
[GitHub] flink issue #4736: [FLINK-7371] [table] Add support for constant parameters ...
Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on the issue:
https://github.com/apache/flink/pull/4736
Thanks @fhueske. I addressed you feedback. Will merge this now...
---
[GitHub] flink pull request #4736: [FLINK-7371] [table] Add support for constant para...
Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:
https://github.com/apache/flink/pull/4736
---
[GitHub] flink pull request #4736: [FLINK-7371] [table] Add support for constant para...
Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:
https://github.com/apache/flink/pull/4736#discussion_r142256742
--- Diff: flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/utils/JavaUserDefinedAggFunctions.java ---
@@ -86,6 +86,13 @@ public Long getValue(WeightedAvgAccum accumulator) {
}
// overloaded accumulate method
+ // dummy to test constants
+ public void accumulate(WeightedAvgAccum accumulator, long iValue, int iWeight, int x, String string) {
+ accumulator.sum += iWeight + Integer.parseInt(string);
--- End diff --
change the method to
```
accumulator.sum += (iValue + Integer.parseInt(string)) * iWeight;
accumulator.count += iWeight;
```
to have some influence of the value of `string` in the result?
---