You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by "guenterh.lists" <gu...@bluewin.ch> on 2021/12/06 10:47:43 UTC

use of Scala versions >= 2.13 in Flink 1.15

Dear list,

there have been some discussions and activities in the last months about 
a Scala free runtime which should make it possible to use newer Scala 
version (>= 2.13 / 3.x) on the application side.

Stephan Ewen announced the implementation is on the way [1] and Martijn 
Vissr mentioned in the ask me anything session on version 1.14 that it 
is planned to make this possible in the upcoming 1.15 version (~ next 
February ) [2]

This would be very nice for our currently started project where we are 
discussing the used tools and infrastructure. "Personally" I would 
prefer that people with less experience on the JVM could make their 
start and first experiences with a "pythonized" Scala using the last 
versions of the language (2.13.x or maybe 3.x).

My question: Do you think your plans to provide the possibility of a 
Scala free runtime with the upcoming version is still realistic?

Out of curiosity: If you can make this possible and applications with 
current Scala versions are going to use the Java APIs of Flink what's 
the future of the current Scala API of Flink where you have to decide to 
use either Scala 2.11 or <2.12.8?
Is this then still possible as an alternative?

Thanks for some hints for our planning and decisions

Günter




[1] https://twitter.com/data_fly/status/1415012793347149830
[2] https://www.youtube.com/watch?v=wODmlow0ip0

-- 
Günter Hipler
university library Leipzig


Re: use of Scala versions >= 2.13 in Flink 1.15

Posted by Chesnay Schepler <ch...@apache.org>.
Indeed, if you use a scala-free Flink then Scala types would currently 
go through Kryo, hence why we will recommend to use Java types /for the 
time being/.
We are aware that this is an annoying limitation, and it is certainly 
not a state we want to at in the long-term.
There are some ideas floating around to have both Scala/Java types go 
through the same type extraction / serialization stack, but I don't 
think there is anything concrete to share yet.

As for supporting 2.13, and the corresponding migration from 2.12, I'm 
not aware of a concrete plan at this time.
We do want to support 2.13/3.0 at some point, but the migration is a 
tricky thing, hence why we put off upgrading Scala beyond 2.12.7 for so 
long.
At the moment we are primarily concerned with making such upgrades 
easier in the future by isolating individual Scala-reliant components 
from the Scala APIs.

If you have ideas in this direction that you'd like to share, then I'd 
suggest to head on over to 
https://issues.apache.org/jira/browse/FLINK-13414 and present them there.
At a glance your plan sounds pretty good, but I'm also not too deeply 
involved in the serializer stack ;)

On 07/12/2021 14:51, Roman Grebennikov wrote:
> Hi,
>
> I guess using scala 2.13 with scala-free Flink 1.15 assumes that it will always use generic/Kryo serialization, which has a large performance penalty (YMMV, but it happens all the time with us when we accidentaly use flink java apis with scala case classes).
>
> As far as I know, Flink's set of scala serializers for collections is using some 2.11/2.12 specific deprecated internal things like CanBuildFrom, which are not available on 2.13. So implementing a state migration from 2.12 to 2.13 is not that easy due to a way flink TraversableSerializer is implemented. And createTypeInformation scala macro flink is using for deriving serializers for scala case classes is not directly compatible with 3.0, as there is a completely new scala macro API on 3.x.
>
> Chesnay, I'm wondering what is the plan on 2.13/3.0 support in the future?
>
> If I was the one writing a FLIP for this process, I can imagine it like this:
> * as 2.11 is finally removed in 1.15, the createTypeInformation macro can be re-done on top of magnolia, which supports 2.12, 2.13 and 3.x with the same API.
> * current impementation of flink's serializers for scala collections (afaik in TraversableSerializer) is serializing the whole CanBuildFrom code for a specific concrete collection type right in the snapshot. So it cannot be deserialized on 2.13, as there is no CanBuildFrom. But my own opinion is that the cases when someone has custom CanBuildFrom for their own hand-made scala collection implementation is extremely rare, so with a set of heuristics we can guess the concrete collection type right from the serialized CanBuildFrom scala code, assuming that there is finite number of collection types (around 10 or something).
>
> With this approach we can: support 2.12/2.13/3.x with the same codebase, and allow state migrations between scala versions.
>
> I did some sort of prototype for step 1 (and partially step 2) inhttps://github.com/findify/flink-adt  , although with a different goal of supporting scala ADTs, so if anyone interested, I can make a draft FLIP proposal based on this research to start the discussion.
>
> with best regards,
> Roman Grebennikov |grv@dfdx.me
>
> On Tue, Dec 7, 2021, at 08:46, Chesnay Schepler wrote:
>> We haven't changed anything significant in 1.14.
>>
>> Whether the 2.13 job can run on Scala 2.12 depends a bit on the job (and
>> of course, used libraries!); it depends on the backwards-compatibility
>> from Scala, which APIs are used and what kind of Scala magic is being
>> employed.
>> We haven't really tested that scenario in 1.14 or below.
>>
>> On 07/12/2021 09:28, guenterh.lists wrote:
>>> Hi Chesnay,
>>>
>>> thanks for the info - this is really good news for us.
>>>
>>> I set up a playground using the snapshot from yesterday [1] and a
>>> really quick and short Job using Scala 2.13 [2]
>>>
>>> The job starts and returns correct results. Even the use of a case
>>> class against the Java API is possible.
>>>
>>> Then I made a second try with the same job (compiled with Scala
>>> 2.13.6) running on a Flink 1.14 cluster which was again successful.
>>>
>>> My question:
>>> Is this compilation with Scala versions >=2.13 already part of 1.14 or
>>> is my example too small and simple that binary incompatibilities
>>> between the versions doesn't matter?
>>>
>>> Günter
>>>
>>>
>>> [1]
>>> https://gitlab.com/guenterh/flink_1_15_scala_2_13/-/tree/main/flink-1.15-SNAPSHOT
>>> [2]
>>> https://gitlab.com/guenterh/flink_1_15_scala_2_13/-/blob/main/flink_scala_213/build.sbt#L12
>>> https://gitlab.com/guenterh/flink_1_15_scala_2_13/-/blob/main/flink_scala_213/src/main/scala/de/ub/unileipzig/Job.scala#L8  
>>>
>>>
>>>
>>> On 06.12.21 13:59, Chesnay Schepler wrote:
>>>> With regards to the Java APIs, you will definitely be able to use the
>>>> Java DataSet/DataStream APIs from Scala without any restrictions
>>>> imposed by Flink. This is already working with the current SNAPSHOT
>>>> version.
>>>>
>>>> As we speak we are also working to achieve the same for the Table
>>>> API; we expect to achieve that but with some caveats (i.e., if you
>>>> use the Python API or the Hive connector then you still need to use
>>>> the Scala version provided by Flink).
>>>>
>>>> As for the Scala APIs, we haven't really decided yet how this will
>>>> work in the future. However, one of the big benefits of the
>>>> Scala-free runtime is that it should now be easier for us to release
>>>> the APIs for more Scala versions.
>>>>
>>>> On 06/12/2021 11:47, guenterh.lists wrote:
>>>>> Dear list,
>>>>>
>>>>> there have been some discussions and activities in the last months
>>>>> about a Scala free runtime which should make it possible to use
>>>>> newer Scala version (>= 2.13 / 3.x) on the application side.
>>>>>
>>>>> Stephan Ewen announced the implementation is on the way [1] and
>>>>> Martijn Vissr mentioned in the ask me anything session on version
>>>>> 1.14 that it is planned to make this possible in the upcoming 1.15
>>>>> version (~ next February ) [2]
>>>>>
>>>>> This would be very nice for our currently started project where we
>>>>> are discussing the used tools and infrastructure. "Personally" I
>>>>> would prefer that people with less experience on the JVM could make
>>>>> their start and first experiences with a "pythonized" Scala using
>>>>> the last versions of the language (2.13.x or maybe 3.x).
>>>>>
>>>>> My question: Do you think your plans to provide the possibility of a
>>>>> Scala free runtime with the upcoming version is still realistic?
>>>>>
>>>>> Out of curiosity: If you can make this possible and applications
>>>>> with current Scala versions are going to use the Java APIs of Flink
>>>>> what's the future of the current Scala API of Flink where you have
>>>>> to decide to use either Scala 2.11 or <2.12.8?
>>>>> Is this then still possible as an alternative?
>>>>>
>>>>> Thanks for some hints for our planning and decisions
>>>>>
>>>>> Günter
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> [1]https://twitter.com/data_fly/status/1415012793347149830
>>>>> [2]https://www.youtube.com/watch?v=wODmlow0ip0
>>>>>

Re: use of Scala versions >= 2.13 in Flink 1.15

Posted by Roman Grebennikov <gr...@dfdx.me>.
Hi,

I guess using scala 2.13 with scala-free Flink 1.15 assumes that it will always use generic/Kryo serialization, which has a large performance penalty (YMMV, but it happens all the time with us when we accidentaly use flink java apis with scala case classes).

As far as I know, Flink's set of scala serializers for collections is using some 2.11/2.12 specific deprecated internal things like CanBuildFrom, which are not available on 2.13. So implementing a state migration from 2.12 to 2.13 is not that easy due to a way flink TraversableSerializer is implemented. And createTypeInformation scala macro flink is using for deriving serializers for scala case classes is not directly compatible with 3.0, as there is a completely new scala macro API on 3.x.

Chesnay, I'm wondering what is the plan on 2.13/3.0 support in the future?

If I was the one writing a FLIP for this process, I can imagine it like this:
* as 2.11 is finally removed in 1.15, the createTypeInformation macro can be re-done on top of magnolia, which supports 2.12, 2.13 and 3.x with the same API.
* current impementation of flink's serializers for scala collections (afaik in TraversableSerializer) is serializing the whole CanBuildFrom code for a specific concrete collection type right in the snapshot. So it cannot be deserialized on 2.13, as there is no CanBuildFrom. But my own opinion is that the cases when someone has custom CanBuildFrom for their own hand-made scala collection implementation is extremely rare, so with a set of heuristics we can guess the concrete collection type right from the serialized CanBuildFrom scala code, assuming that there is finite number of collection types (around 10 or something).

With this approach we can: support 2.12/2.13/3.x with the same codebase, and allow state migrations between scala versions.

I did some sort of prototype for step 1 (and partially step 2) in https://github.com/findify/flink-adt , although with a different goal of supporting scala ADTs, so if anyone interested, I can make a draft FLIP proposal based on this research to start the discussion.

with best regards,
Roman Grebennikov | grv@dfdx.me

On Tue, Dec 7, 2021, at 08:46, Chesnay Schepler wrote:
> We haven't changed anything significant in 1.14.
>
> Whether the 2.13 job can run on Scala 2.12 depends a bit on the job (and 
> of course, used libraries!); it depends on the backwards-compatibility 
> from Scala, which APIs are used and what kind of Scala magic is being 
> employed.
> We haven't really tested that scenario in 1.14 or below.
>
> On 07/12/2021 09:28, guenterh.lists wrote:
>> Hi Chesnay,
>>
>> thanks for the info - this is really good news for us.
>>
>> I set up a playground using the snapshot from yesterday [1] and a 
>> really quick and short Job using Scala 2.13 [2]
>>
>> The job starts and returns correct results. Even the use of a case 
>> class against the Java API is possible.
>>
>> Then I made a second try with the same job (compiled with Scala 
>> 2.13.6) running on a Flink 1.14 cluster which was again successful.
>>
>> My question:
>> Is this compilation with Scala versions >=2.13 already part of 1.14 or 
>> is my example too small and simple that binary incompatibilities 
>> between the versions doesn't matter?
>>
>> Günter
>>
>>
>> [1] 
>> https://gitlab.com/guenterh/flink_1_15_scala_2_13/-/tree/main/flink-1.15-SNAPSHOT
>> [2] 
>> https://gitlab.com/guenterh/flink_1_15_scala_2_13/-/blob/main/flink_scala_213/build.sbt#L12
>> https://gitlab.com/guenterh/flink_1_15_scala_2_13/-/blob/main/flink_scala_213/src/main/scala/de/ub/unileipzig/Job.scala#L8 
>>
>>
>>
>> On 06.12.21 13:59, Chesnay Schepler wrote:
>>> With regards to the Java APIs, you will definitely be able to use the 
>>> Java DataSet/DataStream APIs from Scala without any restrictions 
>>> imposed by Flink. This is already working with the current SNAPSHOT 
>>> version.
>>>
>>> As we speak we are also working to achieve the same for the Table 
>>> API; we expect to achieve that but with some caveats (i.e., if you 
>>> use the Python API or the Hive connector then you still need to use 
>>> the Scala version provided by Flink).
>>>
>>> As for the Scala APIs, we haven't really decided yet how this will 
>>> work in the future. However, one of the big benefits of the 
>>> Scala-free runtime is that it should now be easier for us to release 
>>> the APIs for more Scala versions.
>>>
>>> On 06/12/2021 11:47, guenterh.lists wrote:
>>>> Dear list,
>>>>
>>>> there have been some discussions and activities in the last months 
>>>> about a Scala free runtime which should make it possible to use 
>>>> newer Scala version (>= 2.13 / 3.x) on the application side.
>>>>
>>>> Stephan Ewen announced the implementation is on the way [1] and 
>>>> Martijn Vissr mentioned in the ask me anything session on version 
>>>> 1.14 that it is planned to make this possible in the upcoming 1.15 
>>>> version (~ next February ) [2]
>>>>
>>>> This would be very nice for our currently started project where we 
>>>> are discussing the used tools and infrastructure. "Personally" I 
>>>> would prefer that people with less experience on the JVM could make 
>>>> their start and first experiences with a "pythonized" Scala using 
>>>> the last versions of the language (2.13.x or maybe 3.x).
>>>>
>>>> My question: Do you think your plans to provide the possibility of a 
>>>> Scala free runtime with the upcoming version is still realistic?
>>>>
>>>> Out of curiosity: If you can make this possible and applications 
>>>> with current Scala versions are going to use the Java APIs of Flink 
>>>> what's the future of the current Scala API of Flink where you have 
>>>> to decide to use either Scala 2.11 or <2.12.8?
>>>> Is this then still possible as an alternative?
>>>>
>>>> Thanks for some hints for our planning and decisions
>>>>
>>>> Günter
>>>>
>>>>
>>>>
>>>>
>>>> [1] https://twitter.com/data_fly/status/1415012793347149830
>>>> [2] https://www.youtube.com/watch?v=wODmlow0ip0
>>>>
>>>

Re: use of Scala versions >= 2.13 in Flink 1.15

Posted by Chesnay Schepler <ch...@apache.org>.
We haven't changed anything significant in 1.14.

Whether the 2.13 job can run on Scala 2.12 depends a bit on the job (and 
of course, used libraries!); it depends on the backwards-compatibility 
from Scala, which APIs are used and what kind of Scala magic is being 
employed.
We haven't really tested that scenario in 1.14 or below.

On 07/12/2021 09:28, guenterh.lists wrote:
> Hi Chesnay,
>
> thanks for the info - this is really good news for us.
>
> I set up a playground using the snapshot from yesterday [1] and a 
> really quick and short Job using Scala 2.13 [2]
>
> The job starts and returns correct results. Even the use of a case 
> class against the Java API is possible.
>
> Then I made a second try with the same job (compiled with Scala 
> 2.13.6) running on a Flink 1.14 cluster which was again successful.
>
> My question:
> Is this compilation with Scala versions >=2.13 already part of 1.14 or 
> is my example too small and simple that binary incompatibilities 
> between the versions doesn't matter?
>
> Günter
>
>
> [1] 
> https://gitlab.com/guenterh/flink_1_15_scala_2_13/-/tree/main/flink-1.15-SNAPSHOT
> [2] 
> https://gitlab.com/guenterh/flink_1_15_scala_2_13/-/blob/main/flink_scala_213/build.sbt#L12
> https://gitlab.com/guenterh/flink_1_15_scala_2_13/-/blob/main/flink_scala_213/src/main/scala/de/ub/unileipzig/Job.scala#L8 
>
>
>
> On 06.12.21 13:59, Chesnay Schepler wrote:
>> With regards to the Java APIs, you will definitely be able to use the 
>> Java DataSet/DataStream APIs from Scala without any restrictions 
>> imposed by Flink. This is already working with the current SNAPSHOT 
>> version.
>>
>> As we speak we are also working to achieve the same for the Table 
>> API; we expect to achieve that but with some caveats (i.e., if you 
>> use the Python API or the Hive connector then you still need to use 
>> the Scala version provided by Flink).
>>
>> As for the Scala APIs, we haven't really decided yet how this will 
>> work in the future. However, one of the big benefits of the 
>> Scala-free runtime is that it should now be easier for us to release 
>> the APIs for more Scala versions.
>>
>> On 06/12/2021 11:47, guenterh.lists wrote:
>>> Dear list,
>>>
>>> there have been some discussions and activities in the last months 
>>> about a Scala free runtime which should make it possible to use 
>>> newer Scala version (>= 2.13 / 3.x) on the application side.
>>>
>>> Stephan Ewen announced the implementation is on the way [1] and 
>>> Martijn Vissr mentioned in the ask me anything session on version 
>>> 1.14 that it is planned to make this possible in the upcoming 1.15 
>>> version (~ next February ) [2]
>>>
>>> This would be very nice for our currently started project where we 
>>> are discussing the used tools and infrastructure. "Personally" I 
>>> would prefer that people with less experience on the JVM could make 
>>> their start and first experiences with a "pythonized" Scala using 
>>> the last versions of the language (2.13.x or maybe 3.x).
>>>
>>> My question: Do you think your plans to provide the possibility of a 
>>> Scala free runtime with the upcoming version is still realistic?
>>>
>>> Out of curiosity: If you can make this possible and applications 
>>> with current Scala versions are going to use the Java APIs of Flink 
>>> what's the future of the current Scala API of Flink where you have 
>>> to decide to use either Scala 2.11 or <2.12.8?
>>> Is this then still possible as an alternative?
>>>
>>> Thanks for some hints for our planning and decisions
>>>
>>> Günter
>>>
>>>
>>>
>>>
>>> [1] https://twitter.com/data_fly/status/1415012793347149830
>>> [2] https://www.youtube.com/watch?v=wODmlow0ip0
>>>
>>


Re: use of Scala versions >= 2.13 in Flink 1.15

Posted by "guenterh.lists" <gu...@bluewin.ch>.
Hi Chesnay,

thanks for the info - this is really good news for us.

I set up a playground using the snapshot from yesterday [1] and a really 
quick and short Job using Scala 2.13 [2]

The job starts and returns correct results. Even the use of a case class 
against the Java API is possible.

Then I made a second try with the same job (compiled with Scala 2.13.6) 
running on a Flink 1.14 cluster which was again successful.

My question:
Is this compilation with Scala versions >=2.13 already part of 1.14 or 
is my example too small and simple that binary incompatibilities between 
the versions doesn't matter?

Günter


[1] 
https://gitlab.com/guenterh/flink_1_15_scala_2_13/-/tree/main/flink-1.15-SNAPSHOT
[2] 
https://gitlab.com/guenterh/flink_1_15_scala_2_13/-/blob/main/flink_scala_213/build.sbt#L12
https://gitlab.com/guenterh/flink_1_15_scala_2_13/-/blob/main/flink_scala_213/src/main/scala/de/ub/unileipzig/Job.scala#L8


On 06.12.21 13:59, Chesnay Schepler wrote:
> With regards to the Java APIs, you will definitely be able to use the 
> Java DataSet/DataStream APIs from Scala without any restrictions 
> imposed by Flink. This is already working with the current SNAPSHOT 
> version.
>
> As we speak we are also working to achieve the same for the Table API; 
> we expect to achieve that but with some caveats (i.e., if you use the 
> Python API or the Hive connector then you still need to use the Scala 
> version provided by Flink).
>
> As for the Scala APIs, we haven't really decided yet how this will 
> work in the future. However, one of the big benefits of the Scala-free 
> runtime is that it should now be easier for us to release the APIs for 
> more Scala versions.
>
> On 06/12/2021 11:47, guenterh.lists wrote:
>> Dear list,
>>
>> there have been some discussions and activities in the last months 
>> about a Scala free runtime which should make it possible to use newer 
>> Scala version (>= 2.13 / 3.x) on the application side.
>>
>> Stephan Ewen announced the implementation is on the way [1] and 
>> Martijn Vissr mentioned in the ask me anything session on version 
>> 1.14 that it is planned to make this possible in the upcoming 1.15 
>> version (~ next February ) [2]
>>
>> This would be very nice for our currently started project where we 
>> are discussing the used tools and infrastructure. "Personally" I 
>> would prefer that people with less experience on the JVM could make 
>> their start and first experiences with a "pythonized" Scala using the 
>> last versions of the language (2.13.x or maybe 3.x).
>>
>> My question: Do you think your plans to provide the possibility of a 
>> Scala free runtime with the upcoming version is still realistic?
>>
>> Out of curiosity: If you can make this possible and applications with 
>> current Scala versions are going to use the Java APIs of Flink what's 
>> the future of the current Scala API of Flink where you have to decide 
>> to use either Scala 2.11 or <2.12.8?
>> Is this then still possible as an alternative?
>>
>> Thanks for some hints for our planning and decisions
>>
>> Günter
>>
>>
>>
>>
>> [1] https://twitter.com/data_fly/status/1415012793347149830
>> [2] https://www.youtube.com/watch?v=wODmlow0ip0
>>
>
-- 
Günter Hipler
University library Leipzig


Re: use of Scala versions >= 2.13 in Flink 1.15

Posted by Chesnay Schepler <ch...@apache.org>.
With regards to the Java APIs, you will definitely be able to use the 
Java DataSet/DataStream APIs from Scala without any restrictions imposed 
by Flink. This is already working with the current SNAPSHOT version.

As we speak we are also working to achieve the same for the Table API; 
we expect to achieve that but with some caveats (i.e., if you use the 
Python API or the Hive connector then you still need to use the Scala 
version provided by Flink).

As for the Scala APIs, we haven't really decided yet how this will work 
in the future. However, one of the big benefits of the Scala-free 
runtime is that it should now be easier for us to release the APIs for 
more Scala versions.

On 06/12/2021 11:47, guenterh.lists wrote:
> Dear list,
>
> there have been some discussions and activities in the last months 
> about a Scala free runtime which should make it possible to use newer 
> Scala version (>= 2.13 / 3.x) on the application side.
>
> Stephan Ewen announced the implementation is on the way [1] and 
> Martijn Vissr mentioned in the ask me anything session on version 1.14 
> that it is planned to make this possible in the upcoming 1.15 version 
> (~ next February ) [2]
>
> This would be very nice for our currently started project where we are 
> discussing the used tools and infrastructure. "Personally" I would 
> prefer that people with less experience on the JVM could make their 
> start and first experiences with a "pythonized" Scala using the last 
> versions of the language (2.13.x or maybe 3.x).
>
> My question: Do you think your plans to provide the possibility of a 
> Scala free runtime with the upcoming version is still realistic?
>
> Out of curiosity: If you can make this possible and applications with 
> current Scala versions are going to use the Java APIs of Flink what's 
> the future of the current Scala API of Flink where you have to decide 
> to use either Scala 2.11 or <2.12.8?
> Is this then still possible as an alternative?
>
> Thanks for some hints for our planning and decisions
>
> Günter
>
>
>
>
> [1] https://twitter.com/data_fly/status/1415012793347149830
> [2] https://www.youtube.com/watch?v=wODmlow0ip0
>