Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/04/27 07:40:10 UTC

[GitHub] [flink-web] dianfu commented on a diff in pull request #530: Add blog post "Exploring the thread mode in PyFlink"

dianfu commented on code in PR #530:
URL: https://github.com/apache/flink-web/pull/530#discussion_r859472037


##########
_posts/2022-04-24-pyflink-1.15-thread-mode.md:
##########
@@ -0,0 +1,463 @@
+---
+layout: post
+title: "Exploring the thread mode in PyFlink"
+date: 2022-04-24T12:00:00.000Z
+authors:
+- Xingbo:
+  name: "Xingbo Huang"
+- Dian:
+  name: "Dian Fu"
+excerpt: Flink 1.15 introduced a new Runtime Execution Mode named 'thread' mode in PyFlink. This post explains how it works and when to use it.
+---
+
+PyFlink was introduced in Flink 1.9 to bring the power of Flink to Python users and let them develop Flink jobs in Python.
+Its functionality has matured steadily over the releases since.
+
+Before Flink 1.15, Python user-defined functions were executed in separate Python processes (based on the [Apache Beam Portability Framework](https://docs.google.com/document/d/1B9NmaBSKCnMJQp-ibkxvZ_U233Su67c1eYgBhrqWP24/edit#heading=h.khjybycus70)).
+This brings additional serialization/deserialization and communication overhead. In scenarios where the data size is large, e.g. image processing,
+this overhead becomes non-negligible. Moreover, because it involves inter-process communication, the processing latency is also non-negligible,
+which is unacceptable in latency-critical scenarios, e.g. quantitative trading.
+
+In Flink 1.15, we have introduced a new execution mode named 'thread' mode (based on [PEMJA](https://github.com/alibaba/pemja)) where the Python user-defined functions will be executed in the JVM as
+a thread instead of a separate Python process. In this article, we will dig into the details about this execution mode and also share some benchmark data to
+give users a basic understanding of how it works and which scenarios it’s applicable for.
+

Review Comment:
   Add a table of contents as follows:
   ```
   {% toc %}
   ``` 
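
The introduction quoted above presents thread mode as a new, separate execution mode, so a job has to select it through configuration. The following is a minimal sketch of that step, not the post's own code. It assumes the option key `python.execution-mode` with the value `thread`; neither is quoted in this excerpt, so check both against the Flink 1.15 configuration reference. `apply_options` is a hypothetical helper, and a plain dict stands in for a Flink configuration object so that the snippet runs without PyFlink installed.

```python
from typing import Callable, Dict

# Assumed option key and value (not quoted in the excerpt above).
THREAD_MODE_OPTIONS: Dict[str, str] = {"python.execution-mode": "thread"}


def apply_options(set_option: Callable[[str, str], object],
                  options: Dict[str, str]) -> None:
    """Hypothetical helper: push every key/value pair through a setter."""
    for key, value in options.items():
        set_option(key, value)


# A plain dict stands in for the job configuration in this sketch.
demo_conf: Dict[str, str] = {}
apply_options(demo_conf.__setitem__, THREAD_MODE_OPTIONS)
print(demo_conf)
```

In a real job the setter would be whatever the configuration object exposes, for example `table_env.get_config().get_configuration().set_string`, assuming a `TableEnvironment` named `table_env`.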



##########
_posts/2022-04-24-pyflink-1.15-thread-mode.md:
##########
@@ -0,0 +1,463 @@
+## Process Mode
+
+<center>
+<img src="{{ site.baseurl }}/img/blog/2022-04-24-pyflink-1.15-thread-mode/pyflink-architecture-overview.png"/>
+<br/>
+<i><small>Fig. 1 - PyFlink Architecture Overview</small></i>
+</center>
+<br/>
+
+Fig. 1 shows the architecture of PyFlink. As shown on the left side of Fig. 1, users can use the PyFlink API (Python Table API & SQL or Python DataStream API) to declare the logic of jobs,
+which is eventually translated into a JobGraph (the DAG of the job) that Flink’s execution framework can recognize. Note that Python operators (Flink operators whose purpose is to
+execute Python user-defined functions) are used to execute the Python user-defined functions.
+
+The right side of Fig. 1 shows the details of the Python operators, where the Python user-defined functions are executed in separate Python processes.
+
+To communicate with the Python worker process, a series of communication services is required between the Python operator (which runs in the JVM) and the Python worker (which runs in the Python VM).
+To execute Python user-defined functions, PyFlink employs the [Apache Beam Portability framework](https://docs.google.com/document/d/1B9NmaBSKCnMJQp-ibkxvZ_U233Su67c1eYgBhrqWP24/edit#heading=h.khjybycus70), which provides the basic building blocks PyFlink requires.
+
+<center>
+<img src="{{ site.baseurl }}/img/blog/2022-04-24-pyflink-1.15-thread-mode/pyflink-process-mode.png"/>
+<br/>
+<i><small>Fig. 2 - PyFlink Runtime in Process Mode</small></i>
+</center>
+<br/>
+
+Process mode runs stably and efficiently in most scenarios, and it is sufficient for most users. However, in some scenarios it doesn’t work well because of the additional serialization/deserialization overhead.
+One of the most typical scenarios is image processing, where the input data size is often very large. Moreover, because it involves inter-process communication, the processing latency is also non-negligible,
+which is unacceptable in latency-critical scenarios, e.g. quantitative trading. To overcome these problems, we have introduced a new execution mode (thread mode)
+where Python user-defined functions are executed in the JVM as a thread instead of in a separate Python process. In the following sections, we will dig into the details of this new execution mode.
+
+## PEMJA
+
+Before digging into thread mode, let’s first introduce [PEMJA](https://github.com/alibaba/pemja), the library at the core of the thread mode architecture.
+
+The Java Native Interface (JNI) is a standard programming interface for writing Java native methods and embedding the Java virtual machine into native applications.
+In addition, CPython provides the Python/C API to help embed Python in C applications.
+
+By combining these two interfaces, we can embed Python in a Java application. Since this library solves the general problem of letting Python and Java call each other,
+we have open sourced it as an independent project, and PyFlink has depended on [PEMJA](https://github.com/alibaba/pemja) since Flink 1.15 to support thread mode.
+
+### PEMJA Architecture
+
+<center>
+<img src="{{ site.baseurl }}/img/blog/2022-04-24-pyflink-1.15-thread-mode/pemja.png"/>
+<br/>
+<i><small>Fig. 3 - PEMJA Architecture</small></i>
+</center>
+<br/>
+
+As the architecture of [PEMJA](https://github.com/alibaba/pemja) in Fig. 3 shows, the JVM and the PVM (Python VM) can call each other in the same process through the [PEMJA](https://github.com/alibaba/pemja) library.
+
+First, [PEMJA](https://github.com/alibaba/pemja) starts a daemon thread in the JVM, which is responsible for initializing the Python environment and creating the Python main interpreter owned by this process.
+[PEMJA](https://github.com/alibaba/pemja) uses a dedicated thread to initialize the Python environment in order to avoid potential deadlocks in the Python interpreter.
+The Python interpreter could deadlock when threads concurrently try to acquire the GIL through methods such as [PyGILState_*](https://docs.python.org/3/c-api/init.html#c.PyGILState_Ensure) in the Python/C API.
+Note that [PEMJA](https://github.com/alibaba/pemja) doesn’t call those methods directly; however, third-party libraries such as [numpy](https://numpy.org/) may call them.
+To get around this, we use a dedicated thread to initialize the Python environment.
+
+Then, each Java worker thread can invoke Python functions through a Python [ThreadState](https://docs.python.org/3/c-api/init.html) created from the Python main interpreter.
+
+### Comparison with other solutions
+
+<table width="95%" border="1">
+  <thead>
+    <tr>
+      <th style="text-align: center">Framework</th>
+      <th style="text-align: center">Principle</th>
+      <th style="text-align: center">Limitations</th>
+    </tr>
+  </thead>
+  <tbody>
+    <tr>
+      <td style="text-align: center"><a href="https://www.jython.org/">Jython</a></td>
+      <td style="text-align: center">Python compiler implemented in Java</td>
+      <td style="text-align: justify">
+        <ul>
+          <li>Supports only Python 2</li>
+        </ul>
+      </td>
+    </tr>
+    <tr>
+      <td style="text-align: center"><a href="https://www.graalvm.org/python/">GraalVM</a></td>
+      <td style="text-align: center">Truffle framework</td>
+      <td style="text-align: justify">
+        <ul>
+          <li>Compatibility issues with various libraries in the Python ecosystem</li>
+          <li>Works only with GraalVM</li>
+        </ul>
+      </td>
+    </tr>
+    <tr>
+      <td style="text-align: center"><a href="https://github.com/jpype-project/jpype">JPype</a></td>
+      <td style="text-align: center">JNI + Python/C API</td>
+      <td style="text-align: justify">
+        <ul>
+          <li>Does not support calling Python from Java</li>
+          <li>Supports only CPython</li>
+        </ul>
+      </td>
+    </tr>
+    <tr>
+      <td style="text-align: center"><a href="https://github.com/ninia/jep">Jep</a></td>
+      <td style="text-align: center">JNI + Python/C API</td>
+      <td style="text-align: justify">
+        <ul>
+          <li>Difficult to integrate</li>
+          <li>Performance is not good enough</li>
+          <li>Supports only CPython</li>
+        </ul>
+      </td>
+    </tr>
+    <tr>
+      <td style="text-align: center"><a href="https://github.com/alibaba/pemja">PEMJA</a></td>
+      <td style="text-align: center">JNI + Python/C API</td>
+      <td style="text-align: justify">
+        <ul>
+          <li>Supports only CPython</li>
+        </ul>
+      </td>
+    </tr>
+  </tbody>
+</table>
+
+
+The table above gives a basic comparison of popular libraries for calling between Java and Python.
+
+[Jython](https://www.jython.org/): Jython is a Python interpreter implemented in Java. Because it is implemented in Java,
+Python code and Java code interoperate very naturally.
+However, Jython does not support Python 3, and it is no longer actively maintained.
+
+[GraalVM](https://www.graalvm.org/python/): GraalVM makes use of the Truffle framework to support interoperability between Python and Java.
+However, not all Python libraries are supported, because many Python libraries rely on standard CPython to implement their C extensions.
+The other problem is that it only works with GraalVM, which means high migration costs.
+
+[JPype](https://github.com/jpype-project/jpype): Similar to [PEMJA](https://github.com/alibaba/pemja),
+JPype is also a framework built using JNI and Python/C API, but JPype only supports calling Java from Python.
+
+[Jep](https://github.com/ninia/jep): Similar to [PEMJA](https://github.com/alibaba/pemja), Jep is also a framework built using JNI and Python/C API and it supports calling Python from Java.
+However, it doesn’t publish a jar to the Maven repository, and the loading of native packages has to be specified in advance through JVM parameters or environment variables when the JVM starts,
+which makes it difficult to integrate. Furthermore, our benchmark shows that its performance is not very good.
+
+[PEMJA](https://github.com/alibaba/pemja): Similar to Jep and JPype, PEMJA is built on CPython, so it cannot support other Python interpreters such as PyPy.
+Since CPython is the reference and most widely used implementation of Python, most libraries in the Python ecosystem are built on the CPython runtime and so work with PEMJA naturally.
+
+## Thread Mode
+
+<center>
+<img src="{{ site.baseurl }}/img/blog/2022-04-24-pyflink-1.15-thread-mode/pyflink-thread-mode.png"/>
+<br/>
+<i><small>Fig. 4 - PyFlink Runtime in Thread Mode</small></i>
+</center>
+<br/>
+
+From the picture above, we can see that in thread mode, the Python user-defined function runs in the same process as the Python operator (which runs in the JVM).
+[PEMJA](https://github.com/alibaba/pemja) is used as a bridge between the Java code and the Python code.
+
+Since the Python user-defined function runs in the JVM, each input record received from the upstream operators is passed to
+the Python user-defined function directly instead of being buffered and passed in a batch.
+Therefore, thread mode can have lower latency than process mode. Currently, users who want lower latency in process mode usually need to set
+`python.fn-execution.bundle.size` or `python.fn-execution.bundle.time` to a lower value. However, because process mode involves inter-process communication,
+the latency is still somewhat high in some scenarios; this is no longer a problem in thread mode. Besides, setting `python.fn-execution.bundle.size` or `python.fn-execution.bundle.time` to
+a lower value usually hurts the overall performance of the job, which is also not a problem in thread mode.
+
+
+## Comparisons between process mode and thread mode
+
+<table width="95%" border="1">
+  <thead>
+    <tr>
+      <th style="text-align: center">Execution Mode</th>
+      <th style="text-align: center">Benefits</th>
+      <th style="text-align: center">Limitations</th>
+    </tr>
+  </thead>
+  <tbody>
+    <tr>
+      <td style="text-align: center">Process Mode</td>
+      <td style="text-align: justify">
+        <ul>
+          <li>Better resource isolation</li>
+        </ul>
+      </td>
+      <td style="text-align: justify">
+        <ul>
+          <li>IPC overhead</li>
+          <li>High implementation complexity</li>
+        </ul>
+      </td>
+    </tr>
+    <tr>
+      <td style="text-align: center">Thread Mode</td>
+      <td style="text-align: justify">
+        <ul>
+          <li>Higher throughput</li>
+          <li>Lower latency</li>
+          <li>Shorter checkpoint time</li>
+          <li>Fewer usage restrictions</li>
+        </ul>
+      </td>
+      <td style="text-align: justify">
+        <ul>
+          <li>Supports only CPython</li>
+          <li>Multiple jobs cannot use different Python interpreters in session mode</li>
+          <li>Performance is affected by the GIL</li>
+        </ul>
+      </td>
+    </tr>
+  </tbody>
+</table>
+
+`Benefits of thread mode`:

Review Comment:
   ```suggestion
   ### Benefits of thread mode
   ```
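
The "Thread Mode" section quoted above argues that buffering records into bundles adds latency, and that lowering `python.fn-execution.bundle.size` or `python.fn-execution.bundle.time` only partly helps. The toy model below illustrates that reasoning under strong simplifying assumptions: a bundle is flushed when it reaches a size limit or when a time limit has passed since its first record, and processing and IPC costs are ignored. The function names are hypothetical and this is not Flink's actual bundling code.

```python
from typing import List


def bundle_latencies(arrivals: List[float], bundle_size: int,
                     bundle_time: float) -> List[float]:
    """Waiting time of each record before its bundle is flushed.

    A bundle is flushed when it holds `bundle_size` records, or when
    `bundle_time` has elapsed since its first record, whichever is first.
    """
    latencies: List[float] = []
    bundle: List[float] = []
    for t in arrivals:
        # Time-based flush of the pending bundle before the new record joins.
        if bundle and t - bundle[0] >= bundle_time:
            flush_at = bundle[0] + bundle_time
            latencies.extend(flush_at - a for a in bundle)
            bundle = []
        bundle.append(t)
        # Size-based flush happens as soon as the bundle is full.
        if len(bundle) == bundle_size:
            latencies.extend(t - a for a in bundle)
            bundle = []
    if bundle:
        # The last partial bundle is flushed by the timer.
        flush_at = bundle[0] + bundle_time
        latencies.extend(flush_at - a for a in bundle)
    return latencies


def per_record_latencies(arrivals: List[float]) -> List[float]:
    """Thread-mode-like handling: every record is passed on immediately."""
    return [0.0 for _ in arrivals]


arrivals = [0.0, 1.0, 2.0, 3.0]
print(bundle_latencies(arrivals, bundle_size=2, bundle_time=10.0))
print(bundle_latencies(arrivals, bundle_size=100, bundle_time=2.5))
print(per_record_latencies(arrivals))
```

In this model a smaller bundle size or time shortens the wait, but only per-record handling removes it entirely, which is the point the quoted paragraph makes.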



##########
_posts/2022-04-24-pyflink-1.15-thread-mode.md:
##########
@@ -0,0 +1,463 @@
+`Benefits of thread mode`:
+
+Because process mode processes data in batches, Python user-defined functions currently cannot be used in some scenarios,
+e.g. in a Join (Table API & SQL) condition that takes columns from both the left table and the right table as inputs.
+This is no longer a big problem in thread mode, because it handles records one at a time instead of in batches.
+
+Unlike process mode, which sends and receives data asynchronously in batches, thread mode processes data synchronously, one record at a time.
+So it usually has lower latency and shorter checkpoint time. In terms of performance, since there is no inter-process communication,
+it avoids data serialization/deserialization and communication overhead, as well as the copying and context switching between kernel space and user space,
+so thread mode usually performs better.
+
+`Limitations`:

Review Comment:
   ```suggestion
   ### Limitations
   ```
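
The benefits paragraph quoted above attributes much of the performance gain to avoiding serialization and deserialization across a process boundary. The sketch below illustrates that cost in isolation. `pickle` is only a stand-in for whatever wire format a real deployment uses, and `udf`, `call_via_ipc` and `call_in_process` are hypothetical names. It also does not model the Java-to-Python object conversion that an embedded interpreter still has to perform.

```python
import pickle
from typing import Any, Tuple


def udf(record: Any) -> Any:
    """Stand-in for a Python user-defined function: returns its input."""
    return record


def call_via_ipc(record: Any) -> Tuple[Any, int]:
    """Mimic a process boundary: serialize the input and the result."""
    wire_in = pickle.dumps(record)
    result = udf(pickle.loads(wire_in))
    wire_out = pickle.dumps(result)
    return pickle.loads(wire_out), len(wire_in) + len(wire_out)


def call_in_process(record: Any) -> Tuple[Any, int]:
    """Same call without a boundary: the object is passed by reference."""
    return udf(record), 0


# A 1 MB payload, roughly the "image processing" case from the post.
image = {"name": "frame-0", "pixels": bytearray(1_000_000)}
ipc_result, ipc_bytes = call_via_ipc(image)
local_result, local_bytes = call_in_process(image)
print(ipc_bytes, local_bytes, ipc_result is image, local_result is image)
```

The boundary-crossing call moves the payload over the "wire" twice and hands back a copy, while the in-process call returns the very same object.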



##########
_posts/2022-04-24-pyflink-1.15-thread-mode.md:
##########
@@ -0,0 +1,463 @@
+---
+layout: post
+title: "Exploring the thread mode in PyFlink"
+date: 2022-04-24T12:00:00.000Z
+authors:
+- Xingbo:
+  name: "Xingbo Huang"
+- Dian:
+  name: "Dian Fu"
+excerpt: Flink 1.15 introduced a new Runtime Execution Mode named 'thread' mode in PyFlink. This post explains how it works and when to use it.
+---
+
+PyFlink was introduced in Flink 1.9 which purpose is to bring the power of Flink to Python users and allow Python users to develop Flink jobs in Python language.
+The functionality becomes more and more mature through the development in the past releases.
+
+Before Flink 1.15, Python user-defined functions will be executed in separate Python processes (based on the [Apache Beam Portability Framework](https://docs.google.com/document/d/1B9NmaBSKCnMJQp-ibkxvZ_U233Su67c1eYgBhrqWP24/edit#heading=h.khjybycus70)).
+It will bring additional serialization/deserialization overhead and also communication overhead. In scenarios where the data size is big, e.g. image processing, etc,
+this overhead becomes non-negligible. Besides, since it involves inter-process communication, the processing latency is also non-negligible,
+which is unacceptable in scenarios where the latency is critical, e.g. quantitative transaction, etc.
+
+In Flink 1.15, we have introduced a new execution mode named 'thread' mode (based on [PEMJA](https://github.com/alibaba/pemja)) where the Python user-defined functions will be executed in the JVM as
+a thread instead of a separate Python process. In this article, we will dig into the details about this execution mode and also share some benchmark data to
+give users a basic understanding of how it works and which scenarios it’s applicable for.
+
+## Process Mode
+
+<center>
+<img src="{{ site.baseurl }}/img/blog/2022-04-24-pyflink-1.15-thread-mode/pyflink-architecture-overview.png"/>
+<br/>
+<i><small>Fig. 1 - PyFlink Architecture Overview</small></i>
+</center>
+<br/>
+
+Fig. 1 shows the architecture of PyFlink. As shown on the left side of Fig. 1, users can use the PyFlink API (Python Table API & SQL or Python DataStream API) to declare the logic of jobs,
+which is finally translated into a JobGraph (the DAG of the job) that can be recognized by Flink’s execution framework. It should be noted that Python operators (Flink operators whose purpose is to
+execute Python user-defined functions) are used to execute the Python user-defined functions.
+
+The right side of Fig. 1 shows the details of the Python operators, where the Python user-defined functions are executed in separate Python processes.
+
+In order to communicate with the Python worker process, a series of communication services are required between the Python operator (which runs in the JVM) and the Python worker (which runs in the Python VM).
+To execute Python user-defined functions, PyFlink employs the [Apache Beam Portability framework](https://docs.google.com/document/d/1B9NmaBSKCnMJQp-ibkxvZ_U233Su67c1eYgBhrqWP24/edit#heading=h.khjybycus70), which provides the basic building blocks required for PyFlink.
+
+<center>
+<img src="{{ site.baseurl }}/img/blog/2022-04-24-pyflink-1.15-thread-mode/pyflink-process-mode.png"/>
+<br/>
+<i><small>Fig. 2 - PyFlink Runtime in Process Mode</small></i>
+</center>
+<br/>
+
+Process mode runs stably and efficiently in most scenarios, and it is sufficient for most users. However, in some scenarios it doesn’t work well due to the additional serialization/deserialization overhead.
+One of the most typical scenarios is image processing, where the input data size is often very large. Besides, since it involves inter-process communication, the processing latency is also non-negligible,
+which is unacceptable in scenarios where latency is critical, e.g. quantitative trading. In order to overcome these problems, we have introduced a new execution mode (thread mode)
+where Python user-defined functions are executed in the JVM as a thread instead of in a separate Python process. In the following sections, we will dig into the details of this new execution mode.
+
+## PEMJA
+
+Before digging into the thread mode, let’s first introduce [PEMJA](https://github.com/alibaba/pemja), a library that is at the core of the thread mode architecture.
+
+Java Native Interface (JNI) is a standard programming interface for writing Java native methods and embedding the Java virtual machine into native applications.
+In addition, CPython provides the Python/C API to help embed Python in C applications.
+
+So if we combine these two interfaces, we can embed Python in a Java application. Since this library solves the general problem of letting Python and Java call each other,
+we have open sourced it as an independent project, and PyFlink has depended on [PEMJA](https://github.com/alibaba/pemja) since Flink 1.15 to support thread mode.
+
+### PEMJA Architecture
+
+<center>
+<img src="{{ site.baseurl }}/img/blog/2022-04-24-pyflink-1.15-thread-mode/pemja.png"/>
+<br/>
+<i><small>Fig. 3 - PEMJA Architecture</small></i>
+</center>
+<br/>
+
+As we can see from the architecture of [PEMJA](https://github.com/alibaba/pemja) in Fig. 3, the JVM and the PVM can call each other in the same process through the [PEMJA](https://github.com/alibaba/pemja) library.
+
+First, [PEMJA](https://github.com/alibaba/pemja) starts a daemon thread in the JVM, which is responsible for initializing the Python environment and creating a Python main interpreter owned by this process.
+[PEMJA](https://github.com/alibaba/pemja) uses a dedicated thread to initialize the Python environment in order to avoid potential deadlocks in the Python interpreter.
+The Python interpreter could deadlock when threads concurrently try to acquire the GIL through methods such as [PyGILState_*](https://docs.python.org/3/c-api/init.html#c.PyGILState_Ensure) in the Python/C API.
+It should be noted that [PEMJA](https://github.com/alibaba/pemja) doesn’t call those methods directly; however, third-party libraries, e.g. [numpy](https://numpy.org/), may call them.
+To get around this, we use a dedicated thread to initialize the Python environment.
+
+Then, each Java worker thread can invoke Python functions through the Python [ThreadState](https://docs.python.org/3/c-api/init.html) created from the Python main interpreter.
+
+### Comparison with other solutions
+
+<table width="95%" border="1">
+  <thead>
+    <tr>
+      <th style="text-align: center">Framework</th>
+      <th style="text-align: center">Principle</th>
+      <th style="text-align: center">Limitations</th>
+    </tr>
+  </thead>
+  <tbody>
+    <tr>
+      <td style="text-align: center"><a href="https://www.jython.org/">Jython</a></td>
+      <td style="text-align: center">Python compiler implemented in Java</td>
+      <td style="text-align: justify">
+        <ul>
+          <li>Only supports Python 2</li>
+        </ul>
+      </td>
+    </tr>
+    <tr>
+      <td style="text-align: center"><a href="https://www.graalvm.org/python/">GraalVM</a></td>
+      <td style="text-align: center">Truffle framework</td>
+      <td style="text-align: justify">
+        <ul>
+          <li>Compatibility issues with various libraries in the Python ecosystem</li>
+          <li>Works only with GraalVM</li>
+        </ul>
+      </td>
+    </tr>
+    <tr>
+      <td style="text-align: center"><a href="https://github.com/jpype-project/jpype">JPype</a></td>
+      <td style="text-align: center">JNI + Python/C API</td>
+      <td style="text-align: justify">
+        <ul>
+          <li>Doesn’t support calling Python from Java</li>
+          <li>Only supports CPython</li>
+        </ul>
+      </td>
+    </tr>
+    <tr>
+      <td style="text-align: center"><a href="https://github.com/ninia/jep">Jep</a></td>
+      <td style="text-align: center">JNI + Python/C API</td>
+      <td style="text-align: justify">
+        <ul>
+          <li>Difficult to integrate</li>
+          <li>Performance is not good enough</li>
+          <li>Only supports CPython</li>
+        </ul>
+      </td>
+    </tr>
+    <tr>
+      <td style="text-align: center"><a href="https://github.com/alibaba/pemja">PEMJA</a></td>
+      <td style="text-align: center">JNI + Python/C API</td>
+      <td style="text-align: justify">
+        <ul>
+          <li>Only supports CPython</li>
+        </ul>
+      </td>
+    </tr>
+  </tbody>
+</table>
+
+
+The table above gives a basic comparison of popular libraries for calling between Java and Python.
+
+[Jython](https://www.jython.org/): Jython is a Python interpreter implemented in Java. Because its implementation language is Java,
+interoperability between Python code and Java code is very natural.
+However, Jython does not support Python 3, and it is no longer actively maintained.
+
+[GraalVM](https://www.graalvm.org/python/): GraalVM makes use of the Truffle framework to support interoperability between Python and Java.
+However, it has the limitation that not all Python libraries are supported, since many Python libraries rely on standard CPython to implement their C extensions.
+The other problem is that it only works with GraalVM, which means high migration costs.
+
+[JPype](https://github.com/jpype-project/jpype): Similar to [PEMJA](https://github.com/alibaba/pemja),
+JPype is also a framework built using JNI and Python/C API, but JPype only supports calling Java from Python.
+
+[Jep](https://github.com/ninia/jep): Similar to [PEMJA](https://github.com/alibaba/pemja), Jep is also a framework built using JNI and the Python/C API, and it supports calling Python from Java.
+However, it doesn’t publish a jar to the Maven repository, and the loading of native packages needs to be specified in advance through JVM parameters or environment variables when the JVM starts,
+which makes it difficult to integrate. Furthermore, our benchmark shows that its performance is not very good.
+
+[PEMJA](https://github.com/alibaba/pemja): Similar to Jep and JPype, PEMJA is built on CPython, so it cannot support other Python interpreters such as PyPy.
+Since CPython is the reference implementation of Python and also the most widely used one, most libraries in the Python ecosystem are built on the CPython runtime and so work with PEMJA naturally.
+
+## Thread Mode
+
+<center>
+<img src="{{ site.baseurl }}/img/blog/2022-04-24-pyflink-1.15-thread-mode/pyflink-thread-mode.png"/>
+<br/>
+<i><small>Fig. 4 - PyFlink Runtime in Thread Mode</small></i>
+</center>
+<br/>
+
+From the picture above, we can see that in thread mode, the Python user-defined function runs in the same process as the Python operator (which runs in the JVM).
+[PEMJA](https://github.com/alibaba/pemja) is used as a bridge between the Java code and the Python code.
+
+Since the Python user-defined function runs in the JVM, each input record received from the upstream operators is passed to
+the Python user-defined function directly instead of being buffered and passed to the Python user-defined function in a batch.
+Therefore, thread mode can have lower latency than process mode. Currently, if users want to achieve lower latency in process mode, they usually need to set
+`python.fn-execution.bundle.size` or `python.fn-execution.bundle.time` to a lower value. However, since process mode involves inter-process communication,
+the latency is still a little high in some scenarios, which is no longer a problem in thread mode. Besides, setting `python.fn-execution.bundle.size` or `python.fn-execution.bundle.time` to
+a lower value usually affects the overall performance of the job, which is also not a problem in thread mode.
+
+
+## Comparisons between process mode and thread mode
+
+<table width="95%" border="1">
+  <thead>
+    <tr>
+      <th style="text-align: center">Execution Mode</th>
+      <th style="text-align: center">Benefits</th>
+      <th style="text-align: center">Limitations</th>
+    </tr>
+  </thead>
+  <tbody>
+    <tr>
+      <td style="text-align: center">Process Mode</td>
+      <td style="text-align: justify">
+        <ul>
+          <li>Better resource isolation</li>
+        </ul>
+      </td>
+      <td style="text-align: justify">
+        <ul>
+          <li>IPC overhead</li>
+          <li>High implementation complexity</li>
+        </ul>
+      </td>
+    </tr>
+    <tr>
+      <td style="text-align: center">Thread Mode</td>
+      <td style="text-align: justify">
+        <ul>
+          <li>Higher throughput</li>
+          <li>Lower latency</li>
+          <li>Shorter checkpoint time</li>
+          <li>Fewer usage restrictions</li>
+        </ul>
+      </td>
+      <td style="text-align: justify">
+        <ul>
+          <li>Only supports CPython</li>
+          <li>Multiple jobs cannot use different Python interpreters in session mode</li>
+          <li>Performance is affected by the GIL</li>
+        </ul>
+      </td>
+    </tr>
+  </tbody>
+</table>
+
+`Benefits of thread mode`:
+
+Since process mode processes data in batches, Python user-defined functions currently cannot be used in some scenarios,
+e.g. in a Join (Table API & SQL) condition that takes columns from both the left table and the right table as inputs.
+However, this is no longer a big problem in thread mode because it handles the data one record at a time instead of in batches.
+
+Unlike process mode, which sends and receives data asynchronously in batches, thread mode processes data synchronously one record at a time.
+So it usually has lower latency and shorter checkpoint time. In terms of performance, since there is no inter-process communication,
+it avoids data serialization/deserialization and communication overhead, as well as the copying and context switching between kernel space and user space,
+so thread mode usually has better performance.
+
+`Limitations`:
+
+However, there are also some limitations for thread mode:
+
+ - It only supports CPython, which is one of the most used Python interpreters.
+ - It doesn’t support session mode well, so it’s recommended that users only use thread mode in per-job or application deployments.
+The reason is that it doesn’t support using different Python interpreters for jobs running in the same TaskManager.
+This limitation comes from the fact that many Python libraries assume that they will only be initialized once in the process, so they use a lot of static variables.
+
+## Usage
+
+The execution mode can be configured via the `python.execution-mode` option. It has two possible values:
+
+ - `process`: The Python user-defined functions will be executed in a separate Python process. (default)
+ - `thread`: The Python user-defined functions will be executed in the same process as Java operators.
+
+```python
+# Specify `process` mode
+table_env.get_config().set("python.execution-mode", "process")
+
+# Specify `thread` mode
+table_env.get_config().set("python.execution-mode", "thread")
+```
+
+It should be noted that since this is the first release of 'thread' mode, it still has many limitations,
+e.g. it only supports Python ScalarFunction of the Python Table API & SQL. Where 'thread' mode is not supported, execution falls back to 'process' mode.
+So it may happen that you configure a job to execute in 'thread' mode but it is actually executed in 'process' mode.
+
+## [Benchmark](https://github.com/HuangXingBo/pyflink-benchmark)
+
+### Test environment
+
+OS: Alibaba Cloud Linux (Aliyun Linux) release 2.1903 LTS (Hunting Beagle)
+
+CPU: Intel(R) Xeon(R) Platinum 8269CY CPU @ 2.50GHz
+
+Memory: 16G
+
+CPython: Python 3.7.3
+
+JDK: OpenJDK Runtime Environment (build 1.8.0_292-b10)
+
+PyFlink: 1.15.0
+
+### Test results
+
+Here, we test JSON processing, which is a very common scenario for PyFlink users.
+
+The UDF implementations are as follows:
+
+```python
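+# Python UDF
+from pyflink.table import DataTypes
+from pyflink.table.udf import udf
+
+
+@udf(result_type=DataTypes.STRING(), func_type="general")
+def json_value_lower(s: str):
+    # Lowercase the value stored under key 'a' and re-serialize the JSON document.
+    import json
+    a = json.loads(s)
+    a['a'] = a['a'].lower()
+    return json.dumps(a)
+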
+```
+
+```java
+// Java UDF
+public class JsonValueLower extends ScalarFunction {
+    private transient ObjectMapper mapper;
+    private transient ObjectWriter writer;
+
+    @Override
+    public void open(FunctionContext context) throws Exception {
+        this.mapper = new ObjectMapper();
+        this.writer = mapper.writerWithDefaultPrettyPrinter();
+    }
+
+    public String eval(String s) {
+        try {
+            StringObject object = mapper.readValue(s, StringObject.class);
+            object.setA(object.a.toLowerCase());
+            return writer.writeValueAsString(object);
+        } catch (JsonProcessingException e) {
+            throw new RuntimeException("Failed to read json value", e);
+        }
+    }
+
+    private static class StringObject {
+        private String a;
+
+        public String getA() {
+            return a;
+        }
+
+        public void setA(String a) {
+            this.a = a;
+        }
+
+        @Override
+        public String toString() {
+            return "StringObject{" + "a='" + a + '\'' + '}';
+        }
+    }
+}
+```
+
+The test results are as follows:
+
+<table width="95%" border="1">
+  <thead>
+    <tr>
+      <th style="text-align: center">Type (input data size)</th>
+      <th style="text-align: center">QPS</th>
+      <th style="text-align: center">Latency</th>
+      <th style="text-align: center">Checkpoint Time</th>
+    </tr>
+  </thead>
+  <tbody>
+    <tr>
+      <td style="text-align: center">Java UDF (100k)</td>
+      <td style="text-align: center">900</td>
+      <td style="text-align: center">2ms</td>
+      <td style="text-align: center">100ms</td>
+    </tr>
+    <tr>
+      <td style="text-align: center">Java UDF (10k)</td>
+      <td style="text-align: center">10,000</td>
+      <td style="text-align: center">20us</td>
+      <td style="text-align: center">10ms</td>
+    </tr>
+    <tr>
+      <td style="text-align: center">Java UDF (1k)</td>
+      <td style="text-align: center">50,000</td>
+      <td style="text-align: center">1us</td>
+      <td style="text-align: center">10ms</td>
+    </tr>
+    <tr>
+      <td style="text-align: center">Java UDF (100)</td>
+      <td style="text-align: center">280,000</td>
+      <td style="text-align: center">200ns</td>
+      <td style="text-align: center">10ms</td>
+    </tr>
+    <tr>
+      <td style="text-align: center">Process Mode (100k)</td>
+      <td style="text-align: center">900</td>
+      <td style="text-align: center">5s-10s</td>
+      <td style="text-align: center">5s</td>
+    </tr>
+    <tr>
+      <td style="text-align: center">Process Mode (10k)</td>
+      <td style="text-align: center">7000</td>
+      <td style="text-align: center">5s-10s</td>
+      <td style="text-align: center">3s</td>
+    </tr>
+    <tr>
+      <td style="text-align: center">Process Mode (1k)</td>
+      <td style="text-align: center">36,000</td>
+      <td style="text-align: center">3s</td>
+      <td style="text-align: center">3s</td>
+    </tr>
+    <tr>
+      <td style="text-align: center">Process Mode (100)</td>
+      <td style="text-align: center">120,000</td>
+      <td style="text-align: center">2s</td>
+      <td style="text-align: center">2s</td>
+    </tr>
+    <tr>
+      <td style="text-align: center">Thread Mode (100k)</td>
+      <td style="text-align: center">1200</td>
+      <td style="text-align: center">1ms</td>
+      <td style="text-align: center">100ms</td>
+    </tr>
+    <tr>
+      <td style="text-align: center">Thread Mode (10k)</td>
+      <td style="text-align: center">12,000</td>
+      <td style="text-align: center">20us</td>
+      <td style="text-align: center">10ms</td>
+    </tr>
+    <tr>
+      <td style="text-align: center">Thread Mode (1k)</td>
+      <td style="text-align: center">50,000</td>
+      <td style="text-align: center">3us</td>
+      <td style="text-align: center">10ms</td>
+    </tr>
+    <tr>
+      <td style="text-align: center">Thread Mode (100)</td>
+      <td style="text-align: center">120,000</td>
+      <td style="text-align: center">1us</td>
+      <td style="text-align: center">10ms</td>
+    </tr>
+  </tbody>
+</table>
+
+<center>
+<img src="{{ site.baseurl }}/img/blog/2022-04-24-pyflink-1.15-thread-mode/pyflink-performance.jpg"/>
+</center>
+
+<center>
+<img src="{{ site.baseurl }}/img/blog/2022-04-24-pyflink-1.15-thread-mode/pyflink-latency.jpg"/>
+</center>
+
+<center>
+<img src="{{ site.baseurl }}/img/blog/2022-04-24-pyflink-1.15-thread-mode/pyflink-checkpoint-time.jpg"/>
+</center>
+
+As we can see from the test results:
+
+ * If you care about latency and checkpoint time, thread mode is a better choice than process mode. The processing latency decreases from several seconds in process mode to microseconds in thread mode.
+
+ * Thread mode can bring better performance than process mode when the cost of data serialization/deserialization is not negligible relative to the UDF calculation itself.
+Benchmarks have shown that, compared to process mode, the throughput could be increased by 2x in common scenarios such as JSON processing in thread mode.
+On the contrary, it is more recommended to use process mode, because different processes have better resource isolation.

Review Comment:
   ```suggestion
   However, if the UDF calculation is slow and takes much longer, it is recommended to use process mode, because process mode is more mature and has better resource isolation.
   ```
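
To make the trade-off in this suggestion concrete, here is a minimal sketch of how a job author might pick the value of `python.execution-mode`. The helper names `choose_execution_mode` and `build_python_config` and their two boolean inputs are hypothetical and not part of PyFlink; only the option key and its two values (`process`, `thread`) come from the post. The rule is deliberately simplified and is an illustration, not a recommendation from the Flink documentation.

```python
# Hypothetical helpers, not part of PyFlink: they encode the rule of thumb from
# the post and the suggestion above for choosing `python.execution-mode`.
EXECUTION_MODE_KEY = "python.execution-mode"


def choose_execution_mode(udf_is_slow: bool, latency_critical: bool) -> str:
    """Return 'process' or 'thread' following the guidance discussed here."""
    if udf_is_slow:
        # UDF time dominates, so the savings of thread mode matter little;
        # prefer the more mature mode with better resource isolation.
        return "process"
    if latency_critical:
        # Thread mode avoids inter-process communication and batching.
        return "thread"
    # Assumption of this sketch: stay on the default when neither applies.
    return "process"


def build_python_config(udf_is_slow: bool, latency_critical: bool) -> dict:
    """Build the key/value pair that a job would set on its table config."""
    mode = choose_execution_mode(udf_is_slow, latency_critical)
    return {EXECUTION_MODE_KEY: mode}


if __name__ == "__main__":
    print(build_python_config(udf_is_slow=False, latency_critical=True))
```

The resulting pair could then be applied with `table_env.get_config().set(key, value)`, as in the Usage section of the post.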



##########
_posts/2022-04-24-pyflink-1.15-thread-mode.md:
##########
@@ -0,0 +1,463 @@
+---
+layout: post
+title: "Exploring the thread mode in PyFlink"
+date: 2022-04-24T12:00:00.000Z
+authors:
+- Xingbo:
+  name: "Xingbo Huang"
+- Dian:
+  name: "Dian Fu"
+excerpt: Flink 1.15 introduced a new Runtime Execution Mode named 'thread' mode in PyFlink. This post explains how it works and when to use it.
+---
+
+PyFlink was introduced in Flink 1.9 to bring the power of Flink to Python users and allow them to develop Flink jobs in Python.
+Its functionality has matured steadily over the subsequent releases.
+
+Before Flink 1.15, Python user-defined functions were executed in separate Python processes (based on the [Apache Beam Portability Framework](https://docs.google.com/document/d/1B9NmaBSKCnMJQp-ibkxvZ_U233Su67c1eYgBhrqWP24/edit#heading=h.khjybycus70)).
+This brings additional serialization/deserialization overhead as well as communication overhead. In scenarios where the data size is large, e.g. image processing,
+this overhead becomes non-negligible. Besides, since it involves inter-process communication, the processing latency is also non-negligible,
+which is unacceptable in scenarios where latency is critical, e.g. quantitative trading.
+
+In Flink 1.15, we have introduced a new execution mode named 'thread' mode (based on [PEMJA](https://github.com/alibaba/pemja)) where the Python user-defined functions are executed in the JVM as
+a thread instead of in a separate Python process. In this article, we will dig into the details of this execution mode and also share some benchmark data to
+give users a basic understanding of how it works and which scenarios it’s applicable to.
+
+## Process Mode
+
+<center>
+<img src="{{ site.baseurl }}/img/blog/2022-04-24-pyflink-1.15-thread-mode/pyflink-architecture-overview.png"/>
+<br/>
+<i><small>Fig. 1 - PyFlink Architecture Overview</small></i>
+</center>
+<br/>
+
+Fig. 1 shows the architecture of PyFlink. As shown on the left side of Fig. 1, users can use the PyFlink API (Python Table API & SQL or Python DataStream API) to declare the logic of jobs,
+which is finally translated into a JobGraph (the DAG of the job) that can be recognized by Flink’s execution framework. It should be noted that Python operators (Flink operators whose purpose is to
+execute Python user-defined functions) are used to execute the Python user-defined functions.
+
+The right side of Fig. 1 shows the details of the Python operators, where the Python user-defined functions are executed in separate Python processes.
+
+In order to communicate with the Python worker process, a series of communication services are required between the Python operator (which runs in the JVM) and the Python worker (which runs in the Python VM).
+To execute Python user-defined functions, PyFlink employs the [Apache Beam Portability framework](https://docs.google.com/document/d/1B9NmaBSKCnMJQp-ibkxvZ_U233Su67c1eYgBhrqWP24/edit#heading=h.khjybycus70), which provides the basic building blocks required for PyFlink.
+
+<center>
+<img src="{{ site.baseurl }}/img/blog/2022-04-24-pyflink-1.15-thread-mode/pyflink-process-mode.png"/>
+<br/>
+<i><small>Fig. 2 - PyFlink Runtime in Process Mode</small></i>
+</center>
+<br/>
+
+Process mode runs stably and efficiently in most scenarios, and it is sufficient for most users. However, in some scenarios it doesn’t work well due to the additional serialization/deserialization overhead.
+One of the most typical scenarios is image processing, where the input data size is often very large. Besides, since it involves inter-process communication, the processing latency is also non-negligible,
+which is unacceptable in scenarios where latency is critical, e.g. quantitative trading. In order to overcome these problems, we have introduced a new execution mode (thread mode)
+where Python user-defined functions are executed in the JVM as a thread instead of in a separate Python process. In the following sections, we will dig into the details of this new execution mode.
+
+## PEMJA
+
+Before digging into the thread mode, let’s first introduce [PEMJA](https://github.com/alibaba/pemja), a library that is at the core of the thread mode architecture.
+
+Java Native Interface (JNI) is a standard programming interface for writing Java native methods and embedding the Java virtual machine into native applications.
+In addition, CPython provides the Python/C API to help embed Python in C applications.
+
+So if we combine these two interfaces, we can embed Python in a Java application. Since this library solves the general problem of letting Python and Java call each other,
+we have open sourced it as an independent project, and PyFlink has depended on [PEMJA](https://github.com/alibaba/pemja) since Flink 1.15 to support thread mode.
+
+### PEMJA Architecture
+
+<center>
+<img src="{{ site.baseurl }}/img/blog/2022-04-24-pyflink-1.15-thread-mode/pemja.png"/>
+<br/>
+<i><small>Fig. 3 - PEMJA Architecture</small></i>
+</center>
+<br/>
+
+As we can see from the architecture of [PEMJA](https://github.com/alibaba/pemja) in Fig. 3, the JVM and the PVM can call each other in the same process through the [PEMJA](https://github.com/alibaba/pemja) library.
+
+First, [PEMJA](https://github.com/alibaba/pemja) starts a daemon thread in the JVM, which is responsible for initializing the Python environment and creating a Python main interpreter owned by this process.
+[PEMJA](https://github.com/alibaba/pemja) uses a dedicated thread to initialize the Python environment in order to avoid potential deadlocks in the Python interpreter.
+The Python interpreter could deadlock when threads concurrently try to acquire the GIL through methods such as [PyGILState_*](https://docs.python.org/3/c-api/init.html#c.PyGILState_Ensure) in the Python/C API.
+It should be noted that [PEMJA](https://github.com/alibaba/pemja) doesn’t call those methods directly; however, third-party libraries, e.g. [numpy](https://numpy.org/), may call them.
+To get around this, we use a dedicated thread to initialize the Python environment.
+
+Then, each Java worker thread can invoke Python functions through the Python [ThreadState](https://docs.python.org/3/c-api/init.html) created from the Python main interpreter.
+
+### Comparison with other solutions
+
+<table width="95%" border="1">
+  <thead>
+    <tr>
+      <th style="text-align: center">Framework</th>
+      <th style="text-align: center">Principle</th>
+      <th style="text-align: center">Limitations</th>
+    </tr>
+  </thead>
+  <tbody>
+    <tr>
+      <td style="text-align: center"><a href="https://www.jython.org/">Jython</a></td>
+      <td style="text-align: center">Python compiler implemented in Java</td>
+      <td style="text-align: justify">
+        <ul>
+          <li>Only supports Python 2</li>
+        </ul>
+      </td>
+    </tr>
+    <tr>
+      <td style="text-align: center"><a href="https://www.graalvm.org/python/">GraalVM</a></td>
+      <td style="text-align: center">Truffle framework</td>
+      <td style="text-align: justify">
+        <ul>
+          <li>Compatibility issues with various libraries in the Python ecosystem</li>
+          <li>Works only with GraalVM</li>
+        </ul>
+      </td>
+    </tr>
+    <tr>
+      <td style="text-align: center"><a href="https://github.com/jpype-project/jpype">JPype</a></td>
+      <td style="text-align: center">JNI + Python/C API</td>
+      <td style="text-align: justify">
+        <ul>
+          <li>Doesn’t support calling Python from Java</li>
+          <li>Only supports CPython</li>
+        </ul>
+      </td>
+    </tr>
+    <tr>
+      <td style="text-align: center"><a href="https://github.com/ninia/jep">Jep</a></td>
+      <td style="text-align: center">JNI + Python/C API</td>
+      <td style="text-align: justify">
+        <ul>
+          <li>Difficult to integrate</li>
+          <li>Performance is not good enough</li>
+          <li>Only supports CPython</li>
+        </ul>
+      </td>
+    </tr>
+    <tr>
+      <td style="text-align: center"><a href="https://github.com/alibaba/pemja">PEMJA</a></td>
+      <td style="text-align: center">JNI + Python/C API</td>
+      <td style="text-align: justify">
+        <ul>
+          <li>Only supports CPython</li>
+        </ul>
+      </td>
+    </tr>
+  </tbody>
+</table>
+
+
+The table above gives a basic comparison of popular libraries for calling between Java and Python.
+
+[Jython](https://www.jython.org/): Jython is a Python interpreter implemented in Java. Because its implementation language is Java,
+interoperability between Python code and Java code is very natural.
+However, Jython does not support Python 3, and it is no longer actively maintained.
+
+[GraalVM](https://www.graalvm.org/python/): GraalVM makes use of the Truffle framework to support interoperability between Python and Java.
+However, it has the limitation that not all Python libraries are supported, since many Python libraries rely on standard CPython to implement their C extensions.
+The other problem is that it only works with GraalVM, which means high migration costs.
+
+[JPype](https://github.com/jpype-project/jpype): Similar to [PEMJA](https://github.com/alibaba/pemja),
+JPype is also a framework built using JNI and Python/C API, but JPype only supports calling Java from Python.
+
+[Jep](https://github.com/ninia/jep): Similar to [PEMJA](https://github.com/alibaba/pemja), Jep is also a framework built using JNI and the Python/C API, and it supports calling Python from Java.
+However, it doesn’t publish a jar to the Maven repository, and the loading of native packages needs to be specified in advance through JVM parameters or environment variables when the JVM starts,
+which makes it difficult to integrate. Furthermore, our benchmark shows that its performance is not very good.
+
+[PEMJA](https://github.com/alibaba/pemja): Similar to Jep and JPype, PEMJA is built on CPython, so it cannot support other Python interpreters such as PyPy.
+Since CPython is the reference implementation of Python and also the most widely used one, most libraries in the Python ecosystem are built on the CPython runtime and so work with PEMJA naturally.
+
+## Thread Mode
+
+<center>
+<img src="{{ site.baseurl }}/img/blog/2022-04-24-pyflink-1.15-thread-mode/pyflink-thread-mode.png"/>
+<br/>
+<i><small>Fig. 4 - PyFlink Runtime in Thread Mode</small></i>
+</center>
+<br/>
+
+From the picture above, we can see that in thread mode, the Python user-defined function runs in the same process as the Python operator (which runs in the JVM).
+[PEMJA](https://github.com/alibaba/pemja) is used as a bridge between the Java code and the Python code.
+
+Since the Python user-defined function runs in the JVM, each input record received from the upstream operators is passed to
+the Python user-defined function directly instead of being buffered and passed to the Python user-defined function in a batch.
+Therefore, thread mode can have lower latency than process mode. Currently, if users want to achieve lower latency in process mode, they usually need to set
+`python.fn-execution.bundle.size` or `python.fn-execution.bundle.time` to a lower value. However, since process mode involves inter-process communication,
+the latency is still a little high in some scenarios, which is no longer a problem in thread mode. Besides, setting `python.fn-execution.bundle.size` or `python.fn-execution.bundle.time` to
+a lower value usually affects the overall performance of the job, which is also not a problem in thread mode.
+
+
+## Comparisons between process mode and thread mode
+
+<table width="95%" border="1">
+  <thead>
+    <tr>
+      <th style="text-align: center">Execution Mode</th>
+      <th style="text-align: center">Benefits</th>
+      <th style="text-align: center">Limitations</th>
+    </tr>
+  </thead>
+  <tbody>
+    <tr>
+      <td style="text-align: center">Process Mode</td>
+      <td style="text-align: justify">
+        <ul>
+          <li>Better resource isolation</li>
+        </ul>
+      </td>
+      <td style="text-align: justify">
+        <ul>
+          <li>IPC overhead</li>
+          <li>High implementation complexity</li>
+        </ul>
+      </td>
+    </tr>
+    <tr>
+      <td style="text-align: center">Thread Mode</td>
+      <td style="text-align: justify">
+        <ul>
+          <li>Higher throughput</li>
+          <li>Lower latency</li>
+          <li>Shorter checkpoint time</li>
+          <li>Fewer usage restrictions</li>
+        </ul>
+      </td>
+      <td style="text-align: justify">
+        <ul>
+          <li>Only supports CPython</li>
+          <li>Multiple jobs cannot use different Python interpreters in session mode</li>
+          <li>Performance is affected by the GIL</li>
+        </ul>
+      </td>
+    </tr>
+  </tbody>
+</table>
+
+`Benefits of thread mode`:
+
+Since process mode processes data in batches, Python user-defined functions currently cannot be used in some scenarios,
+e.g. in a join condition (Table API & SQL) that takes columns from both the left table and the right table as inputs.
+This is much less of a problem in thread mode because it handles records one by one instead of in batches.
+
+Unlike process mode, which sends and receives data asynchronously in batches, thread mode processes data synchronously, one record at a time.
+As a result, it usually has lower latency and shorter checkpoint time. In terms of performance, since there is no inter-process communication,
+thread mode avoids data serialization/deserialization and communication overhead, as well as the copying and context switching between kernel space and user space,
+so it usually performs better.
+
+`Limitations`:
+
+However, there are also some limitations for thread mode:
+
+ - It only supports CPython, which is also one of the most widely used Python interpreters.
+ - It doesn’t support session mode well, so it’s recommended that users only use thread mode in per-job or application deployments.
+The reason is that it doesn’t support using different Python interpreters for jobs running in the same TaskManager.
+This limitation comes from the fact that many Python libraries assume that they will only be initialized once per process, so they use a lot of static variables.
+
+## Usage
+
+The execution mode can be configured via the configuration option `python.execution-mode`. It has two possible values:
+
+ - `process`: The Python user-defined functions will be executed in a separate Python process. (default)
+ - `thread`: The Python user-defined functions will be executed in the same process as Java operators.
+

Review Comment:
   For example, you could configure it as follows in the Python Table API:
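
As a rough sketch of the configuration step that sentence would introduce, the helper below validates the requested mode and then sets `python.execution-mode` through `get_config().set(...)`, the same call used in the post. The helper name `apply_execution_mode` and the validation step are illustrative assumptions, not part of PyFlink.

```python
# Hypothetical helper (not part of PyFlink): validates the mode before
# applying it to a TableEnvironment-like object.
VALID_EXECUTION_MODES = ("process", "thread")


def apply_execution_mode(table_env, mode):
    """Set `python.execution-mode` on `table_env` and return the applied mode.

    `table_env` only needs a `get_config()` whose result has `set(key, value)`,
    which is how the blog post configures a PyFlink TableEnvironment.
    """
    if mode not in VALID_EXECUTION_MODES:
        raise ValueError(
            f"python.execution-mode must be one of {VALID_EXECUTION_MODES}, got {mode!r}"
        )
    table_env.get_config().set("python.execution-mode", mode)
    return mode
```

With a real `TableEnvironment` named `table_env`, calling `apply_execution_mode(table_env, "thread")` would request thread mode. As the post notes, a job may still fall back to process mode where thread mode is not supported.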



##########
_posts/2022-04-24-pyflink-1.15-thread-mode.md:
##########
@@ -0,0 +1,463 @@
+---
+layout: post
+title: "Exploring the thread mode in PyFlink"
+date: 2022-04-24T12:00:00.000Z
+authors:
+- Xingbo:
+  name: "Xingbo Huang"
+- Dian:
+  name: "Dian Fu"
+excerpt: Flink 1.15 introduced a new Runtime Execution Mode named 'thread' mode in PyFlink. This post explains how it works and when to use it.
+---
+
+PyFlink was introduced in Flink 1.9 to bring the power of Flink to Python users and allow them to develop Flink jobs in Python.
+Its functionality has become more and more mature over the past releases.
+
+Before Flink 1.15, Python user-defined functions were executed in separate Python processes (based on the [Apache Beam Portability Framework](https://docs.google.com/document/d/1B9NmaBSKCnMJQp-ibkxvZ_U233Su67c1eYgBhrqWP24/edit#heading=h.khjybycus70)).
+This brings additional serialization/deserialization overhead as well as communication overhead. In scenarios where the data size is big, e.g. image processing,
+this overhead becomes non-negligible. Besides, since it involves inter-process communication, the processing latency is also non-negligible,
+which is unacceptable in scenarios where latency is critical, e.g. quantitative trading.
+
+In Flink 1.15, we have introduced a new execution mode named 'thread' mode (based on [PEMJA](https://github.com/alibaba/pemja)) where the Python user-defined functions will be executed in the JVM as
+a thread instead of a separate Python process. In this article, we will dig into the details about this execution mode and also share some benchmark data to
+give users a basic understanding of how it works and which scenarios it’s applicable for.
+
+## Process Mode
+
+<center>
+<img src="{{ site.baseurl }}/img/blog/2022-04-24-pyflink-1.15-thread-mode/pyflink-architecture-overview.png"/>
+<br/>
+<i><small>Fig. 1 - PyFlink Architecture Overview</small></i>
+</center>
+<br/>
+
+Fig. 1 shows the architecture of PyFlink. As shown on the left side of Fig. 1, users can use the PyFlink API (Python Table API & SQL or Python DataStream API) to declare the logic of jobs,
+which is finally translated into a JobGraph (the DAG of the job) that can be recognized by Flink’s execution framework. It should be noted that Python operators (Flink operators whose purpose is to
+execute Python user-defined functions) are used to execute the Python user-defined functions.
+
+The right side of Fig. 1 shows the details of the Python operators, where the Python user-defined functions are executed in separate Python processes.
+
+In order to communicate with the Python worker process, a series of communication services are required between the Python operator (which runs in the JVM) and the Python worker (which runs in the Python VM).
+To execute Python user-defined functions, PyFlink employs the [Apache Beam Portability framework](https://docs.google.com/document/d/1B9NmaBSKCnMJQp-ibkxvZ_U233Su67c1eYgBhrqWP24/edit#heading=h.khjybycus70), which provides the basic building blocks required for PyFlink.
+
+<center>
+<img src="{{ site.baseurl }}/img/blog/2022-04-24-pyflink-1.15-thread-mode/pyflink-process-mode.png"/>
+<br/>
+<i><small>Fig. 2 - PyFlink Runtime in Process Mode</small></i>
+</center>
+<br/>
+
+Process mode runs stably and efficiently in most scenarios, and it is sufficient for most users. However, in some scenarios, it doesn’t work well due to the additional serialization/deserialization overhead.
+One of the most typical scenarios is image processing, where the input data size is often very big. Besides, since it involves inter-process communication, the processing latency is also non-negligible,
+which is unacceptable in scenarios where latency is critical, e.g. quantitative trading. In order to overcome these problems, we have introduced a new execution mode (thread mode)
+where Python user-defined functions are executed in the JVM as a thread instead of a separate Python process. In the following sections, we will dig into the details of this new execution mode.
+
+## PEMJA
+
+Before digging into the thread mode, let’s first introduce [PEMJA](https://github.com/alibaba/pemja), a library that is at the core of the thread mode architecture.
+
+As we all know, the Java Native Interface (JNI) is a standard programming interface for writing Java native methods and embedding the Java virtual machine into native applications.
+What’s more, CPython provides the Python/C API to help embed Python in C applications.
+
+So if we combine these two interfaces, we can embed Python in a Java application. Since this library solves the general problem of letting Python and Java call each other,
+we have open sourced it as an independent project, and PyFlink has depended on [PEMJA](https://github.com/alibaba/pemja) since Flink 1.15 to support thread mode.
+
+### PEMJA Architecture
+
+<center>
+<img src="{{ site.baseurl }}/img/blog/2022-04-24-pyflink-1.15-thread-mode/pemja.png"/>
+<br/>
+<i><small>Fig. 3 - PEMJA Architecture</small></i>
+</center>
+<br/>
+
+As we can see from the architecture of [PEMJA](https://github.com/alibaba/pemja) in Fig. 3, the JVM and the PVM can call each other in the same process through the [PEMJA](https://github.com/alibaba/pemja) library.
+
+First, [PEMJA](https://github.com/alibaba/pemja) starts a daemon thread in the JVM, which is responsible for initializing the Python environment and creating a Python main interpreter owned by this process.
+[PEMJA](https://github.com/alibaba/pemja) uses a dedicated thread to initialize the Python environment in order to avoid potential deadlocks in the Python interpreter.
+The Python interpreter could deadlock when multiple threads concurrently try to acquire the GIL through methods such as [PyGILState_*](https://docs.python.org/3/c-api/init.html#c.PyGILState_Ensure) in the Python/C API.
+It should be noted that [PEMJA](https://github.com/alibaba/pemja) doesn’t call those methods directly; however, third-party libraries, e.g. [numpy](https://numpy.org/), may call them.
+To get around this, we use a dedicated thread to initialize the Python environment.
+
+Then, each Java worker thread can invoke the Python functions through the Python [ThreadState](https://docs.python.org/3/c-api/init.html) created from the Python main interpreter.
+
+### Comparison with other solutions
+
+<table width="95%" border="1">
+  <thead>
+    <tr>
+      <th style="text-align: center">Framework</th>
+      <th style="text-align: center">Principle</th>
+      <th style="text-align: center">Limitations</th>
+    </tr>
+  </thead>
+  <tbody>
+    <tr>
+      <td style="text-align: center"><a href="https://www.jython.org/">Jython</a></td>
+      <td style="text-align: center">Python compiler implemented in Java</td>
+      <td style="text-align: justify">
+        <ul>
+          <li>Only supports Python 2</li>
+        </ul>
+      </td>
+    </tr>
+    <tr>
+      <td style="text-align: center"><a href="https://www.graalvm.org/python/">GraalVM</a></td>
+      <td style="text-align: center">Truffle framework</td>
+      <td style="text-align: justify">
+        <ul>
+          <li>Compatibility issues with various libraries in the Python ecosystem</li>
+          <li>Works only with GraalVM</li>
+        </ul>
+      </td>
+    </tr>
+    <tr>
+      <td style="text-align: center"><a href="https://github.com/jpype-project/jpype">JPype</a></td>
+      <td style="text-align: center">JNI + Python/C API</td>
+      <td style="text-align: justify">
+        <ul>
+          <li>Doesn’t support calling Python from Java</li>
+          <li>Only supports CPython</li>
+        </ul>
+      </td>
+    </tr>
+    <tr>
+      <td style="text-align: center"><a href="https://github.com/ninia/jep">Jep</a></td>
+      <td style="text-align: center">JNI + Python/C API</td>
+      <td style="text-align: justify">
+        <ul>
+          <li>Difficult to integrate</li>
+          <li>Performance is not good enough</li>
+          <li>Only supports CPython</li>
+        </ul>
+      </td>
+    </tr>
+    <tr>
+      <td style="text-align: center"><a href="https://github.com/alibaba/pemja">PEMJA</a></td>
+      <td style="text-align: center">JNI + Python/C API</td>
+      <td style="text-align: justify">
+        <ul>
+          <li>Only supports CPython</li>
+        </ul>
+      </td>
+    </tr>
+  </tbody>
+</table>
+
+
+The table above gives a basic comparison of popular libraries for calls between Java and Python.
+
+[Jython](https://www.jython.org/): Jython is a Python interpreter implemented in Java. Because it is implemented in Java,
+the interoperability between code written in Python syntax and Java code is very natural.
+However, Jython does not support Python 3, and it is no longer actively maintained.
+
+[GraalVM](https://www.graalvm.org/python/): GraalVM makes use of the Truffle framework to support interoperability between Python and Java.
+However, it has the limitation that not all Python libraries are supported. As we know, many Python libraries rely on standard CPython to implement their C extensions.
+The other problem is that it only works with GraalVM, which means high migration costs.
+
+[JPype](https://github.com/jpype-project/jpype): Similar to [PEMJA](https://github.com/alibaba/pemja),
+JPype is also a framework built using JNI and Python/C API, but JPype only supports calling Java from Python.
+
+[Jep](https://github.com/ninia/jep): Similar to [PEMJA](https://github.com/alibaba/pemja), Jep is also a framework built using JNI and the Python/C API, and it supports calling Python from Java.
+However, it doesn’t provide a jar in the Maven repository, and the loading of native packages needs to be specified in advance through JVM parameters or environment variables when the JVM starts,
+which makes it difficult to integrate. Furthermore, our benchmark shows that its performance is not very good.
+
+[PEMJA](https://github.com/alibaba/pemja): Similar to Jep and JPype, PEMJA is built on CPython, so it cannot support other Python interpreters, such as PyPy.
+Since CPython is the official and most widely used implementation of the Python runtime, most libraries in the Python ecosystem are built on the CPython runtime and so work with PEMJA naturally.
+
+## Thread Mode
+
+<center>
+<img src="{{ site.baseurl }}/img/blog/2022-04-24-pyflink-1.15-thread-mode/pyflink-thread-mode.png"/>
+<br/>
+<i><small>Fig. 4 - PyFlink Runtime in Thread Mode</small></i>
+</center>
+<br/>
+
+From the picture above, we can see that in thread mode, the Python user-defined function runs in the same process as the Python operator (which runs in the JVM).
+[PEMJA](https://github.com/alibaba/pemja) is used as a bridge between the Java code and the Python code.
+
+Since the Python user-defined function runs in the JVM, each input record received from the upstream operators is passed to
+the Python user-defined function directly instead of being buffered and passed to it in a batch.
+Therefore, thread mode can have lower latency than process mode. Currently, users who want lower latency in process mode usually need to set
+`python.fn-execution.bundle.size` or `python.fn-execution.bundle.time` to a lower value. However, since process mode involves inter-process communication,
+the latency is still a little high in some scenarios; this is no longer a problem in thread mode. Besides, setting `python.fn-execution.bundle.size` or `python.fn-execution.bundle.time` to
+a lower value usually hurts the overall performance of the job, which is also not a concern in thread mode.
+
+
+## Comparisons between process mode and thread mode
+
+<table width="95%" border="1">
+  <thead>
+    <tr>
+      <th style="text-align: center">Execution Mode</th>
+      <th style="text-align: center">Benefits</th>
+      <th style="text-align: center">Limitations</th>
+    </tr>
+  </thead>
+  <tbody>
+    <tr>
+      <td style="text-align: center">Process Mode</td>
+      <td style="text-align: justify">
+        <ul>
+          <li>Better resource isolation</li>
+        </ul>
+      </td>
+      <td style="text-align: justify">
+        <ul>
+          <li>IPC overhead</li>
+          <li>High implementation complexity</li>
+        </ul>
+      </td>
+    </tr>
+    <tr>
+      <td style="text-align: center">Thread Mode</td>
+      <td style="text-align: justify">
+        <ul>
+          <li>Higher throughput</li>
+          <li>Lower latency</li>
+          <li>Shorter checkpoint time</li>
+          <li>Fewer usage restrictions</li>
+        </ul>
+      </td>
+      <td style="text-align: justify">
+        <ul>
+          <li>Only supports CPython</li>
+          <li>Multiple jobs cannot use different Python interpreters in session mode</li>
+          <li>Performance is affected by the GIL</li>
+        </ul>
+      </td>
+    </tr>
+  </tbody>
+</table>
+
+`Benefits of thread mode`:
+
+Since process mode processes data in batches, Python user-defined functions currently cannot be used in some scenarios,
+e.g. in a join condition (Table API & SQL) that takes columns from both the left table and the right table as inputs.
+This is much less of a problem in thread mode because it handles records one by one instead of in batches.
+
+Unlike process mode, which sends and receives data asynchronously in batches, thread mode processes data synchronously, one record at a time.
+As a result, it usually has lower latency and shorter checkpoint time. In terms of performance, since there is no inter-process communication,
+thread mode avoids data serialization/deserialization and communication overhead, as well as the copying and context switching between kernel space and user space,
+so it usually performs better.
+
+`Limitations`:
+
+However, there are also some limitations for thread mode:
+
+ - It only supports CPython, which is also one of the most widely used Python interpreters.
+ - It doesn’t support session mode well, so it’s recommended that users only use thread mode in per-job or application deployments.
+The reason is that it doesn’t support using different Python interpreters for jobs running in the same TaskManager.
+This limitation comes from the fact that many Python libraries assume that they will only be initialized once per process, so they use a lot of static variables.
+
+## Usage
+
+The execution mode can be configured via the configuration option `python.execution-mode`. It has two possible values:
+
+ - `process`: The Python user-defined functions will be executed in a separate Python process. (default)
+ - `thread`: The Python user-defined functions will be executed in the same process as Java operators.
+
+```python
+# Specify `process` mode
+table_env.get_config().set("python.execution-mode", "process")
+
+# Specify `thread` mode
+table_env.get_config().set("python.execution-mode", "thread")
+```
+
+It should be noted that since this is the first release of 'thread' mode, it still has many limitations,
+e.g. it only supports Python ScalarFunction in the Python Table API & SQL. Where 'thread' mode is not supported, execution falls back to 'process' mode.
+So it may happen that you configure a job to execute in 'thread' mode, but it is actually executed in 'process' mode.
+
+## [Benchmark](https://github.com/HuangXingBo/pyflink-benchmark)
+
+### Test environment
+
+OS: Alibaba Cloud Linux (Aliyun Linux) release 2.1903 LTS (Hunting Beagle)
+
+CPU: Intel(R) Xeon(R) Platinum 8269CY CPU @ 2.50GHz
+
+Memory: 16G
+
+CPython: Python 3.7.3
+
+JDK: OpenJDK Runtime Environment (build 1.8.0_292-b10)
+
+PyFlink: 1.15.0
+
+### Test results
+
+Here, we test JSON processing, which is a very common scenario for PyFlink users.
+
+The UDF implementations are as follows:
+
+```python
+# python udf
+@udf(result_type=DataTypes.STRING(), func_type="general")
+def json_value_lower(s: str):
+    import json
+    a = json.loads(s)
+    a['a'] = a['a'].lower()
+    return json.dumps(a)
+
+```
+
+```java
+// Java UDF
+public class JsonValueLower extends ScalarFunction {
+    private transient ObjectMapper mapper;
+    private transient ObjectWriter writer;
+
+    @Override
+    public void open(FunctionContext context) throws Exception {
+        this.mapper = new ObjectMapper();
+        this.writer = mapper.writerWithDefaultPrettyPrinter();
+    }
+
+    public String eval(String s) {
+        try {
+            StringObject object = mapper.readValue(s, StringObject.class);
+            object.setA(object.a.toLowerCase());
+            return writer.writeValueAsString(object);
+        } catch (JsonProcessingException e) {
+            throw new RuntimeException("Failed to read json value", e);
+        }
+    }
+
+    private static class StringObject {
+        private String a;
+
+        public String getA() {
+            return a;
+        }
+
+        public void setA(String a) {
+            this.a = a;
+        }
+
+        @Override
+        public String toString() {
+            return "StringObject{" + "a='" + a + '\'' + '}';
+        }
+    }
+}
+```
+
+The test results are as follows:
+
+<table width="95%" border="1">
+  <thead>
+    <tr>
+      <th style="text-align: center">Type (input data size)</th>
+      <th style="text-align: center">QPS</th>
+      <th style="text-align: center">Latency</th>
+      <th style="text-align: center">Checkpoint Time</th>
+    </tr>
+  </thead>
+  <tbody>
+    <tr>
+      <td style="text-align: center">Java UDF (100k)</td>
+      <td style="text-align: center">900</td>
+      <td style="text-align: center">2ms</td>
+      <td style="text-align: center">100ms</td>
+    </tr>
+    <tr>
+      <td style="text-align: center">Java UDF (10k)</td>
+      <td style="text-align: center">10,000</td>
+      <td style="text-align: center">20us</td>
+      <td style="text-align: center">10ms</td>
+    </tr>
+    <tr>
+      <td style="text-align: center">Java UDF (1k)</td>
+      <td style="text-align: center">50,000</td>
+      <td style="text-align: center">1us</td>
+      <td style="text-align: center">10ms</td>
+    </tr>
+    <tr>
+      <td style="text-align: center">Java UDF (100)</td>
+      <td style="text-align: center">280,000</td>
+      <td style="text-align: center">200ns</td>
+      <td style="text-align: center">10ms</td>
+    </tr>
+    <tr>
+      <td style="text-align: center">Process Mode (100k)</td>
+      <td style="text-align: center">900</td>
+      <td style="text-align: center">5s-10s</td>
+      <td style="text-align: center">5s</td>
+    </tr>
+    <tr>
+      <td style="text-align: center">Process Mode (10k)</td>
+      <td style="text-align: center">7000</td>
+      <td style="text-align: center">5s-10s</td>
+      <td style="text-align: center">3s</td>
+    </tr>
+    <tr>
+      <td style="text-align: center">Process Mode (1k)</td>
+      <td style="text-align: center">36,000</td>
+      <td style="text-align: center">3s</td>
+      <td style="text-align: center">3s</td>
+    </tr>
+    <tr>
+      <td style="text-align: center">Process Mode (100)</td>
+      <td style="text-align: center">120,000</td>
+      <td style="text-align: center">2s</td>
+      <td style="text-align: center">2s</td>
+    </tr>
+    <tr>
+      <td style="text-align: center">Thread Mode (100k)</td>
+      <td style="text-align: center">1200</td>
+      <td style="text-align: center">1ms</td>
+      <td style="text-align: center">100ms</td>
+    </tr>
+    <tr>
+      <td style="text-align: center">Thread Mode (10k)</td>
+      <td style="text-align: center">12,000</td>
+      <td style="text-align: center">20us</td>
+      <td style="text-align: center">10ms</td>
+    </tr>
+    <tr>
+      <td style="text-align: center">Thread Mode (1k)</td>
+      <td style="text-align: center">50,000</td>
+      <td style="text-align: center">3us</td>
+      <td style="text-align: center">10ms</td>
+    </tr>
+    <tr>
+      <td style="text-align: center">Thread Mode (100)</td>
+      <td style="text-align: center">120,000</td>
+      <td style="text-align: center">1us</td>
+      <td style="text-align: center">10ms</td>
+    </tr>
+  </tbody>
+</table>
+
+<center>
+<img src="{{ site.baseurl }}/img/blog/2022-04-24-pyflink-1.15-thread-mode/pyflink-performance.jpg"/>
+</center>
+
+<center>
+<img src="{{ site.baseurl }}/img/blog/2022-04-24-pyflink-1.15-thread-mode/pyflink-latency.jpg"/>
+</center>
+
+<center>
+<img src="{{ site.baseurl }}/img/blog/2022-04-24-pyflink-1.15-thread-mode/pyflink-checkpoint-time.jpg"/>
+</center>
+
+As we can see from the test results:
+
+ * If you care about latency and checkpoint time, thread mode is your better choice than process mode. The processing latency is decreased from several seconds in process mode to microseconds in thread mode. 

Review Comment:
   ```suggestion
    * If you care about latency and checkpoint time, thread mode is your better choice. The processing latency could be decreased from several seconds in process mode to microseconds in thread mode. 
   ```
   Nit


