Posted to user@flink.apache.org by HG <ha...@gmail.com> on 2022/02/16 13:34:59 UTC

SQL / Table Api lag() over partition by ... and windowing

Hello all

I need to calculate the time difference between consecutive rows, ordered by
handlingTime, per transactionId. All events should arrive within the time
frame set by the out-of-orderness bound (a couple of minutes); events arriving
later than that should be ignored.

In SQL this would be:

select transactionId, handlingTime,
       handlingTime - lag(handlingTime, 1) over (partition by transactionId order by handlingTime) as elapsedTime
from table

When I code:

Table result = tableEnv.sqlQuery(
    "select transactionId, handlingTime, "
        + "handlingTime - ifnull(lag(handlingTime) over (partition by transactionId order by handlingTime), handlingTime) as elapsedTime "
        + "from tupled3DsTableVw");
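For reference, this is roughly what the query above is meant to compute, sketched in plain Java over a finite, already complete list of events (no Flink, no watermarks). The class, field and method names are hypothetical. The first row of each transactionId falls back to its own handlingTime, so its elapsedTime is 0, mirroring the ifnull(...) fallback.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical plain-Java sketch (not Flink code) of
//   handlingTime - ifnull(lag(handlingTime) over (partition by transactionId order by handlingTime), handlingTime)
// evaluated over a bounded, fully available list of events.
final class LagSketch {

    // Mirrors Tuple3<Long, String, String> = (handlingTime, transactionId, originalEvent).
    static final class Event {
        final long handlingTime;
        final String transactionId;
        final String originalEvent;

        Event(long handlingTime, String transactionId, String originalEvent) {
            this.handlingTime = handlingTime;
            this.transactionId = transactionId;
            this.originalEvent = originalEvent;
        }
    }

    static final class Result {
        final String transactionId;
        final long handlingTime;
        final long elapsedTime;

        Result(String transactionId, long handlingTime, long elapsedTime) {
            this.transactionId = transactionId;
            this.handlingTime = handlingTime;
            this.elapsedTime = elapsedTime;
        }
    }

    static List<Result> elapsedPerTransaction(List<Event> events) {
        // "order by handlingTime": sort a copy; List.sort is stable for equal times.
        List<Event> sorted = new ArrayList<>(events);
        sorted.sort(Comparator.comparingLong((Event e) -> e.handlingTime));

        // "partition by transactionId": remember the previous handlingTime per key.
        Map<String, Long> previous = new HashMap<>();
        List<Result> out = new ArrayList<>();
        for (Event e : sorted) {
            // lag() is null for the first row of a partition; fall back to the row's own time.
            long prev = previous.getOrDefault(e.transactionId, e.handlingTime);
            out.add(new Result(e.transactionId, e.handlingTime, e.handlingTime - prev));
            previous.put(e.transactionId, e.handlingTime);
        }
        return out;
    }
}
```

Rows come out in global handlingTime order, and each transactionId keeps its own "previous" value, which is what the partition clause expresses.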

Will this obey the watermark strategy of the original DataStream? (See
further below.)

I have also tried the Table API with a session window:
Table t = tupled3DsTable
    .window(Session.withGap(lit(2).minutes()).on($("handlingTime")).as("w"))
    .groupBy($("transactionId"), $("w"))
    .select($("handlingTime"), $("transactionId"), $("originalEvent"),
            $("handlingTime").max().over($("w")));
This gives:

org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Could not resolve over call.
        at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:372)

and also:

Table t = tupled3DsTable
    .window(Over.partitionBy($("transactionId")).orderBy($("handlingTime")).as("w"))
    .select($("handlingTime"), $("transactionId"), $("originalEvent"),
            $("handlingTime").lag().as("previousHandlingTime"));

This does not work, since it cannot find the lag function :-(

In Java I have the following setup:

WatermarkStrategy<Tuple3<Long, String, String>> wmstrategy =
    WatermarkStrategy
        .<Tuple3<Long, String, String>>forBoundedOutOfOrderness(Duration.ofSeconds(Integer.parseInt(outOfOrderness)))
        .withIdleness(Duration.ofSeconds(Integer.parseInt(idleness)))
        .withTimestampAssigner(new SerializableTimestampAssigner<Tuple3<Long, String, String>>() {
            @Override
            public long extractTimestamp(Tuple3<Long, String, String> element, long recordTimestamp) {
                // f0 holds the event time (handlingTime) in epoch milliseconds
                return element.f0;
            }
        });

DataStream<Tuple3<Long, String, String>> tuple3dswm =
    tuple3ds.assignTimestampsAndWatermarks(wmstrategy);

// Build the Table from the stream that carries the watermarks (tuple3dswm)
Table tupled3DsTable = tableEnv.fromDataStream(
        tuple3dswm,
        Schema.newBuilder()
            .column("f0", "TIMESTAMP_LTZ(3)")
            .column("f1", "STRING")
            .column("f2", "STRING")
            .watermark("f0", "SOURCE_WATERMARK()")
            .build())
    .as("handlingTime", "transactionId", "originalEvent");
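To make the "events later than the out-of-orderness bound are ignored" part concrete: as far as I understand, forBoundedOutOfOrderness emits a watermark equal to the highest timestamp seen so far minus the bound minus 1 ms, and the event-time OVER operator in SQL drops rows whose timestamp is not newer than the current watermark. The plain-Java sketch below (hypothetical class name, no Flink dependency) reproduces only that arithmetic. It is simplified: the watermark advances after every event instead of periodically, and idleness is not modelled.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch (not Flink code) of bounded-out-of-orderness watermark arithmetic.
// Simplification: the watermark is recomputed after every event, whereas Flink emits it
// periodically (auto-watermark interval), so in practice fewer events would count as late.
final class BoundedOutOfOrdernessSketch {
    private final long boundMillis;
    private long maxTimestamp;

    BoundedOutOfOrdernessSketch(long boundMillis) {
        this.boundMillis = boundMillis;
        // Chosen so that the initial watermark is exactly Long.MIN_VALUE without overflowing.
        this.maxTimestamp = Long.MIN_VALUE + boundMillis + 1;
    }

    // Watermark = highest timestamp seen - bound - 1 ms.
    long currentWatermark() {
        return maxTimestamp - boundMillis - 1;
    }

    // Returns true if the event is newer than the watermark in force when it arrives.
    boolean onEvent(long timestamp) {
        boolean onTime = timestamp > currentWatermark();
        maxTimestamp = Math.max(maxTimestamp, timestamp);
        return onTime;
    }

    // Keeps only the timestamps that would not be considered late.
    static List<Long> keepOnTime(long boundMillis, List<Long> timestamps) {
        BoundedOutOfOrdernessSketch wm = new BoundedOutOfOrdernessSketch(boundMillis);
        List<Long> kept = new ArrayList<>();
        for (long ts : timestamps) {
            if (wm.onEvent(ts)) {
                kept.add(ts);
            }
        }
        return kept;
    }
}
```

With a 2-minute bound (120000 ms), an event at 300000 ms moves the watermark to 179999 ms. A subsequent event at 190000 ms is still accepted, while one at 170000 ms is late.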

Re: SQL / Table Api lag() over partition by ... and windowing

Posted by HG <ha...@gmail.com>.
Well, I thought that to do the same with only the DataStream API I would
need to use a MapPartitionFunction.
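On the DataStream side, as far as I know MapPartitionFunction is used by the DataSet API's mapPartition, which works on bounded data. The usual DataStream shape for this would be keyBy(transactionId) followed by a KeyedProcessFunction that keeps the previous handlingTime in keyed state and buffers out-of-order rows until the watermark passes them. The sketch below only emulates that state machine in plain Java: the map and priority queue stand in for Flink keyed state, and onWatermark stands in for event-time timers firing. All names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;

// Hypothetical plain-Java emulation (not Flink code) of a KeyedProcessFunction keyed by
// transactionId: rows are buffered until the watermark passes them, then emitted in
// handlingTime order with the elapsed time since the previous row of the same key.
final class KeyedLagEmulation {

    static final class Row {
        final String transactionId;
        final long handlingTime;
        final long elapsedTime; // 0 for the first row of a transactionId

        Row(String transactionId, long handlingTime, long elapsedTime) {
            this.transactionId = transactionId;
            this.handlingTime = handlingTime;
            this.elapsedTime = elapsedTime;
        }
    }

    // Stands in for per-key ValueState<Long> holding the previously emitted handlingTime.
    private final Map<String, Long> previousTime = new HashMap<>();
    // Stands in for buffered rows waiting for the watermark, ordered by handlingTime.
    private final PriorityQueue<Row> buffer =
            new PriorityQueue<>(Comparator.comparingLong((Row r) -> r.handlingTime));
    private long currentWatermark = Long.MIN_VALUE;

    // Like processElement: returns false if the row is late and therefore dropped.
    boolean processElement(String transactionId, long handlingTime) {
        if (handlingTime <= currentWatermark) {
            return false;
        }
        buffer.add(new Row(transactionId, handlingTime, 0L)); // elapsedTime is computed on emit
        return true;
    }

    // Like event-time timers firing: emit every buffered row the watermark has passed.
    List<Row> onWatermark(long watermark) {
        currentWatermark = Math.max(currentWatermark, watermark);
        List<Row> out = new ArrayList<>();
        while (!buffer.isEmpty() && buffer.peek().handlingTime <= currentWatermark) {
            Row r = buffer.poll();
            long prev = previousTime.getOrDefault(r.transactionId, r.handlingTime);
            out.add(new Row(r.transactionId, r.handlingTime, r.handlingTime - prev));
            previousTime.put(r.transactionId, r.handlingTime);
        }
        return out;
    }
}
```

Buffering until the watermark is what makes the output ordered despite out-of-order arrival; the price is that results are delayed by the out-of-orderness bound.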



On Thu, Feb 17, 2022 at 10:41 Francesco Guardiani <francesco@ververica.com> wrote:

> Why do you need MapPartitionFunction?

Re: SQL / Table Api lag() over partition by ... and windowing

Posted by Francesco Guardiani <fr...@ververica.com>.
Why do you need MapPartitionFunction?

On Wed, Feb 16, 2022 at 7:02 PM HG <ha...@gmail.com> wrote:

> Thanks
>
> Would the option for datastream be to write a MapPartitionFunction?

Re: SQL / Table Api lag() over partition by ... and windowing

Posted by HG <ha...@gmail.com>.
Thanks

Would the option for datastream be to write a MapPartitionFunction?

On Wed, Feb 16, 2022 at 16:35 Francesco Guardiani <francesco@ververica.com> wrote:

> > Which does not work since it cannot find lag function :-(
>
> lag and over are not supported at the moment with Table, so you need to
> use SQL for that.
>
> > Will this obey the watermark strategy of the original Datastream? (see
> > further below)
>
> Yes. The code at the end of the mail is correct and should work fine. I
> have just one comment, if you're using this DataStream only to create the
> Table instance, you could also just define the watermark using the Schema
> builder itself, as described here:
> https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/table/api/Schema.Builder.html#watermark-java.lang.String-org.apache.flink.table.expressions.Expression-

Re: SQL / Table Api lag() over partition by ... and windowing

Posted by Francesco Guardiani <fr...@ververica.com>.
> Which does not work since it cannot find lag function :-(

lag and over are not supported in the Table API at the moment, so you need
to use SQL for that.

> Will this obey the watermark strategy of the original Datastream? (see
> further below)

Yes. The code at the end of your mail is correct and should work fine. Just
one comment: if you're using this DataStream only to create the Table
instance, you could also define the watermark directly with the Schema
builder, as described here:
https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/table/api/Schema.Builder.html#watermark-java.lang.String-org.apache.flink.table.expressions.Expression-
