Posted to dev@flink.apache.org by Xingbo Huang <hx...@gmail.com> on 2021/12/29 03:56:51 UTC

[DISCUSS] FLIP-206: Support PyFlink Runtime Execution in Thread Mode

Hi everyone,

I would like to start a discussion thread on "Support PyFlink Runtime
Execution in Thread Mode"

We have provided the PyFlink Runtime framework to support Python user-defined
functions since Flink 1.10. This framework is called Process Mode, and it
depends on an inter-process communication architecture based on the Apache
Beam Portability framework. Although starting a dedicated process to execute
Python user-defined functions provides better resource isolation, it brings
greater resource and performance overhead.

In order to overcome the resource and performance problems of Process Mode,
we propose a new execution mode which executes Python user-defined functions
in the same thread instead of in a separate process.
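
A minimal sketch of how a job would opt in, assuming the
python.execution-mode option proposed in the FLIP (the option name and its
values are part of this proposal, not a released API):

```
from pyflink.table import EnvironmentSettings, TableEnvironment

t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())

# "process" keeps today's inter-process mode; "thread" would run Python
# user-defined functions embedded in the TM process, as proposed here.
t_env.get_config().get_configuration().set_string(
    "python.execution-mode", "thread")
```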

I have drafted FLIP-206[1]. Please feel free to reply to this email thread.
Looking forward to your feedback!

Best,
Xingbo

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-206%3A+Support+PyFlink+Runtime+Execution+in+Thread+Mode

Re: [DISCUSS] FLIP-206: Support PyFlink Runtime Execution in Thread Mode

Posted by Xingbo Huang <hx...@gmail.com>.
Hi Thomas,

Thanks for the confirmation. I will now start a vote.

Best,
Xingbo

On Wed, Jan 12, 2022 at 02:20, Thomas Weise <th...@apache.org> wrote:

> Hi Xingbo,
>
> +1 from my side
>
> Thanks for the clarification. For your use case the parameter size and
> therefore serialization overhead was the limiting factor. I have seen
> use cases where that is not the concern, because the Python logic
> itself is heavy and dwarfs the protocol overhead (for example when
> interacting with external systems from the UDF). Hence it is good to
> give users options to optimize for their application requirements.
>
> Cheers,
> Thomas
>
> On Tue, Jan 11, 2022 at 3:44 AM Xingbo Huang <hx...@gmail.com> wrote:
> >
> > Hi everyone,
> >
> > Thanks to all of you for the discussion.
> > If there are no objections, I would like to start a vote thread tomorrow.
> >
> > Best,
> > Xingbo
> >
> > On Fri, Jan 7, 2022 at 16:18, Xingbo Huang <hx...@gmail.com> wrote:
> >
> > > Hi Till,
> > >
> > > I have written a more complicated PyFlink job. Compared with the
> > > previous single-Python-UDF job, there is an extra stage of converting
> > > between Table and DataStream. Besides, I added a Python map function to
> > > the job. Because the Python DataStream API has not yet implemented
> > > Thread Mode, the Python map function operator still runs in Process
> > > Mode.
> > >
> > > ```
> > > from pyflink.common.typeinfo import Types
> > > from pyflink.table import DataTypes
> > > from pyflink.table.udf import udf
> > >
> > > source = t_env.from_path("source_table")  # schema [id: String, d: int]
> > >
> > > @udf(result_type=DataTypes.STRING(), func_type="general")
> > > def upper(x):
> > >     return x.upper()
> > >
> > > t_env.create_temporary_system_function("upper", upper)
> > >
> > > # python map function (runs in Process Mode until the DataStream API
> > > # supports Thread Mode)
> > > ds = t_env.to_data_stream(source) \
> > >     .map(lambda x: x,
> > >          output_type=Types.ROW_NAMED(["id", "d"],
> > >                                      [Types.STRING(), Types.INT()]))
> > >
> > > t = t_env.from_data_stream(ds)
> > > t.select('upper(id)').execute_insert('sink_table')
> > > ```
> > >
> > > The input record size is 1 KB.
> > >
> > > Mode                        |   QPS
> > > Process Mode                |   30,000
> > > Thread Mode + Process Mode  |   40,000
> > >
> > > From the table, we can see that the nodes running in Process Mode are
> > > the performance bottleneck of the job.
> > >
> > > Best,
> > > Xingbo
> > >
> > > On Wed, Jan 5, 2022 at 23:16, Till Rohrmann <tr...@apache.org> wrote:
> > >
> > >> Thanks for the detailed answer Xingbo. Quick question on the last
> > >> figure in the FLIP. You said that this is a real world Flink stream
> > >> SQL job. The title of the graph says UDF(String Upper). So do I
> > >> understand correctly that string upper is the real world use case you
> > >> have measured? What I wanted to ask is how a slightly more complex
> > >> Flink Python job (involving shuffles, with back pressure, etc.)
> > >> performs using the thread and process mode respectively.
> > >>
> > >> If the mode solely needs changes in the Python part of Flink, then I
> > >> don't have any concerns from the runtime perspective.
> > >>
> > >> Cheers,
> > >> Till
> > >>
> > >> On Wed, Jan 5, 2022 at 1:55 PM Xingbo Huang <hx...@gmail.com> wrote:
> > >>
> > >> > Hi Till and Thomas,
> > >> >
> > >> > Thanks a lot for joining the discussion.
> > >> >
> > >> > For Till:
> > >> >
> > >> > >>> Is the slower performance currently the biggest pain point for
> > >> > our Python users? What else are our Python users mainly complaining
> > >> > about?
> > >> >
> > >> > PyFlink users are most concerned about two things: one is usability,
> > >> > the other is performance. Users often run benchmarks when they first
> > >> > investigate PyFlink[1][2] to decide whether to use it. The
> > >> > performance of a PyFlink job depends on two parts: the overhead of
> > >> > the PyFlink framework, and the complexity of the Python functions
> > >> > implemented by the user. In the Python ecosystem, there are many
> > >> > libraries and tools that can help users improve the performance of
> > >> > their custom functions, such as pandas[3], cython[4] and numba[5].
> > >> > So we hope that the framework overhead of PyFlink itself can also
> > >> > be reduced.
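> > >> >
> > >> > As an illustration of that second part (my own sketch, not from the
> > >> > FLIP): a CPU-heavy UDF body can be compiled with numba while the
> > >> > per-record framework overhead around it stays the same, which is
> > >> > exactly the part this FLIP targets:
> > >> >
> > >> > ```
> > >> > from numba import njit
> > >> >
> > >> > @njit(cache=True)
> > >> > def heavy_math(n):
> > >> >     total = 0.0
> > >> >     for i in range(n):
> > >> >         total += i ** 0.5
> > >> >     return total
> > >> >
> > >> > # The compiled loop runs far faster than pure Python, but the
> > >> > # framework's per-record ser/de and IPC cost is untouched by
> > >> > # such libraries.
> > >> > print(heavy_math(1_000_000))
> > >> > ```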
> > >> >
> > >> > >>> Concerning the proposed changes, are there any changes required
> > >> > on the runtime side (changes to Flink)? How will the deployment and
> > >> > memory management be affected when using the thread execution mode?
> > >> >
> > >> > The changes on the PyFlink Runtime mentioned here are actually only
> > >> > modifications of the PyFlink custom operators, such as
> > >> > PythonScalarFunctionOperator[6], which won't affect deployment and
> > >> > memory management.
> > >> >
> > >> > >>> One more question that came to my mind: How much performance
> > >> > improvement do we gain on a real-world Python use case? Were the
> > >> > measurements more like micro benchmarks where the Python UDF was
> > >> > called w/o the overhead of Flink? I would just be curious how much
> > >> > the Python component contributes to the overall runtime of a real
> > >> > world job. Do we have some data on this?
> > >> >
> > >> > The last figure I put in the FLIP is the performance comparison of
> > >> > three real Flink stream SQL jobs: a Java UDF job, a Python UDF job
> > >> > in Process Mode, and a Python UDF job in Thread Mode. The QPS values
> > >> > are end-to-end Flink job execution results. As shown in the
> > >> > comparison chart, a Python UDF can often reach only 20% of the
> > >> > performance of an equivalent Java UDF, so Python UDFs often become
> > >> > the performance bottleneck in a PyFlink job.
> > >> >
> > >> > For Thomas:
> > >> >
> > >> > The first time I realized that the framework overhead of the various
> > >> > IPC mechanisms (socket, gRPC, shared memory) cannot be ignored in
> > >> > some scenarios was due to a PyFlink image algorithm prediction job.
> > >> > Its input parameters are a series of huge image binary arrays, and
> > >> > its data size is bigger than 1 GB. The
> > >> > serialization/deserialization overhead became an important part of
> > >> > its poor performance. Although this job is a bit extreme, through
> > >> > measurement we did find the impact of the
> > >> > serialization/deserialization overhead caused by larger parameters
> > >> > on the performance of the IPC framework.
> > >> >
> > >> > >>> As I understand it, you measured the difference in throughput
> > >> > for UPPER between process and embedded mode and the difference is
> > >> > 50% increased throughput?
> > >> >
> > >> > This 50% is the result when the data size is less than 100 bytes.
> > >> > When the data size reaches 1 KB, the performance of Embedded Mode
> > >> > reaches about 3.5 times that of Process Mode, as shown in the FLIP.
> > >> > When the data reaches 1 MB, Embedded Mode can reach 5 times the
> > >> > performance of Process Mode. The biggest difference here is that in
> > >> > Embedded Mode, input/result data does not need to be
> > >> > serialized/deserialized.
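> > >> >
> > >> > A minimal standalone sketch of where that cost comes from (plain
> > >> > Python with a made-up 1 MB payload, just to illustrate the
> > >> > per-record ser/de round trip that Process Mode pays and Embedded
> > >> > Mode skips; it is not PyFlink's actual wire path):
> > >> >
> > >> > ```
> > >> > import pickle
> > >> > import time
> > >> >
> > >> > record = b"x" * (1024 * 1024)  # one 1 MB input value
> > >> >
> > >> > start = time.time()
> > >> > for _ in range(1000):
> > >> >     payload = pickle.dumps(record)    # serialize before crossing IPC
> > >> >     restored = pickle.loads(payload)  # deserialize on the other side
> > >> > print(f"1000 ser/de round trips: {time.time() - start:.2f}s")
> > >> > # In Embedded Mode the object is handed over in-process instead.
> > >> > ```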
> > >> >
> > >> > >>> Is that a typical UDF in your usage?
> > >> >
> > >> > The reason for choosing UPPER is that a simpler UDF implementation
> > >> > makes it easier to evaluate the performance of the different
> > >> > execution modes.
> > >> >
> > >> > >>> What do you observe when the function becomes more complex?
> > >> >
> > >> > We can analyze the QPS of the framework (process mode or embedded
> > >> > mode) and the QPS of the UDF calculation logic separately. A more
> > >> > complex UDF simply means a UDF with a lower QPS of its own. The
> > >> > main factors that affect the framework QPS are the data types, the
> > >> > number and the size of the parameters, which greatly affect the
> > >> > serialization/deserialization overhead in Process Mode.
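> > >> >
> > >> > As a back-of-the-envelope model (my own sketch with hypothetical
> > >> > numbers, not a measurement): per-record cost is roughly framework
> > >> > time plus UDF time, so the slower of the two dominates end-to-end
> > >> > QPS:
> > >> >
> > >> > ```
> > >> > def end_to_end_qps(framework_qps, udf_qps):
> > >> >     # Per-record costs add up: 1/total = 1/framework + 1/udf.
> > >> >     return 1.0 / (1.0 / framework_qps + 1.0 / udf_qps)
> > >> >
> > >> > # With a slow (complex) UDF, the execution mode barely matters:
> > >> > print(end_to_end_qps(30_000, 1_000))   # ~968  (hypothetical process mode)
> > >> > print(end_to_end_qps(100_000, 1_000))  # ~990  (hypothetical thread mode)
> > >> > # With a trivial UDF like UPPER, the framework QPS dominates instead.
> > >> > ```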
> > >> >
> > >> > The purpose of introducing Thread Mode is not to replace Process
> > >> > Mode, but to support Python UDF usage scenarios such as CEP and
> > >> > join, and scenarios where higher performance is pursued. Compared
> > >> > with Thread Mode, Process Mode has better isolation, which avoids
> > >> > the limitations of Thread Mode in some scenarios such as session
> > >> > mode.
> > >> >
> > >> > [1] https://www.mail-archive.com/user@flink.apache.org/msg42760.html
> > >> > [2] https://www.mail-archive.com/user@flink.apache.org/msg44975.html
> > >> > [3] https://pandas.pydata.org/
> > >> > [4] https://cython.org/
> > >> > [5] https://numba.pydata.org/
> > >> > [6] https://github.com/apache/flink/blob/master/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/scalar/PythonScalarFunctionOperator.java
> > >> >
> > >> > Best,
> > >> > Xingbo
> > >> >
> > >> > On Tue, Jan 4, 2022 at 04:23, Thomas Weise <th...@apache.org> wrote:
> > >> >
> > >> > > Interesting discussion. It caught my attention because I was also
> > >> > > interested in the Beam fn execution overhead a few years ago.
> > >> > >
> > >> > > We found back then that while in theory the fn protocol overhead
> > >> > > is very significant, for realistic function workloads that
> > >> > > overhead was negligible. And of course it all depends on the use
> > >> > > case. It might be worthwhile to quantify a couple more scenarios.
> > >> > >
> > >> > > As I understand it, you measured the difference in throughput for
> > >> > > UPPER between process and embedded mode and the difference is 50%
> > >> > > increased throughput? Is that a typical UDF in your usage? What do
> > >> > > you observe when the function becomes more complex?
> > >> > >
> > >> > > Thanks,
> > >> > > Thomas
> > >> > >
> > >> > > On Mon, Jan 3, 2022 at 5:52 AM Till Rohrmann <trohrmann@apache.org> wrote:
> > >> > > >
> > >> > > > One more question that came to my mind: How much performance
> > >> > > > improvement do we gain on a real-world Python use case? Were the
> > >> > > > measurements more like micro benchmarks where the Python UDF was
> > >> > > > called w/o the overhead of Flink? I would just be curious how
> > >> > > > much the Python component contributes to the overall runtime of
> > >> > > > a real world job. Do we have some data on this?
> > >> > > >
> > >> > > > Cheers,
> > >> > > > Till
> > >> > > >
> > >> > > > On Mon, Jan 3, 2022 at 11:45 AM Till Rohrmann <trohrmann@apache.org> wrote:
> > >> > > >
> > >> > > > > Hi Xingbo,
> > >> > > > >
> > >> > > > > Thanks for creating this FLIP. I have two general questions
> > >> > > > > about the motivation for this FLIP because I have only very
> > >> > > > > little exposure to our Python users:
> > >> > > > >
> > >> > > > > Is the slower performance currently the biggest pain point for
> > >> > > > > our Python users?
> > >> > > > >
> > >> > > > > What else are our Python users mainly complaining about?
> > >> > > > >
> > >> > > > > Concerning the proposed changes, are there any changes
> > >> > > > > required on the runtime side (changes to Flink)? How will the
> > >> > > > > deployment and memory management be affected when using the
> > >> > > > > thread execution mode?
> > >> > > > >
> > >> > > > > Cheers,
> > >> > > > > Till
> > >> > > > >
> > >> > > > > On Fri, Dec 31, 2021 at 9:46 AM Xingbo Huang <hxbks2ks@gmail.com> wrote:
> > >> > > > >
> > >> > > > >> Hi Wei,
> > >> > > > >>
> > >> > > > >> Thanks a lot for your feedback. Very good questions!
> > >> > > > >>
> > >> > > > >> >>> 1. It seems that we dynamically load an embedded Python
> > >> > > > >> and user dependencies in the TM process. Can they be
> > >> > > > >> uninstalled cleanly after the task finished? i.e. Can we use
> > >> > > > >> Thread Mode in session mode and the PyFlink shell?
> > >> > > > >>
> > >> > > > >> I mentioned the limitation of this part in the FLIP. There
> > >> > > > >> is no problem as long as the Python interpreter does not
> > >> > > > >> change, but if you need to change the Python interpreter,
> > >> > > > >> there is really no way to reload the Python library. The
> > >> > > > >> problem is mainly caused by many Python libraries assuming
> > >> > > > >> that they own the process alone.
> > >> > > > >>
> > >> > > > >> >>> 2. Does one TM have only one embedded Python running at
> > >> > > > >> the same time? If all the Python operators in the TM share
> > >> > > > >> the same PVM, will there be a loss in performance?
> > >> > > > >>
> > >> > > > >> Your understanding is correct that one TM has only one
> > >> > > > >> embedded Python interpreter running at the same time. I guess
> > >> > > > >> you are worried about the performance loss from multiple
> > >> > > > >> threads contending for the Python GIL. There is a one-to-one
> > >> > > > >> correspondence between Java worker threads and Python
> > >> > > > >> subinterpreters. Although subinterpreters have not yet
> > >> > > > >> completely overcome the GIL sharing problem (the Python
> > >> > > > >> community's recent plan for a per-interpreter GIL is still
> > >> > > > >> under discussion[1]), the performance of subinterpreters is
> > >> > > > >> very close to that of multiprocessing[2].
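> > >> > > > >>
> > >> > > > >> A self-contained illustration of the GIL contention being
> > >> > > > >> asked about (plain Python, nothing PyFlink-specific):
> > >> > > > >> CPU-bound work on threads of a single interpreter serializes
> > >> > > > >> on the GIL, while separate processes scale out, which is why
> > >> > > > >> the per-interpreter GIL work matters here:
> > >> > > > >>
> > >> > > > >> ```
> > >> > > > >> import time
> > >> > > > >> from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
> > >> > > > >>
> > >> > > > >> def burn(n):
> > >> > > > >>     total = 0
> > >> > > > >>     for i in range(n):
> > >> > > > >>         total += i * i
> > >> > > > >>     return total
> > >> > > > >>
> > >> > > > >> if __name__ == "__main__":
> > >> > > > >>     for pool_cls in (ThreadPoolExecutor, ProcessPoolExecutor):
> > >> > > > >>         start = time.time()
> > >> > > > >>         with pool_cls(max_workers=4) as pool:
> > >> > > > >>             list(pool.map(burn, [2_000_000] * 4))
> > >> > > > >>         # The thread pool is roughly 4x slower on this
> > >> > > > >>         # CPU-bound loop because all threads share one GIL.
> > >> > > > >>         print(pool_cls.__name__, f"{time.time() - start:.2f}s")
> > >> > > > >> ```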
> > >> > > > >>
> > >> > > > >> >>> 3. How do we load the relevant C library if the
> > >> > > > >> python.executable is provided by users?
> > >> > > > >>
> > >> > > > >> Once python.executable is provided, PEMJA will dynamically
> > >> > > > >> load the CPython library (libpython.*so or libpython.*dylib)
> > >> > > > >> and the pemja.so installed in that Python environment.
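> > >> > > > >>
> > >> > > > >> For reference, the shared-library information for a given
> > >> > > > >> interpreter can be inspected from Python itself (illustrative
> > >> > > > >> only; PEMJA's actual lookup logic may differ):
> > >> > > > >>
> > >> > > > >> ```
> > >> > > > >> import sysconfig
> > >> > > > >>
> > >> > > > >> # Directory containing libpython, and the library file name,
> > >> > > > >> # e.g. libpython3.8.so on Linux or libpython3.8.dylib on macOS.
> > >> > > > >> print(sysconfig.get_config_var("LIBDIR"))
> > >> > > > >> print(sysconfig.get_config_var("LDLIBRARY"))
> > >> > > > >> # 1 if this interpreter was built with a shared libpython.
> > >> > > > >> print(sysconfig.get_config_var("Py_ENABLE_SHARED"))
> > >> > > > >> ```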
> > >> > > > >>
> > >> > > > >> >>> May there be a risk of version conflicts?
> > >> > > > >>
> > >> > > > >> I understand that this question is actually about whether
> > >> > > > >> C/C++ has a way to solve the problem of relying on different
> > >> > > > >> versions of a library. First of all, we know that if there is
> > >> > > > >> only static linking, there will be no such problem. I have
> > >> > > > >> studied the source code of CPython[3], and there is no usage
> > >> > > > >> of dynamic linking. The remaining case is where dynamic
> > >> > > > >> linking is used in a C library written by the users. There
> > >> > > > >> are many ways to solve this problem with dynamic linking, but
> > >> > > > >> after all, this library is written by users, and it is
> > >> > > > >> difficult for us to guarantee that there will be no
> > >> > > > >> conflicts. In that case, Process Mode will be the fallback
> > >> > > > >> choice.
> > >> > > > >>
> > >> > > > >> [1] https://mail.python.org/archives/list/python-dev@python.org/thread/S5GZZCEREZLA2PEMTVFBCDM52H4JSENR/#RIK75U3ROEHWZL4VENQSQECB4F4GDELV
> > >> > > > >> [2] https://mail.python.org/archives/list/python-dev@python.org/thread/PNLBJBNIQDMG2YYGPBCTGOKOAVXRBJWY/#L5OXHXPFONRKLR3W6U46LUSUIBN4FCZQ
> > >> > > > >> [3] https://github.com/python/cpython
> > >> > > > >>
> > >> > > > >> Best,
> > >> > > > >> Xingbo
> > >> > > > >>
> > >> > > > >> On Fri, Dec 31, 2021 at 11:49, Wei Zhong <we...@gmail.com> wrote:
> > >> > > > >>
> > >> > > > >> > Hi Xingbo,
> > >> > > > >> >
> > >> > > > >> > Thanks for creating this FLIP. Big +1 for it!
> > >> > > > >> >
> > >> > > > >> > I have some questions about the Thread Mode:
> > >> > > > >> >
> > >> > > > >> > 1. It seems that we dynamically load an embedded Python and
> > >> > > > >> > user dependencies in the TM process. Can they be uninstalled
> > >> > > > >> > cleanly after the task finished? i.e. Can we use the Thread
> > >> > > > >> > Mode in session mode and the PyFlink shell?
> > >> > > > >> >
> > >> > > > >> > 2. Does one TM have only one embedded Python running at the
> > >> > > > >> > same time? If all the Python operators in the TM share the
> > >> > > > >> > same PVM, will there be a loss in performance?
> > >> > > > >> >
> > >> > > > >> > 3. How do we load the relevant C library if the
> > >> > > > >> > python.executable is provided by users? May there be a risk
> > >> > > > >> > of version conflicts?
> > >> > > > >> >
> > >> > > > >> > Best,
> > >> > > > >> > Wei
> > >> > > > >> >
> > >> > > > >> >
> > >> > > > >> > > On Dec 29, 2021, at 11:56 AM, Xingbo Huang <hx...@gmail.com> wrote:
> > >> > > > >> > >
> > >> > > > >> > > Hi everyone,
> > >> > > > >> > >
> > >> > > > >> > > I would like to start a discussion thread on "Support
> > >> > > > >> > > PyFlink Runtime Execution in Thread Mode"
> > >> > > > >> > >
> > >> > > > >> > > We have provided the PyFlink Runtime framework to support
> > >> > > > >> > > Python user-defined functions since Flink 1.10. This
> > >> > > > >> > > framework is called Process Mode, and it depends on an
> > >> > > > >> > > inter-process communication architecture based on the
> > >> > > > >> > > Apache Beam Portability framework. Although starting a
> > >> > > > >> > > dedicated process to execute Python user-defined functions
> > >> > > > >> > > provides better resource isolation, it brings greater
> > >> > > > >> > > resource and performance overhead.
> > >> > > > >> > >
> > >> > > > >> > > In order to overcome the resource and performance problems
> > >> > > > >> > > of Process Mode, we propose a new execution mode which
> > >> > > > >> > > executes Python user-defined functions in the same thread
> > >> > > > >> > > instead of in a separate process.
> > >> > > > >> > >
> > >> > > > >> > > I have drafted FLIP-206[1]. Please feel free to reply to
> > >> > > > >> > > this email thread. Looking forward to your feedback!
> > >> > > > >> > >
> > >> > > > >> > > Best,
> > >> > > > >> > > Xingbo
> > >> > > > >> > >
> > >> > > > >> > > [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-206%3A+Support+PyFlink+Runtime+Execution+in+Thread+Mode
> > >> > > > >> >
> > >> > > > >> >
> > >> > > > >>
> > >> > > > >
> > >> > >
> > >> >
> > >>
> > >
>

Re: [DISCUSS] FLIP-206: Support PyFlink Runtime Execution in Thread Mode

Posted by Thomas Weise <th...@apache.org>.
Hi Xingbo,

+1 from my side

Thanks for the clarification. For your use case the parameter size and
therefore serialization overhead was the limiting factor. I have seen
use cases where that is not the concern, because the Python logic
itself is heavy and dwarfs the protocol overhead (for example when
interacting with external systems from the UDF). Hence it is good to
give users options to optimize for their application requirements.

Cheers,
Thomas

Re: [DISCUSS] FLIP-206: Support PyFlink Runtime Execution in Thread Mode

Posted by Xingbo Huang <hx...@gmail.com>.
Hi everyone,

Thanks to all of you for the discussion.
If there are no objections, I would like to start a vote thread tomorrow.

Best,
Xingbo

Re: [DISCUSS] FLIP-206: Support PyFlink Runtime Execution in Thread Mode

Posted by Xingbo Huang <hx...@gmail.com>.
Hi Till,

I have written a more complicated PyFlink job. Compared with the previous
single-Python-UDF job, there is an extra stage of converting between Table
and DataStream. Besides, I added a Python map function to the job. Because
the Python DataStream API has not yet implemented Thread Mode, the Python
map function operator still runs in Process Mode.

```
from pyflink.common.typeinfo import Types
from pyflink.table import DataTypes
from pyflink.table.udf import udf

source = t_env.from_path("source_table")  # schema [id: String, d: int]

@udf(result_type=DataTypes.STRING(), func_type="general")
def upper(x):
    return x.upper()

t_env.create_temporary_system_function("upper", upper)

# python map function (runs in Process Mode until the DataStream API
# supports Thread Mode)
ds = t_env.to_data_stream(source) \
    .map(lambda x: x,
         output_type=Types.ROW_NAMED(["id", "d"],
                                     [Types.STRING(), Types.INT()]))

t = t_env.from_data_stream(ds)
t.select('upper(id)').execute_insert('sink_table')
```

The input record size is 1 KB.

Mode                        |   QPS
Process Mode                |   30,000
Thread Mode + Process Mode  |   40,000

From the table, we can see that the nodes running in Process Mode are the
performance bottleneck of the job.

Best,
Xingbo

Till Rohrmann <tr...@apache.org> 于2022年1月5日周三 23:16写道:

> Thanks for the detailed answer Xingbo. Quick question on the last figure in
> the FLIP. You said that this is a real world Flink stream SQL job. The
> title of the graph says UDF(String Upper). So do I understand correctly
> that string upper is the real world use case you have measured? What I
> wanted to ask is how a slightly more complex Flink Python job (involving
> shuffles, with back pressure, etc.) performs using the thread and process
> mode respectively.
>
> If the mode solely needs changes in the Python part of Flink, then I don't
> have any concerns from the runtime perspective.
>
> Cheers,
> Till
>
> On Wed, Jan 5, 2022 at 1:55 PM Xingbo Huang <hx...@gmail.com> wrote:
>
> > Hi Till and Thomas,
> >
> > Thanks a lot for joining the discussion.
> >
> > For Till:
> >
> > >>> Is the slower performance currently the biggest pain point for our
> > Python users? What else are our Python users mainly complaining about?
> >
> > PyFlink users are most concerned about two parts, one is better
> usability,
> > the other is performance. Users often make some benchmarks when they
> > investigate pyflink[1][2] at the beginning to decide whether to use
> > PyFlink. The performance of a PyFlink job depends on two parts, one is
> the
> > overhead of the PyFlink framework, and the other is the Python function
> > complexity implemented by the user. In the Python ecosystem, there are
> many
> > libraries and tools that can help Python users improve the performance of
> > their custom functions, such as pandas[3], numba[4] and cython[5]. So we
> > hope that the framework overhead of PyFlink itself can also be reduced.
> >
> > >>> Concerning the proposed changes, are there any changes required on
> the
> > runtime side (changes to Flink)? How will the deployment and memory
> > management be affected when using the thread execution mode?
> >
> > The changes on PyFlink Runtime mentioned here are actually only
> > modifications of PyFlink custom Operators, such as
> > PythonScalarFunctionOperator[6], which won't affect deployment and memory
> > management.
> >
> > >>> One more question that came to my mind: How much performance
> > improvement do we gain on a real-world Python use case? Were the
> > measurements more like micro benchmarks where the Python UDF was called
> w/o
> > the overhead of Flink? I would just be curious how much the Python
> > component contributes to the overall runtime of a real world job. Do we
> > have some data on this?
> >
> > The last figure I put in FLIP is the performance comparison of three real
> > Flink Stream Sql Jobs. They are a Java UDF job, a Python UDF job in
> Process
> > Mode, and a Python UDF job in Thread Mode. The calculated value of QPS is
> > the end-to-end Flink job execution result. As shown in the performance
> > comparison chart, the performance of Python udf with the same function
> can
> > often only reach 20% of Java udf, so the performance of python udf will
> > often become the performance bottleneck in a PyFlink job.
> >
> > For Thomas:
> >
> > The first time that I realized the framework overhead of various IPC
> > (socket, grpc, shared memory) cannot be ignored in some scenarios is due
> to
> > an image algorithm prediction job of PyFlink. Its input parameters are a
> > series of huge image binary arrays, and its data size is bigger than 1G.
> > The performance overhead of serialization/deserialization has become an
> > important part of its poor performance. Although this job is a bit
> extreme,
> > through measurement, we did find the impact of the
> > serialization/deserialization overhead caused by larger size parameters
> on
> > the performance of the IPC framework.
> >
> > >>> As I understand it, you measured the difference in throughput for
> UPPER
> > between process and embedded mode and the difference is 50% increased
> > throughput?
> >
> > This 50% is the result when the data size is less than 100 bytes. When the
> > data size reaches 1k, the performance of the Embedded Mode will reach
> about
> > 3.5 times the performance of the Process Mode shown in the FLIP. When the
> > data reaches 1M, the performance of Embedded Mode can reach 5 times the
> > performance of the Process Mode. The biggest difference here is that in
> > Embedded Mode, input/result data does not need to be
> > serialized/deserialized.
> >
> > >>> Is that a typical UDF in your usage?
> >
> > The reason for choosing UPPER is that a simpler udf implementation can
> make
> > it easier to evaluate the performance of different execution modes.
> >
> > >>> What do you observe when the function becomes more complex?
> >
> > We can analyze the QPS of the framework (process mode or embedded mode)
> and
> > the QPS of the UDF calculation logic separately. A more complex UDF means
> > that it is a UDF with a smaller QPS. The main factors that affect the
> > framework QPS are data type of parameters, number of parameters and size
> of
> > parameters, which will greatly affect the serialization/deserialization
> > overhead in Process Mode.
> >
> > The purpose of introducing thread mode is not to replace Process mode,
> but
> > to supplement Python udf usage scenarios such as cep and join, and some
> > scenarios where higher performance is pursued. Compared with Thread mode,
> > Process Mode has better isolation, which can solve the limitation of
> thread
> > mode in some scenarios such as session mode.
> >
> > [1] https://www.mail-archive.com/user@flink.apache.org/msg42760.html
> > [2] https://www.mail-archive.com/user@flink.apache.org/msg44975.html
> > [3] https://pandas.pydata.org/
> > [4] https://numba.pydata.org/
> > [5] https://cython.org/
> > [6]
> >
> >
> https://github.com/apache/flink/blob/master/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/scalar/PythonScalarFunctionOperator.java
> >
> > Best,
> > Xingbo
> >
> > Thomas Weise <th...@apache.org> wrote on Tue, Jan 4, 2022 at 04:23:
> >
> > > Interesting discussion. It caught my attention because I was also
> > > interested in the Beam fn execution overhead a few years ago.
> > >
> > > We found back then that while in theory the fn protocol overhead is
> > > very significant, for realistic function workloads that overhead was
> > > negligible. And of course it all depends on the use case. It might be
> > > worthwhile to quantify a couple more scenarios.
> > >
> > > As I understand it, you measured the difference in throughput for
> > > UPPER between process and embedded mode and the difference is 50%
> > > increased throughput? Is that a typical UDF in your usage? What do you
> > > observe when the function becomes more complex?
> > >
> > > Thanks,
> > > Thomas
> > >
> > > On Mon, Jan 3, 2022 at 5:52 AM Till Rohrmann <tr...@apache.org>
> > wrote:
> > > >
> > > > One more question that came to my mind: How much performance
> > improvement
> > > do
> > > > we gain on a real-world Python use case? Were the measurements more
> > like
> > > > micro benchmarks where the Python UDF was called w/o the overhead of
> > > Flink?
> > > > I would just be curious how much the Python component contributes to
> > the
> > > > overall runtime of a real world job. Do we have some data on this?
> > > >
> > > > Cheers,
> > > > Till
> > > >
> > > > On Mon, Jan 3, 2022 at 11:45 AM Till Rohrmann <tr...@apache.org>
> > > wrote:
> > > >
> > > > > Hi Xingbo,
> > > > >
> > > > > Thanks for creating this FLIP. I have two general questions about
> the
> > > > > motivation for this FLIP because I have only very little exposure
> to
> > > our
> > > > > Python users:
> > > > >
> > > > > Is the slower performance currently the biggest pain point for our
> > > Python
> > > > > users?
> > > > >
> > > > > What else are our Python users mainly complaining about?
> > > > >
> > > > > Concerning the proposed changes, are there any changes required on
> > the
> > > > > runtime side (changes to Flink)? How will the deployment and memory
> > > > > management be affected when using the thread execution mode?
> > > > >
> > > > > Cheers,
> > > > > Till
> > > > >
> > > > > On Fri, Dec 31, 2021 at 9:46 AM Xingbo Huang <hx...@gmail.com>
> > > wrote:
> > > > >
> > > > >> Hi Wei,
> > > > >>
> > > > >> Thanks a lot for your feedback. Very good questions!
> > > > >>
> > > > >> >>> 1. It seems that we dynamically load an embedded Python and
> user
> > > > >> dependencies in the TM process. Can they be uninstalled cleanly
> > after
> > > the
> > > > >> task finished? i.e. Can we use the Thread Mode in session mode and
> > > Pyflink
> > > > >> shell?
> > > > >>
> > > > >> I mentioned the limitation of this part in FLIP. There is no
> problem
> > > > >> without changing the python interpreter, but if you need to change
> > the
> > > > >> python interpreter, there is really no way to reload the Python
> > > library.
> > > > >> The problem is mainly caused by many Python libraries having an
> > > assumption
> > > > >> that they own the process alone.
> > > > >>
> > > > >> >>> 2. Does one TM have only one embedded Python running at the
> same
> > > time?
> > > > >> If all the Python operator in the TM share the same PVM, will
> there
> > > be a
> > > > >> loss in performance?
> > > > >>
> > > > >> Your understanding is correct: one TM has only one embedded Python
> > > > >> running at the same time. I guess you are worried about the
> > > > >> performance loss of multiple threads caused by the Python GIL. There
> > > > >> is a one-to-one correspondence between Java worker threads and Python
> > > > >> subinterpreters. Although subinterpreters have not yet completely
> > > > >> overcome the GIL sharing problem (the Python community’s recent plan
> > > > >> for a per-interpreter GIL is also under discussion[1]), the
> > > > >> performance of subinterpreters is very close to that of
> > > > >> multiprocessing [2].
> > > > >>
> > > > >> >>> 3. How do we load the relevant c library if the
> > python.executable
> > > is
> > > > >> provided by users?
> > > > >>
> > > > >> Once python.executable is provided, PEMJA will dynamically load
> the
> > > > >> CPython
> > > > >> library (libpython.*so or libpython.*dylib) and pemja.so installed
> > in
> > > the
> > > > >> python environment.
> > > > >>
> > > > >> >>> May there be a risk of version conflicts?
> > > > >>
> > > > >> I understand that this question is actually discussing whether
> C/C++
> > > has a
> > > > >> way to solve the problem of relying on different versions of a
> > > library.
> > > > >> First of all, we know that if there is only static linking, there
> > > will be
> > > > >> no such problem.  And I have studied the source code of
> CPython[3],
> > > and
> > > > >> there is no usage of dynamic linking. The rest is the case where
> > > dynamic
> > > > >> linking is used in the C library written by the users. There are
> > many
> > > ways
> > > > >> to solve this problem with dynamic linking, but after all, this
> > > library is
> > > > >> written by users, and it is difficult for us to guarantee that
> there
> > > will
> > > > >> be no conflicts. At this time, Process Mode will be the fallback
> > > > >> choice.
> > > > >>
> > > > >> [1]
> > > > >>
> > > > >>
> > >
> >
> https://mail.python.org/archives/list/python-dev@python.org/thread/S5GZZCEREZLA2PEMTVFBCDM52H4JSENR/#RIK75U3ROEHWZL4VENQSQECB4F4GDELV
> > > > >> [2]
> > > > >>
> > > > >>
> > >
> >
> https://mail.python.org/archives/list/python-dev@python.org/thread/PNLBJBNIQDMG2YYGPBCTGOKOAVXRBJWY/#L5OXHXPFONRKLR3W6U46LUSUIBN4FCZQ
> > > > >> [3] https://github.com/python/cpython
> > > > >>
> > > > >> Best,
> > > > >> Xingbo
> > > > >>
> > > > >> Wei Zhong <we...@gmail.com> wrote on Fri, Dec 31, 2021 at 11:49:
> > > > >>
> > > > >> > Hi Xingbo,
> > > > >> >
> > > > >> > Thanks for creating this FLIP. Big +1 for it!
> > > > >> >
> > > > >> > I have some question about the Thread Mode:
> > > > >> >
> > > > >> > 1. It seems that we dynamically load an embedded Python and user
> > > > >> > dependencies in the TM process. Can they be uninstalled cleanly
> > > after
> > > > >> the
> > > > >> > task finished? i.e. Can we use the Thread Mode in session mode
> and
> > > > >> Pyflink
> > > > >> > shell?
> > > > >> >
> > > > >> > 2. Does one TM have only one embedded Python running at the same
> > > time?
> > > > >> If
> > > > >> > all the Python operator in the TM share the same PVM, will there
> > be
> > > a
> > > > >> loss
> > > > >> > in performance?
> > > > >> >
> > > > >> > 3. How do we load the relevant c library if the
> python.executable
> > is
> > > > >> > provided by users? May there be a risk of version conflicts?
> > > > >> >
> > > > >> > Best,
> > > > >> > Wei
> > > > >> >
> > > > >> >
> > > > >> > > On Dec 29, 2021, at 11:56 AM, Xingbo Huang <hx...@gmail.com> wrote:
> > > > >> > >
> > > > >> > > Hi everyone,
> > > > >> > >
> > > > >> > > I would like to start a discussion thread on "Support PyFlink
> > > Runtime
> > > > >> > > Execution in Thread Mode"
> > > > >> > >
> > > > >> > > We have provided PyFlink Runtime framework to support Python
> > > > >> user-defined
> > > > >> > > functions since Flink 1.10. The PyFlink Runtime framework is
> > > called
> > > > >> > Process
> > > > >> > > Mode, which depends on an inter-process communication
> > architecture
> > > > >> based
> > > > >> > on
> > > > >> > > the Apache Beam Portability framework. Although starting a
> > > dedicated
> > > > >> > > process to execute Python user-defined functions could have
> > better
> > > > >> > resource
> > > > >> > > isolation, it will bring greater resource and performance
> > > overhead.
> > > > >> > >
> > > > >> > > In order to overcome the resource and performance problems on
> > > Process
> > > > >> > Mode,
> > > > >> > > we will propose a new execution mode which executes Python
> > > > >> user-defined
> > > > >> > > functions in the same thread instead of a separate process.
> > > > >> > >
> > > > >> > > I have drafted the FLIP-206[1]. Please feel free to reply to
> > this
> > > > >> email
> > > > >> > > thread. Looking forward to your feedback!
> > > > >> > >
> > > > >> > > Best,
> > > > >> > > Xingbo
> > > > >> > >
> > > > >> > > [1]
> > > > >> > >
> > > > >> >
> > > > >>
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-206%3A+Support+PyFlink+Runtime+Execution+in+Thread+Mode
> > > > >> >
> > > > >> >
> > > > >>
> > > > >
> > >
> >
>

Re: [DISCUSS] FLIP-206: Support PyFlink Runtime Execution in Thread Mode

Posted by Till Rohrmann <tr...@apache.org>.
Thanks for the detailed answer Xingbo. Quick question on the last figure in
the FLIP. You said that this is a real world Flink stream SQL job. The
title of the graph says UDF(String Upper). So do I understand correctly
that string upper is the real world use case you have measured? What I
wanted to ask is how a slightly more complex Flink Python job (involving
shuffles, with back pressure, etc.) performs using the thread and process
mode respectively.

If the mode solely needs changes in the Python part of Flink, then I don't
have any concerns from the runtime perspective.

Cheers,
Till

On Wed, Jan 5, 2022 at 1:55 PM Xingbo Huang <hx...@gmail.com> wrote:

> Hi Till and Thomas,
>
> Thanks a lot for joining the discussion.
>
> For Till:
>
> >>> Is the slower performance currently the biggest pain point for our
> Python users? What else are our Python users mainly complaining about?
>
> PyFlink users are most concerned about two parts, one is better usability,
> the other is performance. Users often make some benchmarks when they
> investigate pyflink[1][2] at the beginning to decide whether to use
> PyFlink. The performance of a PyFlink job depends on two parts, one is the
> overhead of the PyFlink framework, and the other is the Python function
> complexity implemented by the user. In the Python ecosystem, there are many
> libraries and tools that can help Python users improve the performance of
> their custom functions, such as pandas[3], numba[4] and cython[5]. So we
> hope that the framework overhead of PyFlink itself can also be reduced.
>
> >>> Concerning the proposed changes, are there any changes required on the
> runtime side (changes to Flink)? How will the deployment and memory
> management be affected when using the thread execution mode?
>
> The changes on PyFlink Runtime mentioned here are actually only
> modifications of PyFlink custom Operators, such as
> PythonScalarFunctionOperator[6], which won't affect deployment and memory
> management.
>
> >>> One more question that came to my mind: How much performance
> improvement dowe gain on a real-world Python use case? Were the
> measurements more like micro benchmarks where the Python UDF was called w/o
> the overhead of Flink? I would just be curious how much the Python
> component contributes to the overall runtime of a real world job. Do we
> have some data on this?
>
> The last figure I put in FLIP is the performance comparison of three real
> Flink Stream Sql Jobs. They are a Java UDF job, a Python UDF job in Process
> Mode, and a Python UDF job in Thread Mode. The calculated value of QPS is
> the end-to-end Flink job execution result. As shown in the performance
> comparison chart, the performance of Python udf with the same function can
> often only reach 20% of Java udf, so the performance of python udf will
> often become the performance bottleneck in a PyFlink job.
>
> For Thomas:
>
> The first time that I realized the framework overhead of various IPC
> (socket, grpc, shared memory) cannot be ignored in some scenarios is due to
> an image algorithm prediction job of PyFlink. Its input parameters are a
> series of huge image binary arrays, and its data size is bigger than 1G.
> The performance overhead of serialization/deserialization has become an
> important part of its poor performance. Although this job is a bit extreme,
> through measurement, we did find the impact of the
> serialization/deserialization overhead caused by larger size parameters on
> the performance of the IPC framework.
>
> >>> As I understand it, you measured the difference in throughput for UPPER
> between process and embedded mode and the difference is 50% increased
> throughput?
>
> This 50% is the result when the data size is less than 100 bytes. When the
> data size reaches 1k, the performance of the Embedded Mode will reach about
> 3.5 times the performance of the Process Mode shown in the FLIP. When the
> data reaches 1M, the performance of Embedded Mode can reach 5 times the
> performance of the Process Mode. The biggest difference here is that in
> Embedded Mode, input/result data does not need to be
> serialized/deserialized.
>
> >>> Is that a typical UDF in your usage?
>
> The reason for choosing UPPER is that a simpler udf implementation can make
> it easier to evaluate the performance of different execution modes.
>
> >>> What do you observe when the function becomes more complex?
>
> We can analyze the QPS of the framework (process mode or embedded mode) and
> the QPS of the UDF calculation logic separately. A more complex UDF means
> that it is a UDF with a smaller QPS. The main factors that affect the
> framework QPS are data type of parameters, number of parameters and size of
> parameters, which will greatly affect the serialization/deserialization
> overhead in Process Mode.
>
> The purpose of introducing thread mode is not to replace Process mode, but
> to supplement Python udf usage scenarios such as cep and join, and some
> scenarios where higher performance is pursued. Compared with Thread mode,
> Process Mode has better isolation, which can solve the limitation of thread
> mode in some scenarios such as session mode.
>
> [1] https://www.mail-archive.com/user@flink.apache.org/msg42760.html
> [2] https://www.mail-archive.com/user@flink.apache.org/msg44975.html
> [3] https://pandas.pydata.org/
> [4] https://numba.pydata.org/
> [5] https://cython.org/
> [6]
>
> https://github.com/apache/flink/blob/master/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/scalar/PythonScalarFunctionOperator.java
>
> Best,
> Xingbo
>
> Thomas Weise <th...@apache.org> wrote on Tue, Jan 4, 2022 at 04:23:
>
> > Interesting discussion. It caught my attention because I was also
> > interested in the Beam fn execution overhead a few years ago.
> >
> > We found back then that while in theory the fn protocol overhead is
> > very significant, for realistic function workloads that overhead was
> > negligible. And of course it all depends on the use case. It might be
> > worthwhile to quantify a couple more scenarios.
> >
> > As I understand it, you measured the difference in throughput for
> > UPPER between process and embedded mode and the difference is 50%
> > increased throughput? Is that a typical UDF in your usage? What do you
> > observe when the function becomes more complex?
> >
> > Thanks,
> > Thomas
> >
> > On Mon, Jan 3, 2022 at 5:52 AM Till Rohrmann <tr...@apache.org>
> wrote:
> > >
> > > One more question that came to my mind: How much performance
> improvement
> > do
> > > we gain on a real-world Python use case? Were the measurements more
> like
> > > micro benchmarks where the Python UDF was called w/o the overhead of
> > Flink?
> > > I would just be curious how much the Python component contributes to
> the
> > > overall runtime of a real world job. Do we have some data on this?
> > >
> > > Cheers,
> > > Till
> > >
> > > On Mon, Jan 3, 2022 at 11:45 AM Till Rohrmann <tr...@apache.org>
> > wrote:
> > >
> > > > Hi Xingbo,
> > > >
> > > > Thanks for creating this FLIP. I have two general questions about the
> > > > motivation for this FLIP because I have only very little exposure to
> > our
> > > > Python users:
> > > >
> > > > Is the slower performance currently the biggest pain point for our
> > Python
> > > > users?
> > > >
> > > > What else are our Python users mainly complaining about?
> > > >
> > > > Concerning the proposed changes, are there any changes required on
> the
> > > > runtime side (changes to Flink)? How will the deployment and memory
> > > > management be affected when using the thread execution mode?
> > > >
> > > > Cheers,
> > > > Till
> > > >
> > > > On Fri, Dec 31, 2021 at 9:46 AM Xingbo Huang <hx...@gmail.com>
> > wrote:
> > > >
> > > >> Hi Wei,
> > > >>
> > > >> Thanks a lot for your feedback. Very good questions!
> > > >>
> > > >> >>> 1. It seems that we dynamically load an embedded Python and user
> > > >> dependencies in the TM process. Can they be uninstalled cleanly
> after
> > the
> > > >> task finished? i.e. Can we use the Thread Mode in session mode and
> > Pyflink
> > > >> shell?
> > > >>
> > > >> I mentioned the limitation of this part in FLIP. There is no problem
> > > >> without changing the python interpreter, but if you need to change
> the
> > > >> python interpreter, there is really no way to reload the Python
> > library.
> > > >> The problem is mainly caused by many Python libraries having an
> > assumption
> > > >> that they own the process alone.
> > > >>
> > > >> >>> 2. Does one TM have only one embedded Python running at the same
> > time?
> > > >> If all the Python operator in the TM share the same PVM, will there
> > be a
> > > >> loss in performance?
> > > >>
> > > >> Your understanding is correct: one TM has only one embedded Python
> > > >> running at the same time. I guess you are worried about the performance
> > > >> loss of multiple threads caused by the Python GIL. There is a
> > > >> one-to-one correspondence between Java worker threads and Python
> > > >> subinterpreters. Although subinterpreters have not yet completely
> > > >> overcome the GIL sharing problem (the Python community’s recent plan
> > > >> for a per-interpreter GIL is also under discussion[1]), the performance
> > > >> of subinterpreters is very close to that of multiprocessing [2].
> > > >>
> > > >> >>> 3. How do we load the relevant c library if the
> python.executable
> > is
> > > >> provided by users?
> > > >>
> > > >> Once python.executable is provided, PEMJA will dynamically load the
> > > >> CPython
> > > >> library (libpython.*so or libpython.*dylib) and pemja.so installed
> in
> > the
> > > >> python environment.
> > > >>
> > > >> >>> May there be a risk of version conflicts?
> > > >>
> > > >> I understand that this question is actually discussing whether C/C++
> > has a
> > > >> way to solve the problem of relying on different versions of a
> > library.
> > > >> First of all, we know that if there is only static linking, there
> > will be
> > > >> no such problem.  And I have studied the source code of CPython[3],
> > and
> > > >> there is no usage of dynamic linking. The rest is the case where
> > dynamic
> > > >> linking is used in the C library written by the users. There are
> many
> > ways
> > > >> to solve this problem with dynamic linking, but after all, this
> > library is
> > > >> written by users, and it is difficult for us to guarantee that there
> > will
> > > >> be no conflicts. At this time, Process Mode will be the fallback
> > > >> choice.
> > > >>
> > > >> [1]
> > > >>
> > > >>
> >
> https://mail.python.org/archives/list/python-dev@python.org/thread/S5GZZCEREZLA2PEMTVFBCDM52H4JSENR/#RIK75U3ROEHWZL4VENQSQECB4F4GDELV
> > > >> [2]
> > > >>
> > > >>
> >
> https://mail.python.org/archives/list/python-dev@python.org/thread/PNLBJBNIQDMG2YYGPBCTGOKOAVXRBJWY/#L5OXHXPFONRKLR3W6U46LUSUIBN4FCZQ
> > > >> [3] https://github.com/python/cpython
> > > >>
> > > >> Best,
> > > >> Xingbo
> > > >>
> > > >> Wei Zhong <we...@gmail.com> wrote on Fri, Dec 31, 2021 at 11:49:
> > > >>
> > > >> > Hi Xingbo,
> > > >> >
> > > >> > Thanks for creating this FLIP. Big +1 for it!
> > > >> >
> > > >> > I have some question about the Thread Mode:
> > > >> >
> > > >> > 1. It seems that we dynamically load an embedded Python and user
> > > >> > dependencies in the TM process. Can they be uninstalled cleanly
> > after
> > > >> the
> > > >> > task finished? i.e. Can we use the Thread Mode in session mode and
> > > >> Pyflink
> > > >> > shell?
> > > >> >
> > > >> > 2. Does one TM have only one embedded Python running at the same
> > time?
> > > >> If
> > > >> > all the Python operator in the TM share the same PVM, will there
> be
> > a
> > > >> loss
> > > >> > in performance?
> > > >> >
> > > >> > 3. How do we load the relevant c library if the python.executable
> is
> > > >> > provided by users? May there be a risk of version conflicts?
> > > >> >
> > > >> > Best,
> > > >> > Wei
> > > >> >
> > > >> >
> > > >> > > On Dec 29, 2021, at 11:56 AM, Xingbo Huang <hx...@gmail.com> wrote:
> > > >> > >
> > > >> > > Hi everyone,
> > > >> > >
> > > >> > > I would like to start a discussion thread on "Support PyFlink
> > Runtime
> > > >> > > Execution in Thread Mode"
> > > >> > >
> > > >> > > We have provided PyFlink Runtime framework to support Python
> > > >> user-defined
> > > >> > > functions since Flink 1.10. The PyFlink Runtime framework is
> > called
> > > >> > Process
> > > >> > > Mode, which depends on an inter-process communication
> architecture
> > > >> based
> > > >> > on
> > > >> > > the Apache Beam Portability framework. Although starting a
> > dedicated
> > > >> > > process to execute Python user-defined functions could have
> better
> > > >> > resource
> > > >> > > isolation, it will bring greater resource and performance
> > overhead.
> > > >> > >
> > > >> > > In order to overcome the resource and performance problems on
> > Process
> > > >> > Mode,
> > > >> > > we will propose a new execution mode which executes Python
> > > >> user-defined
> > > >> > > functions in the same thread instead of a separate process.
> > > >> > >
> > > >> > > I have drafted the FLIP-206[1]. Please feel free to reply to
> this
> > > >> email
> > > >> > > thread. Looking forward to your feedback!
> > > >> > >
> > > >> > > Best,
> > > >> > > Xingbo
> > > >> > >
> > > >> > > [1]
> > > >> > >
> > > >> >
> > > >>
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-206%3A+Support+PyFlink+Runtime+Execution+in+Thread+Mode
> > > >> >
> > > >> >
> > > >>
> > > >
> >
>

Re: [DISCUSS] FLIP-206: Support PyFlink Runtime Execution in Thread Mode

Posted by Xingbo Huang <hx...@gmail.com>.
Hi Till and Thomas,

Thanks a lot for joining the discussion.

For Till:

>>> Is the slower performance currently the biggest pain point for our
Python users? What else are our Python users mainly complaining about?

PyFlink users are most concerned about two things: usability and
performance. Users often run benchmarks when they first investigate
PyFlink[1][2] to decide whether to use it. The performance of a PyFlink job
depends on two parts: the overhead of the PyFlink framework, and the
complexity of the Python functions implemented by the user. In the Python
ecosystem, there are many libraries and tools that can help Python users
improve the performance of their custom functions, such as pandas[3],
numba[4] and cython[5]. So we hope that the framework overhead of PyFlink
itself can also be reduced.
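
To give a concrete (hypothetical) example of that ecosystem point: a
vectorized UDF processes a whole batch per call, so the per-record framework
overhead is amortized in either execution mode. A minimal sketch, assuming a
BIGINT input column:

```
from pyflink.table import DataTypes
from pyflink.table.udf import udf

# Vectorized UDF: "s" is a pandas.Series holding a whole batch of values,
# so one framework round trip covers many records.
@udf(result_type=DataTypes.BIGINT(), func_type="pandas")
def plus_one(s):
    return s + 1
```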

>>> Concerning the proposed changes, are there any changes required on the
runtime side (changes to Flink)? How will the deployment and memory
management be affected when using the thread execution mode?

The changes on PyFlink Runtime mentioned here are actually only
modifications of PyFlink custom Operators, such as
PythonScalarFunctionOperator[6], which won't affect deployment and memory
management.

>>> One more question that came to my mind: How much performance
improvement do we gain on a real-world Python use case? Were the
measurements more like micro benchmarks where the Python UDF was called w/o
the overhead of Flink? I would just be curious how much the Python
component contributes to the overall runtime of a real world job. Do we
have some data on this?

The last figure I put in the FLIP is the performance comparison of three
real Flink Stream SQL jobs: a Java UDF job, a Python UDF job in Process
Mode, and a Python UDF job in Thread Mode. The QPS values are measured
end-to-end on the running Flink jobs. As the comparison chart shows, a
Python UDF with the same functionality often reaches only 20% of the
performance of the equivalent Java UDF, so the Python UDF will often become
the performance bottleneck of a PyFlink job.

For Thomas:

The first time I realized that the framework overhead of the various IPC
mechanisms (socket, gRPC, shared memory) cannot be ignored in some
scenarios was with an image-prediction job in PyFlink. Its input parameters
are a series of huge image binary arrays, with a data size bigger than
1 GB. The serialization/deserialization overhead was an important part of
its poor performance. Although this job is a bit extreme, our measurements
did show the impact that large parameters have, through
serialization/deserialization overhead, on the performance of the IPC
framework.
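
To make the shape of that workload concrete, here is a hypothetical sketch
(the function and its logic are made up; only the payload pattern matters):
a general UDF whose input is a large binary image, where Process Mode pays
the full serialization cost of the bytes on every call:

```
from pyflink.table import DataTypes
from pyflink.table.udf import udf

@udf(result_type=DataTypes.INT(), func_type="general")
def payload_size(image_bytes):
    # Stand-in for the real prediction logic; the point is that in Process
    # Mode the whole byte array crosses the IPC boundary on every call.
    return len(image_bytes)
```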

>>> As I understand it, you measured the difference in throughput for UPPER
between process and embedded mode and the difference is 50% increased
throughput?

This 50% is the result when the data size is less than 100 bytes. When the
data size reaches 1 KB, the performance of the Embedded Mode reaches about
3.5 times the performance of the Process Mode shown in the FLIP. When the
data reaches 1 MB, the performance of Embedded Mode can reach 5 times the
performance of the Process Mode. The biggest difference here is that in
Embedded Mode, input/result data does not need to be
serialized/deserialized.
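
A quick micro-benchmark makes the ser/de cost tangible. This uses pickle as
a stand-in (PyFlink's actual coders are different, so treat the numbers as
illustrative only):

```
import pickle
import time

payload = b"x" * (1024 * 1024)  # one 1 MB record

start = time.perf_counter()
for _ in range(100):
    # The round trip that Process Mode pays per record and Thread Mode skips.
    pickle.loads(pickle.dumps(payload))
elapsed = time.perf_counter() - start
print(f"{elapsed / 100 * 1e6:.0f} us per record spent only on ser/de")
```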

>>> Is that a typical UDF in your usage?

The reason for choosing UPPER is that a simple UDF implementation makes it
easier to evaluate the performance of the different execution modes.

>>> What do you observe when the function becomes more complex?

We can analyze the QPS of the framework (process mode or embedded mode) and
the QPS of the UDF calculation logic separately. A more complex UDF simply
means a UDF with a smaller QPS. The main factors that affect the framework
QPS are the data types, the number, and the size of the parameters, which
greatly affect the serialization/deserialization overhead in Process Mode.
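
A back-of-envelope model shows why (this is my own simplification, not a
formula from the FLIP): if the framework and UDF costs per record are
additive, then 1/QPS_total = 1/QPS_framework + 1/QPS_udf, so the framework
term fades as the UDF gets heavier:

```
def total_qps(framework_qps, udf_qps):
    # Per-record latencies add up, giving a harmonic-style combination.
    return 1.0 / (1.0 / framework_qps + 1.0 / udf_qps)

print(total_qps(100_000, 1_000))   # ~990:    heavy UDF, mode barely matters
print(total_qps(500_000, 1_000))   # ~998:    5x faster framework, same story
print(total_qps(100_000, 50_000))  # ~33,333: light UDF, framework dominates
```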

The purpose of introducing Thread Mode is not to replace Process Mode, but
to supplement it for Python UDF usage scenarios such as CEP and join, and
for scenarios where higher performance is pursued. Compared with Thread
Mode, Process Mode has better isolation, which avoids the limitations of
Thread Mode in some scenarios such as session mode.

[1] https://www.mail-archive.com/user@flink.apache.org/msg42760.html
[2] https://www.mail-archive.com/user@flink.apache.org/msg44975.html
[3] https://pandas.pydata.org/
[4] https://numba.pydata.org/
[5] https://cython.org/
[6]
https://github.com/apache/flink/blob/master/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/scalar/PythonScalarFunctionOperator.java

Best,
Xingbo

Thomas Weise <th...@apache.org> wrote on Tue, Jan 4, 2022 at 04:23:

> Interesting discussion. It caught my attention because I was also
> interested in the Beam fn execution overhead a few years ago.
>
> We found back then that while in theory the fn protocol overhead is
> very significant, for realistic function workloads that overhead was
> negligible. And of course it all depends on the use case. It might be
> worthwhile to quantify a couple more scenarios.
>
> As I understand it, you measured the difference in throughput for
> UPPER between process and embedded mode and the difference is 50%
> increased throughput? Is that a typical UDF in your usage? What do you
> observe when the function becomes more complex?
>
> Thanks,
> Thomas
>
> On Mon, Jan 3, 2022 at 5:52 AM Till Rohrmann <tr...@apache.org> wrote:
> >
> > One more question that came to my mind: How much performance improvement
> do
> > we gain on a real-world Python use case? Were the measurements more like
> > micro benchmarks where the Python UDF was called w/o the overhead of
> Flink?
> > I would just be curious how much the Python component contributes to the
> > overall runtime of a real world job. Do we have some data on this?
> >
> > Cheers,
> > Till
> >
> > On Mon, Jan 3, 2022 at 11:45 AM Till Rohrmann <tr...@apache.org>
> wrote:
> >
> > > Hi Xingbo,
> > >
> > > Thanks for creating this FLIP. I have two general questions about the
> > > motivation for this FLIP because I have only very little exposure to
> our
> > > Python users:
> > >
> > > Is the slower performance currently the biggest pain point for our
> Python
> > > users?
> > >
> > > What else are our Python users mainly complaining about?
> > >
> > > Concerning the proposed changes, are there any changes required on the
> > > runtime side (changes to Flink)? How will the deployment and memory
> > > management be affected when using the thread execution mode?
> > >
> > > Cheers,
> > > Till
> > >
> > > On Fri, Dec 31, 2021 at 9:46 AM Xingbo Huang <hx...@gmail.com>
> wrote:
> > >
> > >> Hi Wei,
> > >>
> > >> Thanks a lot for your feedback. Very good questions!
> > >>
> > >> >>> 1. It seems that we dynamically load an embedded Python and user
> > >> dependencies in the TM process. Can they be uninstalled cleanly after
> the
> > >> task finished? i.e. Can we use the Thread Mode in session mode and
> Pyflink
> > >> shell?
> > >>
> > >> I mentioned the limitation of this part in FLIP. There is no problem
> > >> without changing the python interpreter, but if you need to change the
> > >> python interpreter, there is really no way to reload the Python
> library.
> > >> The problem is mainly caused by many Python libraries having an
> assumption
> > >> that they own the process alone.
> > >>
> > >> >>> 2. Does one TM have only one embedded Python running at the same
> time?
> > >> If all the Python operator in the TM share the same PVM, will there
> be a
> > >> loss in performance?
> > >>
> > >> Your understanding is correct: one TM has only one embedded Python
> > >> running at the same time. I guess you are worried about the performance
> > >> loss of multiple threads caused by the Python GIL. There is a one-to-one
> > >> correspondence between Java worker threads and Python subinterpreters.
> > >> Although subinterpreters have not yet completely overcome the GIL
> > >> sharing problem (the Python community’s recent plan for a
> > >> per-interpreter GIL is also under discussion[1]), the performance of
> > >> subinterpreters is very close to that of multiprocessing [2].
> > >>
> > >> >>> 3. How do we load the relevant c library if the python.executable
> is
> > >> provided by users?
> > >>
> > >> Once python.executable is provided, PEMJA will dynamically load the
> > >> CPython
> > >> library (libpython.*so or libpython.*dylib) and pemja.so installed in
> the
> > >> python environment.
> > >>
> > >> >>> May there be a risk of version conflicts?
> > >>
> > >> I understand that this question is actually discussing whether C/C++
> has a
> > >> way to solve the problem of relying on different versions of a
> library.
> > >> First of all, we know that if there is only static linking, there
> will be
> > >> no such problem.  And I have studied the source code of CPython[3],
> and
> > >> there is no usage of dynamic linking. The rest is the case where
> dynamic
> > >> linking is used in the C library written by the users. There are many
> ways
> > >> to solve this problem with dynamic linking, but after all, this
> library is
> > >> written by users, and it is difficult for us to guarantee that there
> will
> > >> be no conflicts. At this time, Process Mode will be the fallback
> > >> choice.
> > >>
> > >> [1]
> > >>
> > >>
> https://mail.python.org/archives/list/python-dev@python.org/thread/S5GZZCEREZLA2PEMTVFBCDM52H4JSENR/#RIK75U3ROEHWZL4VENQSQECB4F4GDELV
> > >> [2]
> > >>
> > >>
> https://mail.python.org/archives/list/python-dev@python.org/thread/PNLBJBNIQDMG2YYGPBCTGOKOAVXRBJWY/#L5OXHXPFONRKLR3W6U46LUSUIBN4FCZQ
> > >> [3] https://github.com/python/cpython
> > >>
> > >> Best,
> > >> Xingbo
> > >>
> > >> Wei Zhong <we...@gmail.com> wrote on Fri, Dec 31, 2021 at 11:49:
> > >>
> > >> > Hi Xingbo,
> > >> >
> > >> > Thanks for creating this FLIP. Big +1 for it!
> > >> >
> > >> > I have some question about the Thread Mode:
> > >> >
> > >> > 1. It seems that we dynamically load an embedded Python and user
> > >> > dependencies in the TM process. Can they be uninstalled cleanly
> after
> > >> the
> > >> > task finished? i.e. Can we use the Thread Mode in session mode and
> > >> Pyflink
> > >> > shell?
> > >> >
> > >> > 2. Does one TM have only one embedded Python running at the same
> time?
> > >> If
> > >> > all the Python operator in the TM share the same PVM, will there be
> a
> > >> loss
> > >> > in performance?
> > >> >
> > >> > 3. How do we load the relevant c library if the python.executable is
> > >> > provided by users? May there be a risk of version conflicts?
> > >> >
> > >> > Best,
> > >> > Wei
> > >> >
> > >> >
> > >> > > On Dec 29, 2021, at 11:56 AM, Xingbo Huang <hx...@gmail.com> wrote:
> > >> > >
> > >> > > Hi everyone,
> > >> > >
> > >> > > I would like to start a discussion thread on "Support PyFlink
> Runtime
> > >> > > Execution in Thread Mode"
> > >> > >
> > >> > > We have provided PyFlink Runtime framework to support Python
> > >> user-defined
> > >> > > functions since Flink 1.10. The PyFlink Runtime framework is
> called
> > >> > Process
> > >> > > Mode, which depends on an inter-process communication architecture
> > >> based
> > >> > on
> > >> > > the Apache Beam Portability framework. Although starting a
> dedicated
> > >> > > process to execute Python user-defined functions could have better
> > >> > resource
> > >> > > isolation, it will bring greater resource and performance
> overhead.
> > >> > >
> > >> > > In order to overcome the resource and performance problems on
> Process
> > >> > Mode,
> > >> > > we will propose a new execution mode which executes Python
> > >> user-defined
> > >> > > functions in the same thread instead of a separate process.
> > >> > >
> > >> > > I have drafted the FLIP-206[1]. Please feel free to reply to this
> > >> email
> > >> > > thread. Looking forward to your feedback!
> > >> > >
> > >> > > Best,
> > >> > > Xingbo
> > >> > >
> > >> > > [1]
> > >> > >
> > >> >
> > >>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-206%3A+Support+PyFlink+Runtime+Execution+in+Thread+Mode
> > >> >
> > >> >
> > >>
> > >
>

Re: [DISCUSS] FLIP-206: Support PyFlink Runtime Execution in Thread Mode

Posted by Thomas Weise <th...@apache.org>.
Interesting discussion. It caught my attention because I was also
interested in the Beam fn execution overhead a few years ago.

We found back then that while in theory the fn protocol overhead is
very significant, for realistic function workloads that overhead was
negligible. And of course it all depends on the use case. It might be
worthwhile to quantify a couple more scenarios.

As I understand it, you measured the difference in throughput for
UPPER between process and embedded mode and the difference is 50%
increased throughput? Is that a typical UDF in your usage? What do you
observe when the function becomes more complex?

Thanks,
Thomas

On Mon, Jan 3, 2022 at 5:52 AM Till Rohrmann <tr...@apache.org> wrote:
>
> One more question that came to my mind: How much performance improvement do
> we gain on a real-world Python use case? Were the measurements more like
> micro benchmarks where the Python UDF was called w/o the overhead of Flink?
> I would just be curious how much the Python component contributes to the
> overall runtime of a real world job. Do we have some data on this?
>
> Cheers,
> Till
>
> On Mon, Jan 3, 2022 at 11:45 AM Till Rohrmann <tr...@apache.org> wrote:
>
> > Hi Xingbo,
> >
> > Thanks for creating this FLIP. I have two general questions about the
> > motivation for this FLIP because I have only very little exposure to our
> > Python users:
> >
> > Is the slower performance currently the biggest pain point for our Python
> > users?
> >
> > What else are our Python users mainly complaining about?
> >
> > Concerning the proposed changes, are there any changes required on the
> > runtime side (changes to Flink)? How will the deployment and memory
> > management be affected when using the thread execution mode?
> >
> > Cheers,
> > Till
> >
> > On Fri, Dec 31, 2021 at 9:46 AM Xingbo Huang <hx...@gmail.com> wrote:
> >
> >> Hi Wei,
> >>
> >> Thanks a lot for your feedback. Very good questions!
> >>
> >> >>> 1. It seems that we dynamically load an embedded Python and user
> >> dependencies in the TM process. Can they be uninstalled cleanly after the
> >> task finished? i.e. Can we use the Thread Mode in session mode and Pyflink
> >> shell?
> >>
> >> I mentioned the limitation of this part in FLIP. There is no problem
> >> without changing the python interpreter, but if you need to change the
> >> python interpreter, there is really no way to reload the Python library.
> >> The problem is mainly caused by many Python libraries having an assumption
> >> that they own the process alone.
> >>
> >> >>> 2. Does one TM have only one embedded Python running at the same time?
> >> If all the Python operator in the TM share the same PVM, will there be a
> >> loss in performance?
> >>
> >> Your understanding is correct: one TM has only one embedded Python
> >> running at the same time. I guess you are worried about the performance
> >> loss of multiple threads caused by the Python GIL. There is a one-to-one
> >> correspondence between Java worker threads and Python subinterpreters.
> >> Although subinterpreters have not yet completely overcome the GIL sharing
> >> problem (the Python community’s recent plan for a per-interpreter GIL is
> >> also under discussion[1]), the performance of subinterpreters is very
> >> close to that of multiprocessing [2].
> >>
> >> >>> 3. How do we load the relevant c library if the python.executable is
> >> provided by users?
> >>
> >> Once python.executable is provided, PEMJA will dynamically load the
> >> CPython
> >> library (libpython.*so or libpython.*dylib) and pemja.so installed in the
> >> python environment.
> >>
> >> >>> May there be a risk of version conflicts?
> >>
> >> I understand that this question is actually discussing whether C/C++ has a
> >> way to solve the problem of relying on different versions of a library.
> >> First of all, we know that if there is only static linking, there will be
> >> no such problem.  And I have studied the source code of CPython[3], and
> >> there is no usage of dynamic linking. The rest is the case where dynamic
> >> linking is used in the C library written by the users. There are many ways
> >> to solve this problem with dynamic linking, but after all, this library is
> >> written by users, and it is difficult for us to guarantee that there will
> >> be no conflicts. At this time, Process Mode will be the fallback
> >> choice.
> >>
> >> [1]
> >>
> >> https://mail.python.org/archives/list/python-dev@python.org/thread/S5GZZCEREZLA2PEMTVFBCDM52H4JSENR/#RIK75U3ROEHWZL4VENQSQECB4F4GDELV
> >> [2]
> >>
> >> https://mail.python.org/archives/list/python-dev@python.org/thread/PNLBJBNIQDMG2YYGPBCTGOKOAVXRBJWY/#L5OXHXPFONRKLR3W6U46LUSUIBN4FCZQ
> >> [3] https://github.com/python/cpython
> >>
> >> Best,
> >> Xingbo
> >>
> >> Wei Zhong <we...@gmail.com> wrote on Fri, Dec 31, 2021 at 11:49:
> >>
> >> > Hi Xingbo,
> >> >
> >> > Thanks for creating this FLIP. Big +1 for it!
> >> >
> >> > I have some question about the Thread Mode:
> >> >
> >> > 1. It seems that we dynamically load an embedded Python and user
> >> > dependencies in the TM process. Can they be uninstalled cleanly after
> >> the
> >> > task finished? i.e. Can we use the Thread Mode in session mode and
> >> Pyflink
> >> > shell?
> >> >
> >> > 2. Does one TM have only one embedded Python running at the same time?
> >> If
> >> > all the Python operator in the TM share the same PVM, will there be a
> >> loss
> >> > in performance?
> >> >
> >> > 3. How do we load the relevant c library if the python.executable is
> >> > provided by users? May there be a risk of version conflicts?
> >> >
> >> > Best,
> >> > Wei
> >> >
> >> >
> >> > > On Dec 29, 2021, at 11:56 AM, Xingbo Huang <hx...@gmail.com> wrote:
> >> > >
> >> > > Hi everyone,
> >> > >
> >> > > I would like to start a discussion thread on "Support PyFlink Runtime
> >> > > Execution in Thread Mode"
> >> > >
> >> > > We have provided PyFlink Runtime framework to support Python
> >> user-defined
> >> > > functions since Flink 1.10. The PyFlink Runtime framework is called
> >> > Process
> >> > > Mode, which depends on an inter-process communication architecture
> >> based
> >> > on
> >> > > the Apache Beam Portability framework. Although starting a dedicated
> >> > > process to execute Python user-defined functions could have better
> >> > resource
> >> > > isolation, it will bring greater resource and performance overhead.
> >> > >
> >> > > In order to overcome the resource and performance problems on Process
> >> > Mode,
> >> > > we will propose a new execution mode which executes Python
> >> user-defined
> >> > > functions in the same thread instead of a separate process.
> >> > >
> >> > > I have drafted the FLIP-206[1]. Please feel free to reply to this
> >> email
> >> > > thread. Looking forward to your feedback!
> >> > >
> >> > > Best,
> >> > > Xingbo
> >> > >
> >> > > [1]
> >> > >
> >> >
> >> https://cwiki.apache.org/confluence/display/FLINK/FLIP-206%3A+Support+PyFlink+Runtime+Execution+in+Thread+Mode
> >> >
> >> >
> >>
> >

Re: [DISCUSS] FLIP-206: Support PyFlink Runtime Execution in Thread Mode

Posted by Till Rohrmann <tr...@apache.org>.
One more question that came to my mind: How much performance improvement do
we gain on a real-world Python use case? Were the measurements more like
micro benchmarks where the Python UDF was called w/o the overhead of Flink?
I would just be curious how much the Python component contributes to the
overall runtime of a real world job. Do we have some data on this?

Cheers,
Till

On Mon, Jan 3, 2022 at 11:45 AM Till Rohrmann <tr...@apache.org> wrote:

> Hi Xingbo,
>
> Thanks for creating this FLIP. I have two general questions about the
> motivation for this FLIP because I have only very little exposure to our
> Python users:
>
> Is the slower performance currently the biggest pain point for our Python
> users?
>
> What else are our Python users mainly complaining about?
>
> Concerning the proposed changes, are there any changes required on the
> runtime side (changes to Flink)? How will the deployment and memory
> management be affected when using the thread execution mode?
>
> Cheers,
> Till
>
> On Fri, Dec 31, 2021 at 9:46 AM Xingbo Huang <hx...@gmail.com> wrote:
>
>> Hi Wei,
>>
>> Thanks a lot for your feedback. Very good questions!
>>
>> >>> 1. It seems that we dynamically load an embedded Python and user
>> dependencies in the TM process. Can they be uninstalled cleanly after the
>> task finished? i.e. Can we use the Thread Mode in session mode and Pyflink
>> shell?
>>
>> I mentioned the limitation of this part in FLIP. There is no problem
>> without changing the python interpreter, but if you need to change the
>> python interpreter, there is really no way to reload the Python library.
>> The problem is mainly caused by many Python libraries having an assumption
>> that they own the process alone.
>>
>> >>> 2. Does one TM have only one embedded Python running at the same time?
>> If all the Python operator in the TM share the same PVM, will there be a
>> loss in performance?
>>
>> Your understanding is correct: one TM has only one embedded Python
>> running at the same time. I guess you are worried about the performance
>> loss of multiple threads caused by the Python GIL. There is a one-to-one
>> correspondence between Java worker threads and Python subinterpreters.
>> Although subinterpreters have not yet completely overcome the GIL sharing
>> problem (the Python community’s recent plan for a per-interpreter GIL is
>> also under discussion[1]), the performance of subinterpreters is very
>> close to that of multiprocessing [2].
>>
>> >>> 3. How do we load the relevant c library if the python.executable is
>> provided by users?
>>
>> Once python.executable is provided, PEMJA will dynamically load the
>> CPython
>> library (libpython.*so or libpython.*dylib) and pemja.so installed in the
>> python environment.
>>
>> >>> May there be a risk of version conflicts?
>>
>> I understand that this question is actually discussing whether C/C++ has a
>> way to solve the problem of relying on different versions of a library.
>> First of all, we know that if there is only static linking, there will be
>> no such problem.  And I have studied the source code of CPython[3], and
>> there is no usage of dynamic linking. The rest is the case where dynamic
>> linking is used in the C library written by the users. There are many ways
>> to solve this problem with dynamic linking, but after all, this library is
>> written by users, and it is difficult for us to guarantee that there will
>> be no conflicts. At this time, Process Mode will be the fallback
>> choice.
>>
>> [1]
>>
>> https://mail.python.org/archives/list/python-dev@python.org/thread/S5GZZCEREZLA2PEMTVFBCDM52H4JSENR/#RIK75U3ROEHWZL4VENQSQECB4F4GDELV
>> [2]
>>
>> https://mail.python.org/archives/list/python-dev@python.org/thread/PNLBJBNIQDMG2YYGPBCTGOKOAVXRBJWY/#L5OXHXPFONRKLR3W6U46LUSUIBN4FCZQ
>> [3] https://github.com/python/cpython
>>
>> Best,
>> Xingbo
>>
>> Wei Zhong <we...@gmail.com> wrote on Fri, Dec 31, 2021 at 11:49:
>>
>> > Hi Xingbo,
>> >
>> > Thanks for creating this FLIP. Big +1 for it!
>> >
>> > I have some question about the Thread Mode:
>> >
>> > 1. It seems that we dynamically load an embedded Python and user
>> > dependencies in the TM process. Can they be uninstalled cleanly after
>> the
>> > task finished? i.e. Can we use the Thread Mode in session mode and
>> Pyflink
>> > shell?
>> >
>> > 2. Does one TM have only one embedded Python running at the same time?
>> If
>> > all the Python operator in the TM share the same PVM, will there be a
>> loss
>> > in performance?
>> >
>> > 3. How do we load the relevant c library if the python.executable is
>> > provided by users? May there be a risk of version conflicts?
>> >
>> > Best,
>> > Wei
>> >
>> >
>> > > On Dec 29, 2021, at 11:56 AM, Xingbo Huang <hx...@gmail.com> wrote:
>> > >
>> > > Hi everyone,
>> > >
>> > > I would like to start a discussion thread on "Support PyFlink Runtime
>> > > Execution in Thread Mode"
>> > >
>> > > We have provided PyFlink Runtime framework to support Python
>> user-defined
>> > > functions since Flink 1.10. The PyFlink Runtime framework is called
>> > Process
>> > > Mode, which depends on an inter-process communication architecture
>> based
>> > on
>> > > the Apache Beam Portability framework. Although starting a dedicated
>> > > process to execute Python user-defined functions could have better
>> > resource
>> > > isolation, it will bring greater resource and performance overhead.
>> > >
>> > > In order to overcome the resource and performance problems on Process
>> > Mode,
>> > > we will propose a new execution mode which executes Python
>> user-defined
>> > > functions in the same thread instead of a separate process.
>> > >
>> > > I have drafted the FLIP-206[1]. Please feel free to reply to this
>> email
>> > > thread. Looking forward to your feedback!
>> > >
>> > > Best,
>> > > Xingbo
>> > >
>> > > [1]
>> > >
>> >
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-206%3A+Support+PyFlink+Runtime+Execution+in+Thread+Mode
>> >
>> >
>>
>

Re: [DISCUSS] FLIP-206: Support PyFlink Runtime Execution in Thread Mode

Posted by Till Rohrmann <tr...@apache.org>.
Hi Xingbo,

Thanks for creating this FLIP. I have two general questions about the
motivation for this FLIP because I have only very little exposure to our
Python users:

Is the slower performance currently the biggest pain point for our Python
users?

What else are our Python users mainly complaining about?

Concerning the proposed changes, are there any changes required on the
runtime side (changes to Flink)? How will the deployment and memory
management be affected when using the thread execution mode?

Cheers,
Till

On Fri, Dec 31, 2021 at 9:46 AM Xingbo Huang <hx...@gmail.com> wrote:

> Hi Wei,
>
> Thanks a lot for your feedback. Very good questions!
>
> >>> 1. It seems that we dynamically load an embedded Python and user
> dependencies in the TM process. Can they be uninstalled cleanly after the
> task finished? i.e. Can we use the Thread Mode in session mode and Pyflink
> shell?
>
> I mentioned the limitation of this part in FLIP. There is no problem
> without changing the python interpreter, but if you need to change the
> python interpreter, there is really no way to reload the Python library.
> The problem is mainly caused by many Python libraries having an assumption
> that they own the process alone.
>
> >>> 2. Does one TM have only one embedded Python running at the same time?
> If all the Python operator in the TM share the same PVM, will there be a
> loss in performance?
>
> Your understanding is correct: one TM has only one embedded Python
> running at the same time. I guess you are worried about the performance
> loss of multiple threads caused by the Python GIL. There is a one-to-one
> correspondence between Java worker threads and Python subinterpreters.
> Although subinterpreters have not yet completely overcome the GIL sharing
> problem (the Python community’s recent plan for a per-interpreter GIL is
> also under discussion[1]), the performance of subinterpreters is very
> close to that of multiprocessing [2].
>
> >>> 3. How do we load the relevant c library if the python.executable is
> provided by users?
>
> Once python.executable is provided, PEMJA will dynamically load the CPython
> library (libpython.*so or libpython.*dylib) and pemja.so installed in the
> python environment.
>
> >>> May there be a risk of version conflicts?
>
> I understand that this question is actually discussing whether C/C++ has a
> way to solve the problem of relying on different versions of a library.
> First of all, we know that if there is only static linking, there will be
> no such problem.  And I have studied the source code of CPython[3], and
> there is no usage of dynamic linking. The rest is the case where dynamic
> linking is used in the C library written by the users. There are many ways
> to solve this problem with dynamic linking, but after all, this library is
> written by users, and it is difficult for us to guarantee that there will
> be no conflicts. At this time, Process Mode will be the fallback
> choice.
>
> [1]
>
> https://mail.python.org/archives/list/python-dev@python.org/thread/S5GZZCEREZLA2PEMTVFBCDM52H4JSENR/#RIK75U3ROEHWZL4VENQSQECB4F4GDELV
> [2]
>
> https://mail.python.org/archives/list/python-dev@python.org/thread/PNLBJBNIQDMG2YYGPBCTGOKOAVXRBJWY/#L5OXHXPFONRKLR3W6U46LUSUIBN4FCZQ
> [3] https://github.com/python/cpython
>
> Best,
> Xingbo
>
> Wei Zhong <we...@gmail.com> wrote on Fri, Dec 31, 2021 at 11:49:
>
> > Hi Xingbo,
> >
> > Thanks for creating this FLIP. Big +1 for it!
> >
> > I have some question about the Thread Mode:
> >
> > 1. It seems that we dynamically load an embedded Python and user
> > dependencies in the TM process. Can they be uninstalled cleanly after the
> > task finished? i.e. Can we use the Thread Mode in session mode and
> Pyflink
> > shell?
> >
> > 2. Does one TM have only one embedded Python running at the same time?
> > If all the Python operators in the TM share the same PVM, will there be
> > a loss in performance?
> >
> > 3. How do we load the relevant C library if the python.executable is
> > provided by users? May there be a risk of version conflicts?
> >
> > Best,
> > Wei
> >
> >
> > > On Dec 29, 2021, at 11:56 AM, Xingbo Huang <hx...@gmail.com> wrote:
> > >
> > > Hi everyone,
> > >
> > > I would like to start a discussion thread on "Support PyFlink Runtime
> > > Execution in Thread Mode"
> > >
> > > We have provided PyFlink Runtime framework to support Python
> user-defined
> > > functions since Flink 1.10. The PyFlink Runtime framework is called
> > Process
> > > Mode, which depends on an inter-process communication architecture
> based
> > on
> > > the Apache Beam Portability framework. Although starting a dedicated
> > > process to execute Python user-defined functions could have better
> > resource
> > > isolation, it will bring greater resource and performance overhead.
> > >
> > > In order to overcome the resource and performance problems on Process
> > Mode,
> > > we will propose a new execution mode which executes Python user-defined
> > > functions in the same thread instead of a separate process.
> > >
> > > I have drafted the FLIP-206[1]. Please feel free to reply to this email
> > > thread. Looking forward to your feedback!
> > >
> > > Best,
> > > Xingbo
> > >
> > > [1]
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-206%3A+Support+PyFlink+Runtime+Execution+in+Thread+Mode
> >
> >
>

Re: [DISCUSS] FLIP-206: Support PyFlink Runtime Execution in Thread Mode

Posted by Xingbo Huang <hx...@gmail.com>.
Hi Wei,

Thanks a lot for your feedback. Very good questions!

>>> 1. It seems that we dynamically load an embedded Python and user
dependencies in the TM process. Can they be uninstalled cleanly after the
task finishes? i.e. Can we use the Thread Mode in session mode and the
PyFlink shell?

I mentioned this limitation in the FLIP. There is no problem as long as
the Python interpreter is not changed, but if the Python interpreter needs
to be changed, there is really no way to reload the Python library. The
problem is mainly caused by many Python libraries assuming that they own
the process alone.

>>> 2. Does one TM have only one embedded Python running at the same time?
If all the Python operators in the TM share the same PVM, will there be a
loss in performance?

Your understanding is correct: one TM has only one embedded Python
running at a time. I guess you are worried about the performance loss
caused by the Python GIL under multi-threading. There is a one-to-one
correspondence between Java worker threads and Python sub-interpreters.
Although sub-interpreters have not yet completely overcome the GIL
sharing problem (the Python community's recent plan for a per-interpreter
GIL is also under discussion [1]), the performance of sub-interpreters is
very close to that of multiprocessing [2].
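
To make the sub-interpreter model more concrete, here is a minimal
sketch using CPython's private _xxsubinterpreters module (Python 3.8+,
an unstable API). It only illustrates the isolation model and is not
PEMJA's actual implementation:

```
# A sketch only: _xxsubinterpreters is a private, unstable CPython API.
# In Thread Mode, each Java worker thread would own one interpreter
# like this, with all of them sharing a single process.
import _xxsubinterpreters as interpreters

interp = interpreters.create()   # an isolated interpreter in this process
interpreters.run_string(interp, "print('HELLO'.lower())")
interpreters.destroy(interp)
```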

>>> 3. How do we load the relevant C library if the python.executable is
provided by users?

Once python.executable is provided, PEMJA will dynamically load the
CPython library (libpython*.so or libpython*.dylib) and the pemja.so
installed in that Python environment.
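
As a rough illustration (not necessarily how PEMJA resolves it
internally), the shared library matching a given python.executable can
be discovered through sysconfig:

```
# Run with the user-provided python.executable to find which CPython
# shared library belongs to that environment.
import os
import sysconfig

lib_dir = sysconfig.get_config_var("LIBDIR")      # e.g. /usr/lib
lib_name = sysconfig.get_config_var("LDLIBRARY")  # e.g. libpython3.8.so
print(os.path.join(lib_dir, lib_name))
```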

>>> May there be a risk of version conflicts?

I understand that this question is actually about whether C/C++ has a way
to solve the problem of depending on different versions of a library.
First of all, we know that if there is only static linking, there is no
such problem. I have studied the source code of CPython [3], and there is
no usage of dynamic linking there. The remaining case is where dynamic
linking is used in C libraries written by users. There are many ways to
solve this problem with dynamic linking, but after all, these libraries
are written by users, and it is difficult for us to guarantee that there
will be no conflicts. In that case, Process Mode will be the fallback
choice.
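
For reference, selecting the execution mode per job would look roughly
like this, assuming the python.execution-mode option proposed in the
FLIP (with "process" remaining available as the fallback):

```
# A sketch assuming the "python.execution-mode" option proposed in
# FLIP-206; "process" remains available as the fallback mode.
from pyflink.table import EnvironmentSettings, TableEnvironment

t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())
t_env.get_config().get_configuration().set_string(
    "python.execution-mode", "thread")
```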

[1]
https://mail.python.org/archives/list/python-dev@python.org/thread/S5GZZCEREZLA2PEMTVFBCDM52H4JSENR/#RIK75U3ROEHWZL4VENQSQECB4F4GDELV
[2]
https://mail.python.org/archives/list/python-dev@python.org/thread/PNLBJBNIQDMG2YYGPBCTGOKOAVXRBJWY/#L5OXHXPFONRKLR3W6U46LUSUIBN4FCZQ
[3] https://github.com/python/cpython

Best,
Xingbo

Wei Zhong <we...@gmail.com> wrote on Fri, Dec 31, 2021 at 11:49:

> Hi Xingbo,
>
> Thanks for creating this FLIP. Big +1 for it!
>
> I have some questions about the Thread Mode:
>
> 1. It seems that we dynamically load an embedded Python and user
> dependencies in the TM process. Can they be uninstalled cleanly after the
> task finishes? i.e. Can we use the Thread Mode in session mode and the
> PyFlink shell?
>
> 2. Does one TM have only one embedded Python running at the same time? If
> all the Python operators in the TM share the same PVM, will there be a
> loss in performance?
>
> 3. How do we load the relevant C library if the python.executable is
> provided by users? May there be a risk of version conflicts?
>
> Best,
> Wei
>
>
> > On Dec 29, 2021, at 11:56 AM, Xingbo Huang <hx...@gmail.com> wrote:
> >
> > Hi everyone,
> >
> > I would like to start a discussion thread on "Support PyFlink Runtime
> > Execution in Thread Mode"
> >
> > We have provided PyFlink Runtime framework to support Python user-defined
> > functions since Flink 1.10. The PyFlink Runtime framework is called
> Process
> > Mode, which depends on an inter-process communication architecture based
> on
> > the Apache Beam Portability framework. Although starting a dedicated
> > process to execute Python user-defined functions could have better
> resource
> > isolation, it will bring greater resource and performance overhead.
> >
> > In order to overcome the resource and performance problems on Process
> Mode,
> > we will propose a new execution mode which executes Python user-defined
> > functions in the same thread instead of a separate process.
> >
> > I have drafted the FLIP-206[1]. Please feel free to reply to this email
> > thread. Looking forward to your feedback!
> >
> > Best,
> > Xingbo
> >
> > [1]
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-206%3A+Support+PyFlink+Runtime+Execution+in+Thread+Mode
>
>

Re: [DISCUSS] FLIP-206: Support PyFlink Runtime Execution in Thread Mode

Posted by Wei Zhong <we...@gmail.com>.
Hi Xingbo,

Thanks for creating this FLIP. Big +1 for it!

I have some questions about the Thread Mode:

1. It seems that we dynamically load an embedded Python and user dependencies in the TM process. Can they be uninstalled cleanly after the task finishes? i.e. Can we use the Thread Mode in session mode and the PyFlink shell?

2. Does one TM have only one embedded Python running at the same time? If all the Python operators in the TM share the same PVM, will there be a loss in performance?

3. How do we load the relevant C library if the python.executable is provided by users? May there be a risk of version conflicts?

Best,
Wei


> On Dec 29, 2021, at 11:56 AM, Xingbo Huang <hx...@gmail.com> wrote:
> 
> Hi everyone,
> 
> I would like to start a discussion thread on "Support PyFlink Runtime
> Execution in Thread Mode"
> 
> We have provided PyFlink Runtime framework to support Python user-defined
> functions since Flink 1.10. The PyFlink Runtime framework is called Process
> Mode, which depends on an inter-process communication architecture based on
> the Apache Beam Portability framework. Although starting a dedicated
> process to execute Python user-defined functions could have better resource
> isolation, it will bring greater resource and performance overhead.
> 
> In order to overcome the resource and performance problems on Process Mode,
> we will propose a new execution mode which executes Python user-defined
> functions in the same thread instead of a separate process.
> 
> I have drafted the FLIP-206[1]. Please feel free to reply to this email
> thread. Looking forward to your feedback!
> 
> Best,
> Xingbo
> 
> [1]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-206%3A+Support+PyFlink+Runtime+Execution+in+Thread+Mode