Posted to user@flink.apache.org by HG <ha...@gmail.com> on 2022/02/16 13:34:59 UTC
SQL / Table Api lag() over partition by ... and windowing
Hello all
I need to calculate the time difference between consecutive rows (ordered by
handlingTime) per transactionId. All events should arrive within the window
set by the out-of-orderness bound (a couple of minutes). Events that arrive
later than that should be ignored.
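A minimal plain-Java sketch of the lateness rule described above, assuming it is enforced with event-time watermarks as in the setup further below. It does not use Flink; the class `LatenessFilter` is hypothetical. It mirrors the formula of Flink's bounded-out-of-orderness generator (watermark = largest timestamp seen, minus the bound, minus 1 ms) and treats an event that is not newer than the current watermark as late. For simplicity it assumes the watermark advances after every event, whereas Flink emits it periodically, so a real job may let slightly more stragglers through.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model (no Flink dependency) of bounded-out-of-orderness lateness.
// Hypothetical class: it mimics the watermark formula, not the Flink API.
final class LatenessFilter {
    private final long outOfOrdernessMillis;
    private long maxTimestamp = Long.MIN_VALUE; // largest timestamp seen so far

    LatenessFilter(long outOfOrdernessMillis) {
        this.outOfOrdernessMillis = outOfOrdernessMillis;
    }

    // The watermark trails the largest timestamp by the bound, minus 1 ms.
    long currentWatermark() {
        if (maxTimestamp == Long.MIN_VALUE) {
            return Long.MIN_VALUE; // nothing seen yet, so nothing is late
        }
        return maxTimestamp - outOfOrdernessMillis - 1;
    }

    // Keeps the event unless its timestamp is not newer than the watermark.
    // Assumption: the watermark is advanced after every event.
    boolean accept(long timestamp) {
        if (timestamp <= currentWatermark()) {
            return false; // late: ignored
        }
        maxTimestamp = Math.max(maxTimestamp, timestamp);
        return true;
    }

    // Runs a sequence of event timestamps (in arrival order) through the filter.
    static List<Long> filter(long outOfOrdernessMillis, long... timestamps) {
        LatenessFilter f = new LatenessFilter(outOfOrdernessMillis);
        List<Long> kept = new ArrayList<>();
        for (long ts : timestamps) {
            if (f.accept(ts)) {
                kept.add(ts);
            }
        }
        return kept;
    }
}
```

For example, with a 2-minute bound, `LatenessFilter.filter(120_000L, 1_000_000L, 1_060_000L, 950_000L, 900_000L)` keeps the first three timestamps and drops 900000, which is more than two minutes older than 1060000.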
In SQL this would be:
select transactionId, handlingTime,
    handlingTime - lag(handlingTime, 1) over (partition by transactionId order by handlingTime) as elapsedTime
from table
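For reference, the following plain-Java sketch (no Flink dependency; `LagModel` and its row format are made up for illustration) spells out what the query computes over a finite set of rows: rows are grouped by transactionId, sorted by handlingTime, and each row gets the difference to the previous row of the same transaction. In SQL the first row of each transaction has no predecessor, so LAG returns NULL and elapsedTime is NULL; the sketch instead emits 0, which is what an ifnull(..., handlingTime) fallback around LAG yields.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Batch model of: handlingTime - LAG(handlingTime, 1)
//                 OVER (PARTITION BY transactionId ORDER BY handlingTime)
// Hypothetical helper, no Flink dependency.
final class LagModel {
    static final class Event {
        final String transactionId;
        final long handlingTime;

        Event(String transactionId, long handlingTime) {
            this.transactionId = transactionId;
            this.handlingTime = handlingTime;
        }
    }

    // Returns rows formatted as "transactionId handlingTime elapsedTime",
    // transactions in alphabetical order, rows in handlingTime order.
    static List<String> elapsedTimes(List<Event> events) {
        // PARTITION BY transactionId
        Map<String, List<Long>> byTransaction = new TreeMap<>();
        for (Event e : events) {
            byTransaction.computeIfAbsent(e.transactionId, k -> new ArrayList<>()).add(e.handlingTime);
        }
        List<String> rows = new ArrayList<>();
        for (Map.Entry<String, List<Long>> entry : byTransaction.entrySet()) {
            List<Long> times = entry.getValue();
            Collections.sort(times); // ORDER BY handlingTime
            Long previous = null;    // LAG(handlingTime, 1) for the current row
            for (long t : times) {
                // First row: LAG is NULL; falling back to handlingTime gives 0.
                long elapsed = (previous == null) ? 0L : t - previous;
                rows.add(entry.getKey() + " " + t + " " + elapsed);
                previous = t;
            }
        }
        return rows;
    }
}
```

In a streaming job the same result can only be produced per row once the watermark guarantees that no earlier row of the transaction can still arrive, which is why the event-time setup matters.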
When I code:
Table result = tableEnv.sqlQuery(
    "select transactionId, handlingTime, "
    + "handlingTime - ifnull(lag(handlingTime) over (partition by transactionId order by handlingTime), handlingTime) as elapsedTime "
    + "from tupled3DsTableVw");
Will this obey the watermark strategy of the original DataStream? (see
further below)
I have also tried to use the Table API with a session window, like this:
Table t = tupled3DsTable
    .window(Session.withGap(lit(2).minutes()).on($("handlingTime")).as("w"))
    .groupBy($("transactionId"), $("w"))
    .select($("handlingTime"), $("transactionId"), $("originalEvent"),
        $("handlingTime").max().over($("w")));
This gives:
org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Could not resolve over call.
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:372)
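One plausible reading of this error: a session window is a group window, so it produces one row per transactionId and session, and only the grouping keys, window properties and aggregates such as `$("handlingTime").max()` can be selected; `.over(...)` belongs to over windows, not group windows. The plain-Java sketch below (hypothetical `SessionModel`, no Flink API) illustrates that grouping with a 2-minute gap, assuming a new session starts when the gap between consecutive events of a transaction exceeds the configured gap and that a session ends one gap after its last event.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java model of a session group window per transactionId.
// Hypothetical helper, no Flink dependency.
final class SessionModel {
    // Returns one row per (transactionId, session):
    // "transactionId [start,end) max=<max handlingTime> count=<rows>"
    static List<String> sessions(Map<String, List<Long>> timesByTransaction, long gapMillis) {
        List<String> rows = new ArrayList<>();
        for (Map.Entry<String, List<Long>> e : new TreeMap<>(timesByTransaction).entrySet()) {
            List<Long> times = new ArrayList<>(e.getValue());
            if (times.isEmpty()) {
                continue;
            }
            Collections.sort(times);
            long start = times.get(0);
            long last = times.get(0);
            int count = 1;
            for (int i = 1; i < times.size(); i++) {
                long t = times.get(i);
                if (t - last > gapMillis) {
                    // Gap exceeded: close the current session and start a new one.
                    rows.add(row(e.getKey(), start, last, gapMillis, count));
                    start = t;
                    count = 0;
                }
                last = t;
                count++;
            }
            rows.add(row(e.getKey(), start, last, gapMillis, count));
        }
        return rows;
    }

    private static String row(String tx, long start, long last, long gapMillis, int count) {
        // The session ends one gap after its last event; since events are
        // sorted, the last event also carries the max handlingTime.
        return tx + " [" + start + "," + (last + gapMillis) + ") max=" + last + " count=" + count;
    }
}
```

Per-event columns such as originalEvent have no single value per session, so they cannot be selected next to a group window; a per-row elapsed time needs an over window (in SQL) instead.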
And also:
Table t = tupled3DsTable
    .window(Over.partitionBy($("transactionId")).orderBy($("handlingTime")).as("w"))
    .select($("handlingTime"), $("transactionId"), $("originalEvent"),
        $("handlingTime").lag().as("previousHandlingTime"));
Which does not work since it cannot find lag function :-(
In Java I have the following setup:
WatermarkStrategy<Tuple3<Long, String, String>> wmstrategy =
    WatermarkStrategy
        .<Tuple3<Long, String, String>>forBoundedOutOfOrderness(
            Duration.ofSeconds(Integer.parseInt(outOfOrderness)))
        .withIdleness(Duration.ofSeconds(Integer.parseInt(idleness)))
        .withTimestampAssigner(new SerializableTimestampAssigner<Tuple3<Long, String, String>>() {
            @Override
            public long extractTimestamp(Tuple3<Long, String, String> element, long recordTimestamp) {
                // f0 is handlingTime in epoch milliseconds
                return element.f0;
            }
        });
DataStream<Tuple3<Long, String, String>> tuple3dswm =
    tuple3ds.assignTimestampsAndWatermarks(wmstrategy);
// Convert the stream that carries the watermarks (tuple3dswm, not tuple3ds);
// otherwise SOURCE_WATERMARK() has no watermarks to propagate.
Table tupled3DsTable = tableEnv.fromDataStream(tuple3dswm,
        Schema.newBuilder()
            .column("f0", "TIMESTAMP_LTZ(3)")
            .column("f1", "STRING")
            .column("f2", "STRING")
            .watermark("f0", "SOURCE_WATERMARK()")
            .build())
    .as("handlingTime", "transactionId", "originalEvent");
Re: SQL / Table Api lag() over partition by ... and windowing
Posted by HG <ha...@gmail.com>.
Well, I thought that to do the same with only the DataStream API I would
need to use a MapPartitionFunction.
On Thu, Feb 17, 2022 at 10:41, Francesco Guardiani <
francesco@ververica.com> wrote:
> Why do you need MapPartitionFunction?
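MapPartitionFunction is the function type of the batch DataSet API's mapPartition. On a DataStream, one common alternative is to key the stream by transactionId and use a KeyedProcessFunction with keyed state and event-time timers. The sketch below models that logic in plain Java rather than Flink code: the class `ElapsedTimeOperator` and its methods are hypothetical stand-ins (maps play the role of keyed state, `processWatermark` plays the role of timers firing). Events not newer than the current watermark are dropped as late; the rest are buffered per transaction and released in timestamp order once the watermark passes them.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.TreeSet;

// Plain-Java model of a keyed event-time operator computing elapsed time
// between consecutive events of the same transactionId.
// Hypothetical class; it does not use any Flink API.
final class ElapsedTimeOperator {
    private final Map<String, PriorityQueue<Long>> buffers = new HashMap<>(); // per-key buffered timestamps
    private final Map<String, Long> previousTime = new HashMap<>();            // per-key last released timestamp
    private final List<String> output = new ArrayList<>();
    private long watermark = Long.MIN_VALUE;

    void processElement(String transactionId, long handlingTime) {
        if (handlingTime <= watermark) {
            return; // late: ignored
        }
        buffers.computeIfAbsent(transactionId, k -> new PriorityQueue<>()).add(handlingTime);
    }

    void processWatermark(long newWatermark) {
        if (newWatermark <= watermark) {
            return; // watermarks only move forward
        }
        watermark = newWatermark;
        // Sorted key order only to make the output deterministic.
        for (String tx : new TreeSet<>(buffers.keySet())) {
            PriorityQueue<Long> queue = buffers.get(tx);
            // Release everything the watermark has passed, oldest first.
            while (!queue.isEmpty() && queue.peek() <= watermark) {
                long t = queue.poll();
                Long prev = previousTime.get(tx);
                output.add(tx + " " + t + " " + (prev == null ? 0L : t - prev));
                previousTime.put(tx, t);
            }
        }
    }

    List<String> output() {
        return output;
    }
}
```

In a real job the per-key state would also need cleanup (for example state TTL or a timer some time after the last event), since this sketch keeps the previous timestamp of every transaction forever.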
Re: SQL / Table Api lag() over partition by ... and windowing
Posted by Francesco Guardiani <fr...@ververica.com>.
Why do you need MapPartitionFunction?
On Wed, Feb 16, 2022 at 7:02 PM HG <ha...@gmail.com> wrote:
> Thanks
>
> Would the option for datastream be to write a MapPartitionFunction?
Re: SQL / Table Api lag() over partition by ... and windowing
Posted by HG <ha...@gmail.com>.
Thanks
Would the option for datastream be to write a MapPartitionFunction?
On Wed, Feb 16, 2022 at 16:35, Francesco Guardiani <
francesco@ververica.com> wrote:
> > Which does not work since it cannot find lag function :-(
>
> lag and over are not supported at the moment with Table, so you need to
> use SQL for that.
Re: SQL / Table Api lag() over partition by ... and windowing
Posted by Francesco Guardiani <fr...@ververica.com>.
> Which does not work since it cannot find lag function :-(
lag() over a window is not supported in the Table API at the moment, so you
need to use SQL for that.
> Will this obey the watermark strategy of the original Datastream? (see
> further below)
Yes. The code at the end of the mail is correct and should work fine. Just
one comment: if you're using this DataStream only to create the Table
instance, you could also define the watermark directly in the Schema
builder, as described here:
https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/table/api/Schema.Builder.html#watermark-java.lang.String-org.apache.flink.table.expressions.Expression-