Posted to user@flink.apache.org by "Porritt, James" <Ja...@uk.mlp.com> on 2018/07/17 16:27:22 UTC

Keeping only latest row by key?

In Spark if I want to be able to get a set of unique rows by id, using the criteria of keeping the row with the latest timestamp, I would do the following:

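    from pyspark.sql import functions as F
    from pyspark.sql.window import Window

    # df is assumed to be an existing DataFrame with 'id' and 'timestamp' columns
    latest = (
        df.withColumn(
            "rn",
            F.row_number().over(
                Window.partitionBy("id").orderBy(F.col("timestamp").desc())
            ),
        )
        .where(F.col("rn") == 1)
    )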

I see that Flink has windowing functionality, but I don't see that it has row enumeration. In that case, how would I best achieve the above?

Thanks,
James.
######################################################################
The information contained in this communication is confidential and
intended only for the individual(s) named above. If you are not a named
addressee, please notify the sender immediately and delete this email
from your system and do not disclose the email or any part of it to any
person. The views expressed in this email are the views of the author
and do not necessarily represent the views of Millennium Capital Partners
LLP (MCP LLP) or any of its affiliates. Outgoing and incoming electronic
communications of MCP LLP and its affiliates, including telephone
communications, may be electronically archived and subject to review
and/or disclosure to someone other than the recipient. MCP LLP is
authorized and regulated by the Financial Conduct Authority. Millennium
Capital Partners LLP is a limited liability partnership registered in
England & Wales with number OC312897 and with its registered office at
50 Berkeley Street, London, W1J 8HD.
######################################################################

Re: Keeping only latest row by key?

Posted by Fabian Hueske <fh...@gmail.com>.
Hi James,

Yes, that should also do the trick.

Best, Fabian

2018-07-19 16:06 GMT+02:00 Porritt, James <Ja...@uk.mlp.com>:

> It looks like the following gives me the result I’m interested in:
>
> batchEnv
>                 .createInput(dataset)
>                 .groupBy("id")
>                 .sortGroup("timestamp", Order.DESCENDING)
>                 .first(1);
>
> Is there anything I’ve misunderstood with this?

RE: Keeping only latest row by key?

Posted by "Porritt, James" <Ja...@uk.mlp.com>.
It looks like the following gives me the result I’m interested in:

batchEnv
                .createInput(dataset)
                .groupBy("id")
                .sortGroup("timestamp", Order.DESCENDING)
                .first(1);

Is there anything I’ve misunderstood with this?
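
For reference, the result that the groupBy / sortGroup / first(1) chain is meant to produce can be sketched in plain Java, with no Flink dependency. This is only an illustration of the intended semantics (one row per id, the one with the greatest timestamp), not Flink code. The Row class, its fields, and the latestPerKey helper are hypothetical names made up for this sketch.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class LatestByKey {

    /** Hypothetical row type: an id, a timestamp, and some payload. */
    static final class Row {
        final String id;
        final long timestamp;
        final String payload;

        Row(String id, long timestamp, String payload) {
            this.id = id;
            this.timestamp = timestamp;
            this.payload = payload;
        }
    }

    /** Keeps, for each id, the row with the greatest timestamp. */
    static List<Row> latestPerKey(List<Row> rows) {
        // LinkedHashMap keeps ids in the order they were first seen.
        Map<String, Row> latest = new LinkedHashMap<>();
        for (Row row : rows) {
            // Keep the stored row unless the incoming one is strictly newer.
            latest.merge(row.id, row,
                    (old, cur) -> cur.timestamp > old.timestamp ? cur : old);
        }
        return new ArrayList<>(latest.values());
    }

    public static void main(String[] args) {
        List<Row> rows = Arrays.asList(
                new Row("a", 1L, "a-old"),
                new Row("b", 5L, "b-only"),
                new Row("a", 3L, "a-new"));
        for (Row r : latestPerKey(rows)) {
            System.out.println(r.id + " " + r.timestamp + " " + r.payload);
        }
    }
}
```

When two rows for the same id share a timestamp, this sketch keeps the one seen first. Which row a sorted group would return in that case is not something this sketch tries to reproduce.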

RE: Keeping only latest row by key?

Posted by "Porritt, James" <Ja...@uk.mlp.com>.
Hi Timo,
Thanks for this. I’ve been looking into creating this in Java, using MaxAggFunction.scala as a basis. Is it correct that I’d need to create a version for each type I want to use it with (albeit using generics) and register the functions separately for use with the correct type of table field?

Thanks,
James.

Re: Keeping only latest row by key?

Posted by Timo Walther <tw...@apache.org>.
Hi James,

the easiest solution for this behavior is to use a user-defined LAST_VALUE aggregate function as discussed here [1].

I hope this helps.

Regards,
Timo

[1] http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Using-SQL-with-dynamic-tables-where-rows-are-updated-td20519.html
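
As a rough illustration of the accumulator idea behind such an aggregate, here is a minimal sketch in plain Java. It only borrows the createAccumulator / accumulate / getValue shape used by Flink's user-defined aggregate functions. It does not extend any Flink class, and the names LatestValue and Acc are hypothetical. It keeps the value carrying the greatest timestamp seen so far, which is an assumption about what "last" should mean here.

```java
class LatestValueSketch {

    /** Accumulator: the value with the greatest timestamp seen so far. */
    static final class Acc<T> {
        T value;                            // null until the first row arrives
        long timestamp = Long.MIN_VALUE;
    }

    /** Generic over the value type T. */
    static final class LatestValue<T> {

        Acc<T> createAccumulator() {
            return new Acc<>();
        }

        /** Replaces the stored value when the new timestamp is not older. */
        void accumulate(Acc<T> acc, T value, long timestamp) {
            if (timestamp >= acc.timestamp) {
                acc.value = value;
                acc.timestamp = timestamp;
            }
        }

        T getValue(Acc<T> acc) {
            return acc.value;
        }
    }

    public static void main(String[] args) {
        LatestValue<String> agg = new LatestValue<>();
        Acc<String> acc = agg.createAccumulator();
        agg.accumulate(acc, "first", 10L);
        agg.accumulate(acc, "third", 30L);
        agg.accumulate(acc, "second", 20L);   // older than 30, so ignored
        System.out.println(agg.getValue(acc));
    }
}
```

Whether a single generic class like this can be registered once with the Table API, or needs one registration per field type, depends on how Flink resolves the result type. The sketch does not address that.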



Re: Keeping only latest row by key?

Posted by Andrey Zagrebin <an...@data-artisans.com>.
Hi James,

There are over windows in Flink Table API:
https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/table/tableApi.html#over-windows
It should be possible to implement this behaviour using them.

Cheers,
Andrey
