You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by Marco Villalobos <mv...@kineteque.com> on 2020/12/10 15:54:57 UTC

How do I pass the aggregated results of an SQL TUMBLING Window as one set of data to a Process Function?

I am sorry to task this twice.  I reworded my question though, and I never got an answer. I am trying to learn how to use the SQL api, but mix-in the Streaming API
where there is too much complex business logic.

GIVEN two windows, window X an SQL tumbling processing time window of 15 minutes, and window Y a ProcessFunction,
AND 200000 records entered window X, and performs its business logic.

How can I assure that Window Y will process exactly all the records
that left window A within the same window?

I am trying to do this:

200000 records streamed in --> SQL TUMBLING WINDOW --> aggregated table (the is results of table with 200000 records in it ) --> PROCESS FUNCTION (aggregated table)


Re: How do I pass the aggregated results of an SQL TUMBLING Window as one set of data to a Process Function?

Posted by Timo Walther <tw...@apache.org>.
Hi Marco,

sorry for the late reply. Have you looked into user-defined aggregate 
functions for SQL? I think your requirements can be easily implemented 
there. You can declare multiple aggregate functions per window. There is 
also the built-in function LISTAGG that might help for your use case. 
But Flink SQL aggregate functions support arbirary data types (e.g. 
arrays as result type).

Regarding `do I need to wait another 15 minutes to aggregate this`: This 
is another example of why event time is important. Actually you would 
like to process the data quicker than wall-clock time. If your example 
would work in event-time, the watermark would be emitted after the 
window 1 has been processed and this watermark would also trigger the 
second window immediately without the need to another 15 min in 
processing time.

I hope this helps.

Regards,
Timo

On 12.12.20 01:38, Marco Villalobos wrote:
> Alright, maybe my example needs to be more concrete. How about this:
> In this example, I don't want to create to windows just to re-combine 
> what was just aggregated in SQL.  Is there a way to transform the 
> aggregate results into one datastream object so that I don't have to 
> aggregate again?
> 
> 
> // aggregate this stream for 15 minutes
> final Table employeeDailyPurchasesTable =tableEnv.sqlQuery("SELECT\n" +
>        " t.organization_id, t.department_id, s.date, s.employee_id, t.fullName, 
> t.dob, SUM(s.purchase) AS purchases\n" +
>        "FROM\n" +
>        " employee_purchases s\n" +
>        "LEFT JOIN\n" +
>        " employees FOR SYSTEM_TIME AS OF s.procTime AS t ON t.organization = 
> s.organization AND t.department = s.department AND t.employee_id = 
> s.employee_id\n" +
>        "GROUP BY\n" +
>        " TUMBLE(s.procTime, INTERVAL '15' MINUTE), t.organization_id, 
> t.department_id, s.date, s.employee_id, t.fullName, t.dob");
> 
> // now I want everything that was just aggregated processed together,
> // below gives me each row again in a stream
> final DataStream<Row> employeeDailyPurchasesDataStream =tableEnv.toAppendStream(employeeDailyPurchasesTable, Row.class);
> 
> // so, do I need to wait another 15 minutes to aggregate this? It was 
> just aggregated for 15 minutes above!
> // how do I get the previous aggregated results into one object so that 
> I don't have to wait and aggregate it again
> final DataStream<DailyEmployeePurchases> aggregatedAgainBecauseINeedHelp =employeeDailyPurchasesDataStream
> .keyBy(0, 1, 2)
>        .window(TumblingProcessingTimeWindows.of(Time.minutes(15)))
>        .aggregate(new AggregateFunction<Row, DailyEmployeePurchases, DailyEmployeePurchases>() {
> 
>           @Override
> public DailyEmployeePurchases createAccumulator() {
>              return new DailyEmployeePurchases();
> }
> 
>           @Override
> public DailyEmployeePurchases add(Row value, DailyEmployeePurchases accumulator) {
>              return accumulator.add(value);
> }
> 
>           @Override
> public DailyEmployeePurchases getResult(DailyEmployeePurchases accumulator) {
>              return accumulator;
> }
> 
>           @Override
> public DailyEmployeePurchases merge(DailyEmployeePurchases a, DailyEmployeePurchases b) {
>              return a.merge(b);
> }
>        });
> 
> // important business logic that needs to be applied to the group of 
> employees
> aggregatedAgainBecauseINeedHelp.keyBy("organizationId", "departmentId")
>        .process(new KeyedProcessFunction<Tuple, DailyEmployeePurchases, DailyEmployeePurchases>() {
> 
>           @Override
> public void processElement(DailyEmployeePurchases value, Context ctx, Collector<DailyEmployeePurchases> out)throws Exception {
>              // very important stuff here
> }
>        });
> 
> 
> 


Re: How do I pass the aggregated results of an SQL TUMBLING Window as one set of data to a Process Function?

Posted by Marco Villalobos <mv...@kineteque.com>.
Alright, maybe my example needs to be more concrete. How about this:
In this example, I don't want to create to windows just to re-combine
what was just aggregated in SQL.  Is there a way to transform the aggregate
results into one datastream object so that I don't have to aggregate again?


// aggregate this stream for 15 minutes
final Table employeeDailyPurchasesTable = tableEnv.sqlQuery("SELECT\n" +
      "    t.organization_id, t.department_id, s.date, s.employee_id,
t.fullName, t.dob, SUM(s.purchase) AS purchases\n" +
      "FROM\n" +
      "    employee_purchases s\n" +
      "LEFT JOIN\n" +
      "    employees FOR SYSTEM_TIME AS OF s.procTime AS t ON
t.organization = s.organization AND t.department = s.department AND
t.employee_id = s.employee_id\n" +
      "GROUP BY\n" +
      "    TUMBLE(s.procTime, INTERVAL '15' MINUTE),
t.organization_id, t.department_id, s.date, s.employee_id, t.fullName,
t.dob");

// now I want everything that was just aggregated processed together,
// below gives me each row again in a stream
final DataStream<Row> employeeDailyPurchasesDataStream =
tableEnv.toAppendStream(employeeDailyPurchasesTable, Row.class);

// so, do I need to wait another 15 minutes to aggregate this? It was
just aggregated for 15 minutes above!
// how do I get the previous aggregated results into one object so
that I don't have to wait and aggregate it again
final DataStream<DailyEmployeePurchases>
aggregatedAgainBecauseINeedHelp = employeeDailyPurchasesDataStream
      .keyBy(0, 1, 2)
      .window(TumblingProcessingTimeWindows.of(Time.minutes(15)))
      .aggregate(new AggregateFunction<Row, DailyEmployeePurchases,
DailyEmployeePurchases>() {

         @Override
         public DailyEmployeePurchases createAccumulator() {
            return new DailyEmployeePurchases();
         }

         @Override
         public DailyEmployeePurchases add(Row value,
DailyEmployeePurchases accumulator) {
            return accumulator.add(value);
         }

         @Override
         public DailyEmployeePurchases
getResult(DailyEmployeePurchases accumulator) {
            return accumulator;
         }

         @Override
         public DailyEmployeePurchases merge(DailyEmployeePurchases a,
DailyEmployeePurchases b) {
            return a.merge(b);
         }
      });

// important business logic that needs to be applied to the group of employees
aggregatedAgainBecauseINeedHelp.keyBy("organizationId", "departmentId")
      .process(new KeyedProcessFunction<Tuple, DailyEmployeePurchases,
DailyEmployeePurchases>() {

         @Override
         public void processElement(DailyEmployeePurchases value,
Context ctx, Collector<DailyEmployeePurchases> out) throws Exception {
            // very important stuff here
         }
      });