Posted to dev@flink.apache.org by "radu (JIRA)" <ji...@apache.org> on 2017/04/04 07:48:41 UTC

[jira] [Created] (FLINK-6260) Distinct Aggregates for Group By Windows

radu created FLINK-6260:
---------------------------

             Summary: Distinct Aggregates for Group By Windows
                 Key: FLINK-6260
                 URL: https://issues.apache.org/jira/browse/FLINK-6260
             Project: Flink
          Issue Type: New Feature
          Components: Table API & SQL
            Reporter: radu


Time target: ProcTime/EventTime

SQL targeted query examples:
------------

Q1. Boundaries are expressed in the GROUP BY clause and DISTINCT is applied to the elements of the aggregate(s).

`SELECT MIN( DISTINCT rowtime), prodID FROM stream1 GROUP BY FLOOR(procTime() TO HOUR)`

Q2. Distinct is applied to the collection of outputs to be selected.

`SELECT STREAM DISTINCT procTime(), prodID FROM stream1 GROUP BY FLOOR(procTime() TO DAY)`


=> The DISTINCT operation makes sense only within the context of windows or other bounded structures. Otherwise the operation would have to keep an unbounded amount of state to ensure uniqueness, and would never trigger for certain functions (e.g. aggregates).

=>  We can follow the same design/implementation as for JIRA FLINK-6249 (supporting Distinct Aggregates for OVER Windows)

=> The implementation of DISTINCT for SELECT clauses can be handled as a sub-JIRA issue.

=> Aggregations over distinct elements without any boundary (i.e. within the SELECT clause) do not make sense, just as aggregations do not make sense without groupings or windows.


If DISTINCT is applied to the group elements, as in the Q1 example, then we either define a new implementation (if the selection is general) or extend the current implementation of grouped aggregates with distinct group aggregates.

If DISTINCT is applied to all selected elements, as in the Q2 example, then a new implementation needs to be defined. It would work over a specific window / ProcessFunction, and the uniqueness of the results would be enforced within the processing function. This happens for each partition. The data structure used to track distinct elements is reset with each new window (i.e., group-by scope).
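The per-partition, per-window reset described above can be sketched in plain Python. This is only an illustration of the idea, not Flink code: the class name `DistinctSelector`, the `process` method, and the use of integer timestamps in seconds are all assumptions made for the example. It also assumes events arrive in time order and, as a simplification, emits a value the first time it is seen rather than when the window closes.

```python
class DistinctSelector:
    """Hypothetical sketch: one 'seen' set per partition, reset on each new window."""

    def __init__(self, window_size):
        self.window_size = window_size  # tumbling window length in seconds
        self.state = {}                 # partition key -> (window start, seen values)

    def process(self, ts, key, value):
        """Return value if it is new in this partition's current window, else None."""
        window_start = ts - ts % self.window_size
        current = self.state.get(key)
        if current is None or current[0] != window_start:
            # A new window (group-by scope) started: drop the old distinct set.
            current = (window_start, set())
            self.state[key] = current
        seen = current[1]
        if value in seen:
            return None
        seen.add(value)
        return value
```

With hourly windows (`DistinctSelector(3600)`), a value repeated within the same hour and partition is suppressed, while the same value in the next hour, or in another partition, is emitted again.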
	

Examples
------------
`Q1: SELECT STREAM DISTINCT b FROM stream1 GROUP BY FLOOR(PROCTIME() TO HOUR)`

`Q2: SELECT COUNT(DISTINCT b) FROM stream1 GROUP BY FLOOR(PROCTIME() TO HOUR)`

||Proctime||IngestionTime(Event)||Stream1||Q1||Q2||
||10:00:01| (ab,1) |       |   |
||10:05:00| (aa,2) |       |   |
||11:00:00|        | ab,aa | 2 |
||11:03:00| (aa,2) |       |   |
||11:09:00| (aa,2) |       |   |
||12:00:00|        | aa    | 1 |
|...|
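The rows of the table can be reproduced with a small plain-Python simulation. This is a sketch under assumptions, not Flink code: the function name `hourly_distinct` is made up for the example, field `b` is taken to be the first element of each event pair, and results are emitted at the end of each hourly window, as in the table.

```python
from datetime import datetime, timedelta

def hourly_distinct(events):
    """events: list of ('HH:MM:SS', b) pairs.

    Returns one (window_end, distinct_values, distinct_count) tuple per
    hourly window, ordered by time. distinct_values corresponds to Q1
    and distinct_count to Q2.
    """
    windows = {}  # window start -> distinct values in first-seen order
    for time_str, value in events:
        t = datetime.strptime(time_str, "%H:%M:%S")
        start = t.replace(minute=0, second=0)
        values = windows.setdefault(start, [])
        if value not in values:
            values.append(value)
    return [
        ((start + timedelta(hours=1)).strftime("%H:%M:%S"), values, len(values))
        for start, values in sorted(windows.items())
    ]

rows = hourly_distinct([
    ("10:00:01", "ab"),
    ("10:05:00", "aa"),
    ("11:03:00", "aa"),
    ("11:09:00", "aa"),
])
print(rows)
```

For the four events in the table this yields `ab,aa` with a count of 2 for the window closing at 11:00:00, and `aa` with a count of 1 for the window closing at 12:00:00.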


Implementation option
---------------------
Since the behavior is similar to the one implemented for OVER windows (with the difference that the distinction is reset for each group scope), the implementation will reuse the existing implementation of the OVER window functions. Distinction can be achieved within the aggregate itself or within the window/ProcessFunction logic that computes the aggregates. As multiple aggregates that require distinction can be computed at the same time, the preferred option is to handle distinction within the process logic. For the case of selecting distinct outputs (i.e., not aggregates) we can follow the same implementation design: support distinction in the aggregation and then emit only one output per element seen (instead of calling the aggregate method of the aggregates).
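The preferred option, handling distinction once in the process logic and feeding several aggregates from it, could look roughly like the following plain-Python sketch. It is not the Flink implementation: `CountAgg`, `MinAgg`, `DistinctWindowLogic`, and the method names `accumulate`, `get_value`, `process`, and `fire` are hypothetical names chosen for illustration, and the sketch assumes all distinct aggregates are computed over the same field.

```python
class CountAgg:
    """Illustrative aggregate: counts the values passed to it."""
    def __init__(self):
        self.count = 0
    def accumulate(self, value):
        self.count += 1
    def get_value(self):
        return self.count

class MinAgg:
    """Illustrative aggregate: tracks the minimum value passed to it."""
    def __init__(self):
        self.minimum = None
    def accumulate(self, value):
        if self.minimum is None or value < self.minimum:
            self.minimum = value
    def get_value(self):
        return self.minimum

class DistinctWindowLogic:
    """Hypothetical process logic: one 'seen' set shared by all aggregates."""
    def __init__(self, aggregate_factories):
        self.factories = aggregate_factories
        self.reset()

    def reset(self):
        # Called for each new group scope: forget seen values and accumulators.
        self.seen = set()
        self.aggregates = [factory() for factory in self.factories]

    def process(self, value):
        if value in self.seen:
            return  # duplicate within this window: aggregates are not called
        self.seen.add(value)
        for agg in self.aggregates:
            agg.accumulate(value)

    def fire(self):
        # Window closes: emit one result per aggregate, then reset the state.
        results = [agg.get_value() for agg in self.aggregates]
        self.reset()
        return results
```

Keeping the set outside the aggregates means the aggregates themselves stay unchanged and the duplicate check is paid once per element, regardless of how many distinct aggregates are evaluated.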




--
This message was sent by Atlassian JIRA
(v6.3.15#6346)