Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2020/10/27 06:37:41 UTC

[GitHub] [flink] shuiqiangchen opened a new pull request #13803: [FLINK-19821][python] Add ProcessFunction and timer access for Python DataStream API.

shuiqiangchen opened a new pull request #13803:
URL: https://github.com/apache/flink/pull/13803


   ## What is the purpose of the change
   
   Add ProcessFunction and timer access for Python DataStream API.
   
   ## Brief change log
   
     - *Added ProcessFunction interface and DataStream.process() function.*
     - *Added PythonStatefunFunctionOperator to start the Python stateful function worker.*
     - *Added DataStreamPythonStatefunFunctionOperation and coders.*
   
   
   ## Verifying this change
   
   This pull request will be verified by an e2e test in test_pyflink.sh
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): (no)
     - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
     - The serializers: (no)
     - The runtime per-record code paths (performance sensitive): (no)
     - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn/Mesos, ZooKeeper: (no)
     - The S3 file system connector: (no)
   
   ## Documentation
   
     - Does this pull request introduce a new feature? (no)
     - If yes, how is the feature documented? (not documented)
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #13803: [FLINK-19821][python] Add ProcessFunction and timer access for Python DataStream API.

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13803:
URL: https://github.com/apache/flink/pull/13803#issuecomment-717145782


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "c88ea42fe4ce5260b492d592ae8f5d79f54ca829",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8378",
       "triggerID" : "c88ea42fe4ce5260b492d592ae8f5d79f54ca829",
       "triggerType" : "PUSH"
     }, {
       "hash" : "2ab68a5e26b158ab43b128e0d0dbbc62cb8dea17",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8498",
       "triggerID" : "2ab68a5e26b158ab43b128e0d0dbbc62cb8dea17",
       "triggerType" : "PUSH"
     }, {
       "hash" : "a214b00956b850ec3aca2455478353472dd4f44c",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8551",
       "triggerID" : "a214b00956b850ec3aca2455478353472dd4f44c",
       "triggerType" : "PUSH"
     }, {
       "hash" : "5099e5b3b31a5978ec4bc6ad0b61f11c83e8206f",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8585",
       "triggerID" : "5099e5b3b31a5978ec4bc6ad0b61f11c83e8206f",
       "triggerType" : "PUSH"
     }, {
       "hash" : "99cb17f3cfb36bde9ab0002e7b5d64c37d4db0f1",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8637",
       "triggerID" : "99cb17f3cfb36bde9ab0002e7b5d64c37d4db0f1",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 99cb17f3cfb36bde9ab0002e7b5d64c37d4db0f1 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8637) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] shuiqiangchen commented on a change in pull request #13803: [FLINK-19821][python] Add ProcessFunction and timer access for Python DataStream API.

Posted by GitBox <gi...@apache.org>.
shuiqiangchen commented on a change in pull request #13803:
URL: https://github.com/apache/flink/pull/13803#discussion_r514014519



##########
File path: flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/StatefulPythonFunctionOperator.java
##########
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.python;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
+import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.core.memory.ManagedMemoryUseCase;
+import org.apache.flink.python.PythonFunctionRunner;
+import org.apache.flink.runtime.state.VoidNamespace;
+import org.apache.flink.runtime.state.VoidNamespaceSerializer;
+import org.apache.flink.streaming.api.SimpleTimerService;
+import org.apache.flink.streaming.api.TimerService;
+import org.apache.flink.streaming.api.functions.python.DataStreamPythonFunctionInfo;
+import org.apache.flink.streaming.api.operators.InternalTimer;
+import org.apache.flink.streaming.api.operators.InternalTimerService;
+import org.apache.flink.streaming.api.operators.Triggerable;
+import org.apache.flink.streaming.api.runners.python.beam.BeamDataStreamStatefulPythonFunctionRunner;
+import org.apache.flink.streaming.api.utils.PythonOperatorUtils;
+import org.apache.flink.streaming.api.utils.PythonTypeUtils;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.functions.python.PythonEnv;
+import org.apache.flink.table.runtime.util.StreamRecordCollector;
+import org.apache.flink.types.Row;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.Map;
+
+/**
+ * {@link StatefulPythonFunctionOperator} is responsible for launching the Beam runner, which starts
+ * a Python harness to execute the user-defined Python function. It also handles timer and state
+ * requests from the stateful Python user-defined function.
+ * */
+public class StatefulPythonFunctionOperator<OUT> extends AbstractOneInputPythonFunctionOperator<Row, OUT>
+	implements ResultTypeQueryable<OUT>, Triggerable<Row, VoidNamespace> {
+
+	private static final Logger LOGGER = LoggerFactory.getLogger(StatefulPythonFunctionOperator.class);
+
+	protected static final String DATA_STREAM_STATEFUL_PYTHON_FUNCTION_URN =

Review comment:
       Yes, these fields could be private. They were previously referenced by the runners, and leaving them protected was an oversight after the code refactoring.







[GitHub] [flink] flinkbot edited a comment on pull request #13803: [FLINK-19821][python] Add ProcessFunction and timer access for Python DataStream API.

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13803:
URL: https://github.com/apache/flink/pull/13803#issuecomment-717145782


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "c88ea42fe4ce5260b492d592ae8f5d79f54ca829",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8378",
       "triggerID" : "c88ea42fe4ce5260b492d592ae8f5d79f54ca829",
       "triggerType" : "PUSH"
     }, {
       "hash" : "2ab68a5e26b158ab43b128e0d0dbbc62cb8dea17",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8498",
       "triggerID" : "2ab68a5e26b158ab43b128e0d0dbbc62cb8dea17",
       "triggerType" : "PUSH"
     }, {
       "hash" : "a214b00956b850ec3aca2455478353472dd4f44c",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8551",
       "triggerID" : "a214b00956b850ec3aca2455478353472dd4f44c",
       "triggerType" : "PUSH"
     }, {
       "hash" : "5099e5b3b31a5978ec4bc6ad0b61f11c83e8206f",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8585",
       "triggerID" : "5099e5b3b31a5978ec4bc6ad0b61f11c83e8206f",
       "triggerType" : "PUSH"
     }, {
       "hash" : "99cb17f3cfb36bde9ab0002e7b5d64c37d4db0f1",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8637",
       "triggerID" : "99cb17f3cfb36bde9ab0002e7b5d64c37d4db0f1",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 5099e5b3b31a5978ec4bc6ad0b61f11c83e8206f Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8585) 
   * 99cb17f3cfb36bde9ab0002e7b5d64c37d4db0f1 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8637) 
   





[GitHub] [flink] shuiqiangchen commented on a change in pull request #13803: [FLINK-19821][python] Add ProcessFunction and timer access for Python DataStream API.

Posted by GitBox <gi...@apache.org>.
shuiqiangchen commented on a change in pull request #13803:
URL: https://github.com/apache/flink/pull/13803#discussion_r517740839



##########
File path: flink-python/pyflink/datastream/functions.py
##########
@@ -540,3 +541,168 @@ def __init__(self, sink_func: Union[str, JavaObject]):
         :param sink_func: The java SinkFunction object or the full name of the SinkFunction class.
         """
         super(SinkFunction, self).__init__(sink_func)
+
+
+class ProcessFunction(Function):
+    """
+    A function that processes elements of a stream.
+
+    For every element in the input stream, process_element(value, ctx, out) is invoked. This can
+    produce zero or more elements as output. Implementations can also query the time and set timers
+    through the provided Context. When a timer fires, on_timer(timestamp, ctx, out) is invoked.
+    This can again produce zero or more elements as output and register further timers.
+
+    Note that access to keyed state and timers (which are also scoped to a key) is only available if
+    the ProcessFunction is applied on a KeyedStream.
+    """
+
+    @abc.abstractmethod
+    def process_element(self, value, ctx: 'Context', out: 'Collector'):
+        """
+        Process one element from the input stream.
+
+        This function can output zero or more elements using the Collector parameter and also update
+        internal state or set timers using the Context parameter.
+
+        :param value: The input value.
+        :param ctx:  A Context that allows querying the timestamp of the element and getting a
+                     TimerService for registering timers and querying the time. The context is only
+                     valid during the invocation of this method, do not store it.
+        :param out: The collector for returning result values.
+        """
+        pass
+
+    @abc.abstractmethod
+    def on_timer(self, timestamp, ctx: 'OnTimerContext', out: 'Collector'):
+        """
+        Called when a timer set using TimerService fires.
+
+        :param timestamp: The timestamp of the firing timer.
+        :param ctx: An OnTimerContext that allows querying the timestamp of the firing timer,
+                    querying the TimeDomain of the firing timer and getting a TimerService for
+                    registering timers and querying the time. The context is only valid during the
+                    invocation of this method, do not store it.
+        :param out: The collector for returning result values.
+        """
+        pass
+
+    class Context(abc.ABC):
+        """
+        Information available in an invocation of process_element(value, ctx, out) or
+        on_timer(timestamp, ctx, out).
+        """
+
+        @abc.abstractmethod
+        def timer_service(self) -> 'TimerService':
+            """
+            A Timer service for querying time and registering timers.
+            """
+            pass
+
+    class OnTimerContext(abc.ABC):
+        """
+        Information available in an invocation of on_timer(timestamp, ctx, out).
+        """
+
+        @abc.abstractmethod
+        def timer_service(self):
+            """
+            A Timer service for querying time and registering timers.
+            """
+            pass
+
+        @abc.abstractmethod
+        def time_domain(self) -> TimeCharacteristic:

Review comment:
       Yes, it should be TimeDomain; I will revise it.
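
       For readers following the interface quoted above, here is a minimal sketch of how a
       process_element / on_timer pair might be exercised. It does not import PyFlink: the
       Collector, TimerService, Context and run() helper are hypothetical stand-ins written
       only for this illustration, and the method names collect() and
       register_event_time_timer() are assumptions rather than something confirmed by the
       diff in this thread.

```python
import abc
from typing import Any, Dict, List


class Collector:
    """Hypothetical stand-in: gathers emitted values in a list."""

    def __init__(self) -> None:
        self.values: List[Any] = []

    def collect(self, value: Any) -> None:
        self.values.append(value)


class TimerService:
    """Hypothetical stand-in: only records the timestamps of registered timers."""

    def __init__(self) -> None:
        self.timers: List[int] = []

    def register_event_time_timer(self, timestamp: int) -> None:
        self.timers.append(timestamp)


class Context:
    """Hypothetical stand-in for the context handed to both callbacks."""

    def __init__(self, timer_service: TimerService) -> None:
        self._timer_service = timer_service

    def timer_service(self) -> TimerService:
        return self._timer_service


class ProcessFunction(abc.ABC):
    """Same two abstract methods as the class in the quoted diff."""

    @abc.abstractmethod
    def process_element(self, value, ctx, out):
        pass

    @abc.abstractmethod
    def on_timer(self, timestamp, ctx, out):
        pass


class EchoWithTimeout(ProcessFunction):
    """Emits every element and registers a timer 1000 time units after it."""

    def process_element(self, value: Dict[str, int], ctx: Context, out: Collector):
        out.collect(value)
        ctx.timer_service().register_event_time_timer(value["ts"] + 1000)

    def on_timer(self, timestamp: int, ctx: Context, out: Collector):
        out.collect({"timeout_at": timestamp})


def run(elements: List[Dict[str, int]]) -> List[Any]:
    """Toy driver: process all elements, then fire the recorded timers in order."""
    service = TimerService()
    ctx = Context(service)
    out = Collector()
    fn = EchoWithTimeout()
    for element in elements:
        fn.process_element(element, ctx, out)
    # sorted() returns a copy, so timers registered inside on_timer are not fired here.
    for timestamp in sorted(service.timers):
        fn.on_timer(timestamp, ctx, out)
    return out.values
```

       Calling run([{"ts": 5}]) drives one element through process_element and then fires
       the single timer it registered. A real operator fires timers as time advances
       rather than after all input, so the driver is only a simplification.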







[GitHub] [flink] flinkbot edited a comment on pull request #13803: [FLINK-19821][python] Add ProcessFunction and timer access for Python DataStream API.

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13803:
URL: https://github.com/apache/flink/pull/13803#issuecomment-717145782


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "c88ea42fe4ce5260b492d592ae8f5d79f54ca829",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8378",
       "triggerID" : "c88ea42fe4ce5260b492d592ae8f5d79f54ca829",
       "triggerType" : "PUSH"
     }, {
       "hash" : "2ab68a5e26b158ab43b128e0d0dbbc62cb8dea17",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8498",
       "triggerID" : "2ab68a5e26b158ab43b128e0d0dbbc62cb8dea17",
       "triggerType" : "PUSH"
     }, {
       "hash" : "a214b00956b850ec3aca2455478353472dd4f44c",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8551",
       "triggerID" : "a214b00956b850ec3aca2455478353472dd4f44c",
       "triggerType" : "PUSH"
     }, {
       "hash" : "5099e5b3b31a5978ec4bc6ad0b61f11c83e8206f",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8585",
       "triggerID" : "5099e5b3b31a5978ec4bc6ad0b61f11c83e8206f",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * a214b00956b850ec3aca2455478353472dd4f44c Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8551) 
   * 5099e5b3b31a5978ec4bc6ad0b61f11c83e8206f Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8585) 
   





[GitHub] [flink] flinkbot edited a comment on pull request #13803: [FLINK-19821][python] Add ProcessFunction and timer access for Python DataStream API.

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13803:
URL: https://github.com/apache/flink/pull/13803#issuecomment-717145782


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "c88ea42fe4ce5260b492d592ae8f5d79f54ca829",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8378",
       "triggerID" : "c88ea42fe4ce5260b492d592ae8f5d79f54ca829",
       "triggerType" : "PUSH"
     }, {
       "hash" : "2ab68a5e26b158ab43b128e0d0dbbc62cb8dea17",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8498",
       "triggerID" : "2ab68a5e26b158ab43b128e0d0dbbc62cb8dea17",
       "triggerType" : "PUSH"
     }, {
       "hash" : "a214b00956b850ec3aca2455478353472dd4f44c",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8551",
       "triggerID" : "a214b00956b850ec3aca2455478353472dd4f44c",
       "triggerType" : "PUSH"
     }, {
       "hash" : "5099e5b3b31a5978ec4bc6ad0b61f11c83e8206f",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8585",
       "triggerID" : "5099e5b3b31a5978ec4bc6ad0b61f11c83e8206f",
       "triggerType" : "PUSH"
     }, {
       "hash" : "99cb17f3cfb36bde9ab0002e7b5d64c37d4db0f1",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8637",
       "triggerID" : "99cb17f3cfb36bde9ab0002e7b5d64c37d4db0f1",
       "triggerType" : "PUSH"
     }, {
       "hash" : "a0303486c9e38e09f3f06f6b0a667ccda1852c1a",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8775",
       "triggerID" : "a0303486c9e38e09f3f06f6b0a667ccda1852c1a",
       "triggerType" : "PUSH"
     }, {
       "hash" : "53ce7c5e9bb066d8b92da2905dc51117af4c069b",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8860",
       "triggerID" : "53ce7c5e9bb066d8b92da2905dc51117af4c069b",
       "triggerType" : "PUSH"
     }, {
       "hash" : "98330b04f92594dd08c04d806e541205cffd1c06",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8900",
       "triggerID" : "98330b04f92594dd08c04d806e541205cffd1c06",
       "triggerType" : "PUSH"
     }, {
       "hash" : "48c792a198b30a543b84becd7f88debdad8403ca",
       "status" : "CANCELED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=9037",
       "triggerID" : "48c792a198b30a543b84becd7f88debdad8403ca",
       "triggerType" : "PUSH"
     }, {
       "hash" : "050d0691bf6de8a2c12fdd3958a274c4db7244f0",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=9046",
       "triggerID" : "050d0691bf6de8a2c12fdd3958a274c4db7244f0",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 48c792a198b30a543b84becd7f88debdad8403ca Azure: [CANCELED](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=9037) 
   * 050d0691bf6de8a2c12fdd3958a274c4db7244f0 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=9046) 
   





[GitHub] [flink] shuiqiangchen commented on a change in pull request #13803: [FLINK-19821][python] Add ProcessFunction and timer access for Python DataStream API.

Posted by GitBox <gi...@apache.org>.
shuiqiangchen commented on a change in pull request #13803:
URL: https://github.com/apache/flink/pull/13803#discussion_r517815407



##########
File path: flink-python/pyflink/datastream/functions.py
##########
@@ -19,6 +19,7 @@
 import abc
 from typing import Union, Any, Dict
 
+from apache_beam import TimeDomain

Review comment:
       Yes, that import was a mistake.
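
       One plausible way to avoid depending on apache_beam here is to define the enum
       locally. The sketch below is only an illustration under that assumption: the member
       names and values, the FixedDomainContext class and the describe() helper are
       hypothetical and are not taken from the pull request.

```python
import abc
from enum import Enum


class TimeDomain(Enum):
    """Hypothetical local enum standing in for the mistaken apache_beam import."""

    EVENT_TIME = 0
    PROCESSING_TIME = 1


class OnTimerContext(abc.ABC):
    """Reduced to the one method that needs the TimeDomain return type."""

    @abc.abstractmethod
    def time_domain(self) -> TimeDomain:
        pass


class FixedDomainContext(OnTimerContext):
    """Hypothetical concrete context that always reports the domain it was given."""

    def __init__(self, domain: TimeDomain) -> None:
        self._domain = domain

    def time_domain(self) -> TimeDomain:
        return self._domain


def describe(ctx: OnTimerContext) -> str:
    """Branch on the time domain of the firing timer."""
    if ctx.time_domain() is TimeDomain.EVENT_TIME:
        return "event"
    return "processing"
```

       With this shape, on_timer implementations can branch on ctx.time_domain() without
       importing anything from Beam.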







[GitHub] [flink] flinkbot edited a comment on pull request #13803: [FLINK-19821][python] Add ProcessFunction and timer access for Python DataStream API.

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13803:
URL: https://github.com/apache/flink/pull/13803#issuecomment-717145782


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "c88ea42fe4ce5260b492d592ae8f5d79f54ca829",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8378",
       "triggerID" : "c88ea42fe4ce5260b492d592ae8f5d79f54ca829",
       "triggerType" : "PUSH"
     }, {
       "hash" : "2ab68a5e26b158ab43b128e0d0dbbc62cb8dea17",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8498",
       "triggerID" : "2ab68a5e26b158ab43b128e0d0dbbc62cb8dea17",
       "triggerType" : "PUSH"
     }, {
       "hash" : "a214b00956b850ec3aca2455478353472dd4f44c",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8551",
       "triggerID" : "a214b00956b850ec3aca2455478353472dd4f44c",
       "triggerType" : "PUSH"
     }, {
       "hash" : "5099e5b3b31a5978ec4bc6ad0b61f11c83e8206f",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "5099e5b3b31a5978ec4bc6ad0b61f11c83e8206f",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * a214b00956b850ec3aca2455478353472dd4f44c Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8551) 
   * 5099e5b3b31a5978ec4bc6ad0b61f11c83e8206f UNKNOWN
   





[GitHub] [flink] flinkbot edited a comment on pull request #13803: [FLINK-19821][python] Add ProcessFunction and timer access for Python DataStream API.

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13803:
URL: https://github.com/apache/flink/pull/13803#issuecomment-717145782


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "c88ea42fe4ce5260b492d592ae8f5d79f54ca829",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8378",
       "triggerID" : "c88ea42fe4ce5260b492d592ae8f5d79f54ca829",
       "triggerType" : "PUSH"
     }, {
       "hash" : "2ab68a5e26b158ab43b128e0d0dbbc62cb8dea17",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8498",
       "triggerID" : "2ab68a5e26b158ab43b128e0d0dbbc62cb8dea17",
       "triggerType" : "PUSH"
     }, {
       "hash" : "a214b00956b850ec3aca2455478353472dd4f44c",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8551",
       "triggerID" : "a214b00956b850ec3aca2455478353472dd4f44c",
       "triggerType" : "PUSH"
     }, {
       "hash" : "5099e5b3b31a5978ec4bc6ad0b61f11c83e8206f",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8585",
       "triggerID" : "5099e5b3b31a5978ec4bc6ad0b61f11c83e8206f",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 5099e5b3b31a5978ec4bc6ad0b61f11c83e8206f Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8585) 
   





[GitHub] [flink] flinkbot edited a comment on pull request #13803: [FLINK-19821][python] Add ProcessFunction and timer access for Python DataStream API.

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13803:
URL: https://github.com/apache/flink/pull/13803#issuecomment-717145782


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "c88ea42fe4ce5260b492d592ae8f5d79f54ca829",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8378",
       "triggerID" : "c88ea42fe4ce5260b492d592ae8f5d79f54ca829",
       "triggerType" : "PUSH"
     }, {
       "hash" : "2ab68a5e26b158ab43b128e0d0dbbc62cb8dea17",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8498",
       "triggerID" : "2ab68a5e26b158ab43b128e0d0dbbc62cb8dea17",
       "triggerType" : "PUSH"
     }, {
       "hash" : "a214b00956b850ec3aca2455478353472dd4f44c",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "a214b00956b850ec3aca2455478353472dd4f44c",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 2ab68a5e26b158ab43b128e0d0dbbc62cb8dea17 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8498) 
   * a214b00956b850ec3aca2455478353472dd4f44c UNKNOWN
   





[GitHub] [flink] flinkbot edited a comment on pull request #13803: [FLINK-19821][python] Add ProcessFunction and timer access for Python DataStream API.

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13803:
URL: https://github.com/apache/flink/pull/13803#issuecomment-717145782


   ## CI report:
   
   * 98330b04f92594dd08c04d806e541205cffd1c06 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8900) 
   * 48c792a198b30a543b84becd7f88debdad8403ca Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=9037) 
   





[GitHub] [flink] dianfu closed pull request #13803: [FLINK-19821][python] Add ProcessFunction and timer access for Python DataStream API.

Posted by GitBox <gi...@apache.org>.
dianfu closed pull request #13803:
URL: https://github.com/apache/flink/pull/13803


   





[GitHub] [flink] dianfu commented on a change in pull request #13803: [FLINK-19821][python] Add ProcessFunction and timer access for Python DataStream API.

Posted by GitBox <gi...@apache.org>.
dianfu commented on a change in pull request #13803:
URL: https://github.com/apache/flink/pull/13803#discussion_r517809674



##########
File path: flink-python/pyflink/datastream/functions.py
##########
@@ -19,6 +19,7 @@
 import abc
 from typing import Union, Any, Dict
 
+from apache_beam import TimeDomain

Review comment:
       I think we should import Flink's TimeDomain

##########
File path: flink-python/pyflink/datastream/data_stream.py
##########
@@ -444,6 +444,30 @@ def broadcast(self) -> 'DataStream':
         """
         return DataStream(self._j_data_stream.broadcast())
 
+    def process(self, func: ProcessFunction, output_type: TypeInformation = None):

Review comment:
       ```suggestion
       def process(self, func: ProcessFunction, output_type: TypeInformation = None) -> 'DataStream':
   ```
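
The quoted `'DataStream'` in the suggested return annotation is a string forward reference: while a method is being defined, its own class name is not yet bound. A minimal sketch of the same pattern follows, using a hypothetical stand-in class (`Stream` is an illustration only, not PyFlink's `DataStream`):

```python
from typing import Callable, List


class Stream:
    """Hypothetical stand-in for a DataStream-like class; not PyFlink code."""

    def __init__(self, items: List[int]):
        self._items = list(items)

    # The quoted return type is a forward reference: the name `Stream` is not
    # bound yet while the class body is still being executed.
    def process(self, func: Callable[[int], int]) -> 'Stream':
        return Stream([func(x) for x in self._items])

    def collect(self) -> List[int]:
        return list(self._items)


# With the return type declared, type checkers can follow chained calls.
result = Stream([1, 2, 3]).process(lambda x: x + 1).process(lambda x: x * 2).collect()
```

Declaring the return type mainly helps IDEs and type checkers when calls are chained; it does not change runtime behavior.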

##########
File path: flink-python/pyflink/datastream/functions.py
##########
@@ -540,3 +541,161 @@ def __init__(self, sink_func: Union[str, JavaObject]):
         :param sink_func: The java SinkFunction object or the full name of the SinkFunction class.
         """
         super(SinkFunction, self).__init__(sink_func)
+
+
+class ProcessFunction(Function):

Review comment:
       Add ProcessFunction/Collector/TimerService to pyflink.datastream.__init__.py?

##########
File path: flink-python/pyflink/datastream/time_domain.py
##########
@@ -0,0 +1,42 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+from enum import Enum
+
+from pyflink.java_gateway import get_gateway
+
+
+class TimeDomain(Enum):
+    """
+    TimeDomain specifies whether a firing timer is based on event time or processing time.
+
+    EVENT_TIME: Time is based on timestamp of events.
+    PROCESSING_TIME: Time is based on the current processing-time of a machine where processing
+                     happens.
+    """
+
+    EVENT_TIME = 0
+    PROCESSING_TIME = 1
+
+    @staticmethod

Review comment:
       Remove the following two methods as I think they are not necessary.

##########
File path: flink-python/pyflink/datastream/time_domain.py
##########
@@ -0,0 +1,42 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+from enum import Enum
+
+from pyflink.java_gateway import get_gateway
+
+
+class TimeDomain(Enum):

Review comment:
       Add TimeDomain to pyflink.datastream.__init__.py?







[GitHub] [flink] flinkbot edited a comment on pull request #13803: [FLINK-19821][python] Add ProcessFunction and timer access for Python DataStream API.

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13803:
URL: https://github.com/apache/flink/pull/13803#issuecomment-717145782


   ## CI report:
   
   * c88ea42fe4ce5260b492d592ae8f5d79f54ca829 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8378) 
   





[GitHub] [flink] flinkbot edited a comment on pull request #13803: [FLINK-19821][python] Add ProcessFunction and timer access for Python DataStream API.

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13803:
URL: https://github.com/apache/flink/pull/13803#issuecomment-717145782


   ## CI report:
   
   * c88ea42fe4ce5260b492d592ae8f5d79f54ca829 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8378) 
   





[GitHub] [flink] flinkbot edited a comment on pull request #13803: [FLINK-19821][python] Add ProcessFunction and timer access for Python DataStream API.

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13803:
URL: https://github.com/apache/flink/pull/13803#issuecomment-717145782









[GitHub] [flink] flinkbot edited a comment on pull request #13803: [FLINK-19821][python] Add ProcessFunction and timer access for Python DataStream API.

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13803:
URL: https://github.com/apache/flink/pull/13803#issuecomment-717145782


   ## CI report:
   
   * 98330b04f92594dd08c04d806e541205cffd1c06 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8900) 
   * 48c792a198b30a543b84becd7f88debdad8403ca UNKNOWN
   





[GitHub] [flink] shuiqiangchen commented on a change in pull request #13803: [FLINK-19821][python] Add ProcessFunction and timer access for Python DataStream API.

Posted by GitBox <gi...@apache.org>.
shuiqiangchen commented on a change in pull request #13803:
URL: https://github.com/apache/flink/pull/13803#discussion_r514034737



##########
File path: flink-python/pyflink/datastream/functions.py
##########
@@ -540,3 +540,155 @@ def __init__(self, sink_func: Union[str, JavaObject]):
         :param sink_func: The java SinkFunction object or the full name of the SinkFunction class.
         """
         super(SinkFunction, self).__init__(sink_func)
+
+
+class ProcessFunction(Function):
+    """
+    A function that process elements of a stream.
+
+    For every element in the input stream process_element(value, ctx, out) is invoked. This can
+    produce zero or more elements as output. Implementations can also query the time and set timers
+    through the provided Context. For firing timers on_timer(long, ctx, out) will be invoked. This
+    can again produce zero or more elements as output and register further timers.
+
+    Note that access to keyed state and timers (which are also scoped to a key) is only available if
+    the ProcessFunction is applied on a KeyedStream. Process Function is always a RichFunction.
+    Therefore, access to the RuntimeContext is always available and setup and teardown methods can
+    be implemented. See Function.open() and Function.close().
+    """
+
+    @abc.abstractmethod
+    def process_element(self, value, ctx: 'Context', out: 'Collector'):
+        """
+        Process one element from the input stream.
+
+        This function can output zero or more elements using the Collector parameter and also update
+        internal state or set timers using the Context parameter.
+
+        :param value: The input value.
+        :param ctx:  A Context that allows querying the timestamp of the element and getting a
+                     TimerService for registering timers and querying the time. The context is only
+                     valid during the invocation of this method, do not store it.
+        :param out: The collector for returning result values.
+        """
+        pass
+
+    @abc.abstractmethod
+    def on_timer(self, timestamp, ctx: 'OnTimerContext', out: 'Collector'):
+        """
+        Called when a timer set using TimerService fires.
+
+        :param timestamp: The timestamp of the firing timer.
+        :param ctx: An OnTimerContext that allows querying the timestamp of the firing timer,
+                    querying the TimeDomain of the firing timer and getting a TimerService for
+                    registering timers and querying the time. The context is only valid during the
+                    invocation of this method, do not store it.
+        :param out: The collector for returning result values.
+        """
+        pass
+
+    class Context(abc.ABC):
+        """
+        Information available in an invocation of process_element(value, ctx, out) or
+        on_timer(value, ctx, out).
+        """
+
+        @abc.abstractmethod
+        def timestamp(self):
+            """
+            Timestamp of the element currently being processed or timestamp of a firing timer.
+
+            This might be null, for example if the time characteristic of your program is set to
+            TimeCharacteristic.ProcessingTime.
+            """
+            pass
+
+        @abc.abstractmethod
+        def timer_service(self):
+            """
+            A Timer service for querying time and registering timers.
+            """
+            pass
+
+    class OnTimerContext(abc.ABC):
+        """
+        Information available in an invocation of on_timer(long, OnTimerContext, Collector)
+        """
+        @abc.abstractmethod
+        def time_domain(self):

Review comment:
       Maybe we can remove this interface for now.
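
To make the `process_element` / `on_timer` contract from the docstring above more concrete, here is a toy, pure-Python simulation. It does not use PyFlink: `ToyTimerService`, `ToyContext`, `ToyProcessFunction`, `EchoWithTimer`, and `run` are all hypothetical names, the collector is simplified to a plain list, and the rule "the watermark follows each element's timestamp" is an assumption made only to keep the sketch short.

```python
import abc
import heapq
from typing import Any, List, Tuple


class ToyTimerService:
    """Hypothetical in-memory timer service (event time only)."""

    def __init__(self) -> None:
        self._timers: List[int] = []
        self.watermark = -1

    def current_watermark(self) -> int:
        return self.watermark

    def register_event_time_timer(self, ts: int) -> None:
        if ts not in self._timers:  # one timer per timestamp
            heapq.heappush(self._timers, ts)

    def pop_due(self) -> List[int]:
        """Remove and return all timers at or below the watermark, in order."""
        due = []
        while self._timers and self._timers[0] <= self.watermark:
            due.append(heapq.heappop(self._timers))
        return due


class ToyContext:
    """Hypothetical context: only valid during one callback invocation."""

    def __init__(self, timestamp: int, timer_service: ToyTimerService) -> None:
        self._timestamp = timestamp
        self._timer_service = timer_service

    def timestamp(self) -> int:
        return self._timestamp

    def timer_service(self) -> ToyTimerService:
        return self._timer_service


class ToyProcessFunction(abc.ABC):
    @abc.abstractmethod
    def process_element(self, value: Any, ctx: ToyContext, out: List[str]) -> None:
        ...

    @abc.abstractmethod
    def on_timer(self, timestamp: int, ctx: ToyContext, out: List[str]) -> None:
        ...


class EchoWithTimer(ToyProcessFunction):
    def process_element(self, value, ctx, out):
        out.append("element: %s" % value)
        # Ask to be called back 5 time units after this element's timestamp.
        ctx.timer_service().register_event_time_timer(ctx.timestamp() + 5)

    def on_timer(self, timestamp, ctx, out):
        out.append("timer fired at %d" % timestamp)


def run(func: ToyProcessFunction, records: List[Tuple[int, Any]]) -> List[str]:
    out: List[str] = []
    timers = ToyTimerService()
    for ts, value in records:
        func.process_element(value, ToyContext(ts, timers), out)
        timers.watermark = ts  # toy assumption: watermark tracks element time
        for due in timers.pop_due():
            func.on_timer(due, ToyContext(due, timers), out)
    return out


output = run(EchoWithTimer(), [(1, "a"), (3, "b"), (8, "c")])
```

In this sketch the timer registered for timestamp 13 never fires, because the input ends before the watermark reaches it. A real runtime decides differently when to advance watermarks and flush timers.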







[GitHub] [flink] flinkbot edited a comment on pull request #13803: [FLINK-19821][python] Add ProcessFunction and timer access for Python DataStream API.

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13803:
URL: https://github.com/apache/flink/pull/13803#issuecomment-717145782


   ## CI report:
   
   * 98330b04f92594dd08c04d806e541205cffd1c06 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8900) 
   * 48c792a198b30a543b84becd7f88debdad8403ca Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=9037) 
   * 050d0691bf6de8a2c12fdd3958a274c4db7244f0 UNKNOWN
   





[GitHub] [flink] flinkbot edited a comment on pull request #13803: [FLINK-19821][python] Add ProcessFunction and timer access for Python DataStream API.

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13803:
URL: https://github.com/apache/flink/pull/13803#issuecomment-717145782


   ## CI report:
   
   * 53ce7c5e9bb066d8b92da2905dc51117af4c069b Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8860) 
   * 98330b04f92594dd08c04d806e541205cffd1c06 UNKNOWN
   





[GitHub] [flink] shuiqiangchen commented on a change in pull request #13803: [FLINK-19821][python] Add ProcessFunction and timer access for Python DataStream API.

Posted by GitBox <gi...@apache.org>.
shuiqiangchen commented on a change in pull request #13803:
URL: https://github.com/apache/flink/pull/13803#discussion_r517734385



##########
File path: flink-end-to-end-tests/flink-python-test/python/datastream/data_stream_job.py
##########
@@ -16,43 +16,73 @@
 # limitations under the License.
 ################################################################################
 
-from pyflink.common.serialization import JsonRowSerializationSchema, \
-    JsonRowDeserializationSchema
+from pyflink.common.serialization import SimpleStringSchema
 from pyflink.common.typeinfo import Types
-from pyflink.datastream import StreamExecutionEnvironment
-from pyflink.datastream.connectors import FlinkKafkaProducer, FlinkKafkaConsumer
+from pyflink.datastream import StreamExecutionEnvironment, TimeCharacteristic
+from pyflink.datastream.connectors import FlinkKafkaProducer
+from pyflink.datastream.functions import ProcessFunction, Collector
+from pyflink.table import StreamTableEnvironment
 
-from functions import m_flat_map, add_one
+from functions import MyKeySelector
 
 
 def python_data_stream_example():
     env = StreamExecutionEnvironment.get_execution_environment()
+    # Set the parallelism to be one to make sure that all data including fired timer and normal data
+    # are processed by the same worker and the collected result would be in order which is good for
+    # assertion.
+    env.set_parallelism(1)
+    env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
+    t_env = StreamTableEnvironment.create(stream_execution_environment=env)
 
-    source_type_info = Types.ROW([Types.STRING(), Types.INT()])

Review comment:
       Currently, users cannot define a TimestampAssigner and watermark generator for a DataStream, so we chose to create a DataStream whose row time and watermark strategy are specified via a SQL DDL. Support for a user-defined WatermarkStrategy will be added in a later PR.
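
As a rough illustration of this workaround, a row-time column and its watermark strategy can be declared inside a Flink SQL `CREATE TABLE` statement. The helper name, table name, column names, and the `datagen` connector below are placeholders chosen for the sketch, not taken from the PR. The code only builds the DDL string; executing it in a table environment and converting the table to a DataStream is left out.

```python
def build_source_ddl(table: str, rowtime_col: str, delay_seconds: int) -> str:
    """Build a CREATE TABLE statement that declares an event-time column and
    a watermark that lags it by a fixed delay (hypothetical helper)."""
    return (
        f"CREATE TABLE {table} (\n"
        f"  id INT,\n"
        f"  payload STRING,\n"
        f"  {rowtime_col} TIMESTAMP(3),\n"
        f"  WATERMARK FOR {rowtime_col} AS "
        f"{rowtime_col} - INTERVAL '{delay_seconds}' SECOND\n"
        f") WITH (\n"
        f"  'connector' = 'datagen'\n"
        f")"
    )


ddl = build_source_ddl("timer_source", "rowtime", 2)
print(ddl)
```

The `WATERMARK FOR ... AS ...` clause is what gives the resulting rows an event-time attribute, so that event-time timers have a watermark to fire against.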







[GitHub] [flink] shuiqiangchen commented on a change in pull request #13803: [FLINK-19821][python] Add ProcessFunction and timer access for Python DataStream API.

Posted by GitBox <gi...@apache.org>.
shuiqiangchen commented on a change in pull request #13803:
URL: https://github.com/apache/flink/pull/13803#discussion_r513925523



##########
File path: flink-end-to-end-tests/flink-python-test/python/datastream/data_stream_timer_job.py
##########
@@ -0,0 +1,89 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+from pyflink.common.serialization import SimpleStringSchema
+from pyflink.common.typeinfo import Types
+from pyflink.datastream import StreamExecutionEnvironment, TimeCharacteristic
+from pyflink.datastream.connectors import FlinkKafkaProducer
+from pyflink.datastream.functions import ProcessFunction, Collector
+from pyflink.table import StreamTableEnvironment
+
+
+def test_ds_timer():
+    env = StreamExecutionEnvironment.get_execution_environment()
+    env.set_parallelism(1)

Review comment:
       Set the parallelism to one so that all data, both fired timers and normal records, is processed by the same worker and the collected results arrive in order, which makes them easy to assert on.







[GitHub] [flink] flinkbot edited a comment on pull request #13803: [FLINK-19821][python] Add ProcessFunction and timer access for Python DataStream API.

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13803:
URL: https://github.com/apache/flink/pull/13803#issuecomment-717145782


   ## CI report:
   
   * 98330b04f92594dd08c04d806e541205cffd1c06 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8900) 
   





[GitHub] [flink] dianfu commented on a change in pull request #13803: [FLINK-19821][python] Add ProcessFunction and timer access for Python DataStream API.

Posted by GitBox <gi...@apache.org>.
dianfu commented on a change in pull request #13803:
URL: https://github.com/apache/flink/pull/13803#discussion_r517291472



##########
File path: flink-end-to-end-tests/flink-python-test/python/datastream/data_stream_job.py
##########
@@ -16,43 +16,73 @@
 # limitations under the License.
 ################################################################################
 
-from pyflink.common.serialization import JsonRowSerializationSchema, \
-    JsonRowDeserializationSchema
+from pyflink.common.serialization import SimpleStringSchema
 from pyflink.common.typeinfo import Types
-from pyflink.datastream import StreamExecutionEnvironment
-from pyflink.datastream.connectors import FlinkKafkaProducer, FlinkKafkaConsumer
+from pyflink.datastream import StreamExecutionEnvironment, TimeCharacteristic
+from pyflink.datastream.connectors import FlinkKafkaProducer
+from pyflink.datastream.functions import ProcessFunction, Collector
+from pyflink.table import StreamTableEnvironment
 
-from functions import m_flat_map, add_one
+from functions import MyKeySelector
 
 
 def python_data_stream_example():
     env = StreamExecutionEnvironment.get_execution_environment()
+    # Set the parallelism to be one to make sure that all data including fired timer and normal data
+    # are processed by the same worker and the collected result would be in order which is good for
+    # assertion.
+    env.set_parallelism(1)
+    env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
+    t_env = StreamTableEnvironment.create(stream_execution_environment=env)
 
-    source_type_info = Types.ROW([Types.STRING(), Types.INT()])

Review comment:
       What's the purpose of this change? Why do we need to create a table?

##########
File path: flink-python/pyflink/fn_execution/operation_utils.py
##########
@@ -29,6 +31,7 @@
 TABLE_FUNCTION_URN = "flink:transform:table_function:v1"
 STREAM_GROUP_AGGREGATE_URN = "flink:transform:stream_group_aggregate:v1"
 DATA_STREAM_STATELESS_FUNCTION_URN = "flink:transform:datastream_stateless_function:v1"
+DATA_STREAM_STATEFUL_FUNCTION_URN = "flink:transform:datastream_stateful_function:v1"

Review comment:
       ```suggestion
   DATA_STREAM_STATEFUL_FUNCTION_URN = "flink:transform:process_function:v1"
   ```
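
As a rough illustration of why the URN string matters, the sketch below shows a toy registry keyed by URN. All names except the URN strings are hypothetical and are not taken from the PyFlink code base. The assumption is that the worker looks up the operation by the URN it receives, so the constant in this file and the one declared in PythonProcessFunctionOperator presumably have to be renamed together.

```python
PROCESS_FUNCTION_URN = "flink:transform:process_function:v1"

# Hypothetical registry: maps a URN to a factory that builds the operation.
_OPERATION_FACTORIES = {}


def register_urn(urn):
    """Decorator that registers a factory under the given URN."""
    def decorator(factory):
        _OPERATION_FACTORIES[urn] = factory
        return factory
    return decorator


@register_urn(PROCESS_FUNCTION_URN)
def create_process_function_operation(payload):
    # Placeholder for building the real operation from the payload.
    return ("ProcessFunctionOperation", payload)


def create_operation(urn, payload):
    """Look up the factory for a URN; fail loudly if nothing is registered."""
    try:
        factory = _OPERATION_FACTORIES[urn]
    except KeyError:
        raise ValueError("No operation registered for URN %r" % urn) from None
    return factory(payload)
```

In this toy setup, a sender that still used the old string "flink:transform:datastream_stateful_function:v1" would hit the ValueError branch, which is the kind of mismatch a one-sided rename could cause.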

##########
File path: flink-python/pyflink/datastream/functions.py
##########
@@ -540,3 +541,168 @@ def __init__(self, sink_func: Union[str, JavaObject]):
         :param sink_func: The java SinkFunction object or the full name of the SinkFunction class.
         """
         super(SinkFunction, self).__init__(sink_func)
+
+
+class ProcessFunction(Function):
+    """
+    A function that process elements of a stream.
+
+    For every element in the input stream process_element(value, ctx, out) is invoked. This can
+    produce zero or more elements as output. Implementations can also query the time and set timers
+    through the provided Context. For firing timers on_timer(long, ctx, out) will be invoked. This
+    can again produce zero or more elements as output and register further timers.
+
+    Note that access to keyed state and timers (which are also scoped to a key) is only available if
+    the ProcessFunction is applied on a KeyedStream.
+    """
+
+    @abc.abstractmethod
+    def process_element(self, value, ctx: 'Context', out: 'Collector'):
+        """
+        Process one element from the input stream.
+
+        This function can output zero or more elements using the Collector parameter and also update
+        internal state or set timers using the Context parameter.
+
+        :param value: The input value.
+        :param ctx:  A Context that allows querying the timestamp of the element and getting a
+                     TimerService for registering timers and querying the time. The context is only
+                     valid during the invocation of this method, do not store it.
+        :param out: The collector for returning result values.
+        """
+        pass
+
+    @abc.abstractmethod
+    def on_timer(self, timestamp, ctx: 'OnTimerContext', out: 'Collector'):
+        """
+        Called when a timer set using TimerService fires.
+
+        :param timestamp: The timestamp of the firing timer.
+        :param ctx: An OnTimerContext that allows querying the timestamp of the firing timer,
+                    querying the TimeDomain of the firing timer and getting a TimerService for
+                    registering timers and querying the time. The context is only valid during the
+                    invocation of this method, do not store it.
+        :param out: The collector for returning result values.
+        """
+        pass
+
+    class Context(abc.ABC):
+        """
+        Information available in an invocation of process_element(value, ctx, out) or
+        on_timer(value, ctx, out).
+        """
+
+        @abc.abstractmethod
+        def timer_service(self) -> 'TimerService':
+            """
+            A Timer service for querying time and registering timers.
+            """
+            pass
+
+    class OnTimerContext(abc.ABC):
+        """
+        Information available in an invocation of on_timer(long, OnTimerContext, Collector)
+        """
+
+        @abc.abstractmethod
+        def timer_service(self):

Review comment:
       OnTimerContext could extend Context; then this method can be removed.
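
A minimal sketch of that shape, assuming the nested-class layout from the diff. `ProcessFunction` derives from `abc.ABC` here only to keep the example self-contained, and the `'TimerService'` and `'TimeDomain'` annotations are unresolved forward references.

```python
import abc


class ProcessFunction(abc.ABC):

    class Context(abc.ABC):
        """Information available in process_element and on_timer."""

        @abc.abstractmethod
        def timer_service(self) -> 'TimerService':
            """A timer service for querying time and registering timers."""
            pass

    class OnTimerContext(Context):
        """Information available only in on_timer."""

        # timer_service() is inherited from Context, so it is not redeclared.

        @abc.abstractmethod
        def time_domain(self) -> 'TimeDomain':
            """The time domain of the firing timer."""
            pass
```

The base-class expression `Context` resolves because it is evaluated inside the body of `ProcessFunction`, where `Context` has already been defined.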

##########
File path: flink-python/pyflink/fn_execution/operations.py
##########
@@ -345,3 +347,70 @@ def process_element_or_timer(self, input_data: Tuple[int, Row, int, Row]):
     def close(self):
         if self.group_agg_function is not None:
             self.group_agg_function.close()
+
+
+class DataStreamStatefulFunctionOperation(StatefulFunctionOperation):

Review comment:
       ```suggestion
   class ProcessFunctionOperation(StatefulFunctionOperation):
   ```

##########
File path: flink-python/pyflink/datastream/functions.py
##########
@@ -540,3 +541,168 @@ def __init__(self, sink_func: Union[str, JavaObject]):
         :param sink_func: The java SinkFunction object or the full name of the SinkFunction class.
         """
         super(SinkFunction, self).__init__(sink_func)
+
+
+class ProcessFunction(Function):
+    """
+    A function that process elements of a stream.
+
+    For every element in the input stream process_element(value, ctx, out) is invoked. This can
+    produce zero or more elements as output. Implementations can also query the time and set timers
+    through the provided Context. For firing timers on_timer(long, ctx, out) will be invoked. This
+    can again produce zero or more elements as output and register further timers.
+
+    Note that access to keyed state and timers (which are also scoped to a key) is only available if
+    the ProcessFunction is applied on a KeyedStream.
+    """
+
+    @abc.abstractmethod
+    def process_element(self, value, ctx: 'Context', out: 'Collector'):
+        """
+        Process one element from the input stream.
+
+        This function can output zero or more elements using the Collector parameter and also update
+        internal state or set timers using the Context parameter.
+
+        :param value: The input value.
+        :param ctx:  A Context that allows querying the timestamp of the element and getting a
+                     TimerService for registering timers and querying the time. The context is only
+                     valid during the invocation of this method, do not store it.
+        :param out: The collector for returning result values.
+        """
+        pass
+
+    @abc.abstractmethod
+    def on_timer(self, timestamp, ctx: 'OnTimerContext', out: 'Collector'):
+        """
+        Called when a timer set using TimerService fires.
+
+        :param timestamp: The timestamp of the firing timer.
+        :param ctx: An OnTimerContext that allows querying the timestamp of the firing timer,
+                    querying the TimeDomain of the firing timer and getting a TimerService for
+                    registering timers and querying the time. The context is only valid during the
+                    invocation of this method, do not store it.
+        :param out: The collector for returning result values.
+        """
+        pass
+
+    class Context(abc.ABC):
+        """
+        Information available in an invocation of process_element(value, ctx, out) or
+        on_timer(value, ctx, out).
+        """
+
+        @abc.abstractmethod
+        def timer_service(self) -> 'TimerService':
+            """
+            A Timer service for querying time and registering timers.
+            """
+            pass
+
+    class OnTimerContext(abc.ABC):
+        """
+        Information available in an invocation of on_timer(long, OnTimerContext, Collector)
+        """
+
+        @abc.abstractmethod
+        def timer_service(self):
+            """
+            A Timer service for querying time and registering timers.
+            """
+            pass
+
+        @abc.abstractmethod
+        def time_domain(self) -> TimeCharacteristic:

Review comment:
       The return type should be TimeDomain, not TimeCharacteristic.
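
For illustration, a Python `TimeDomain` could mirror the two values of Flink's Java `TimeDomain` enum. This is a sketch only: the numeric values and the `FiringTimerContext` class are assumptions made for the example, not part of this PR.

```python
from enum import Enum


class TimeDomain(Enum):
    """Whether a firing timer is based on event time or processing time."""
    EVENT_TIME = 0
    PROCESSING_TIME = 1


class FiringTimerContext:
    """Hypothetical concrete context used only to show the return type."""

    def __init__(self, time_domain: TimeDomain):
        self._time_domain = time_domain

    def time_domain(self) -> TimeDomain:
        return self._time_domain
```

TimeCharacteristic configures the execution environment as a whole, whereas TimeDomain describes a single firing timer, so the two types are not interchangeable in this signature.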

##########
File path: flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/PythonProcessFunctionOperator.java
##########
@@ -0,0 +1,309 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.python;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
+import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.core.memory.ManagedMemoryUseCase;
+import org.apache.flink.python.PythonFunctionRunner;
+import org.apache.flink.runtime.state.VoidNamespace;
+import org.apache.flink.runtime.state.VoidNamespaceSerializer;
+import org.apache.flink.streaming.api.SimpleTimerService;
+import org.apache.flink.streaming.api.TimerService;
+import org.apache.flink.streaming.api.functions.python.DataStreamPythonFunctionInfo;
+import org.apache.flink.streaming.api.operators.InternalTimer;
+import org.apache.flink.streaming.api.operators.InternalTimerService;
+import org.apache.flink.streaming.api.operators.Triggerable;
+import org.apache.flink.streaming.api.runners.python.beam.BeamDataStreamPythonFunctionRunner;
+import org.apache.flink.streaming.api.utils.PythonOperatorUtils;
+import org.apache.flink.streaming.api.utils.PythonTypeUtils;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.functions.python.PythonEnv;
+import org.apache.flink.table.runtime.util.StreamRecordCollector;
+import org.apache.flink.types.Row;
+
+import java.util.Collections;
+import java.util.Map;
+
+/**
+ * {@link PythonProcessFunctionOperator} is responsible for launching beam runner which will start
+ * a python harness to execute user defined python function. It is also able to handle the timer and
+ * state request from the python stateful user defined function.
+ * */
+@Internal
+public class PythonProcessFunctionOperator<OUT> extends AbstractOneInputPythonFunctionOperator<Row, OUT>
+	implements ResultTypeQueryable<OUT>, Triggerable<Row, VoidNamespace> {
+
+	private static final long serialVersionUID = 1L;
+
+	private static final String DATA_STREAM_STATEFUL_PYTHON_FUNCTION_URN =
+		"flink:transform:datastream_stateful_function:v1";
+
+	private static final String DATA_STREAM_STATEFUL_PROCESS_FUNCTION_CODER_URN =
+		"flink:coder:datastream:flatmap_function:v1";

Review comment:
       ```suggestion
   		"flink:coder:flat_map:v1";
   ```







[GitHub] [flink] flinkbot commented on pull request #13803: [FLINK-19821][python] Add ProcessFunction and timer access for Python DataStream API.

Posted by GitBox <gi...@apache.org>.
flinkbot commented on pull request #13803:
URL: https://github.com/apache/flink/pull/13803#issuecomment-717145782


   ## CI report:
   
   * c88ea42fe4ce5260b492d592ae8f5d79f54ca829 UNKNOWN
   





[GitHub] [flink] dianfu commented on a change in pull request #13803: [FLINK-19821][python] Add ProcessFunction and timer access for Python DataStream API.

Posted by GitBox <gi...@apache.org>.
dianfu commented on a change in pull request #13803:
URL: https://github.com/apache/flink/pull/13803#discussion_r514719949



##########
File path: flink-end-to-end-tests/flink-python-test/python/datastream/data_stream_timer_job.py
##########
@@ -0,0 +1,91 @@
+################################################################################

Review comment:
       I suggest keeping the name "data_stream_job". Timers are only one aspect; in the future we could also reuse this test case to cover state tests.

##########
File path: flink-end-to-end-tests/flink-python-test/python/datastream/data_stream_timer_job.py
##########
@@ -0,0 +1,91 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+from pyflink.common.serialization import SimpleStringSchema
+from pyflink.common.typeinfo import Types
+from pyflink.datastream import StreamExecutionEnvironment, TimeCharacteristic
+from pyflink.datastream.connectors import FlinkKafkaProducer
+from pyflink.datastream.functions import ProcessFunction, Collector
+from pyflink.table import StreamTableEnvironment
+
+from datastream.functions import MyKeySelector
+
+
+def test_ds_timer():

Review comment:
       ```suggestion
   def python_data_stream_example():
   ```

##########
File path: flink-python/pyflink/datastream/functions.py
##########
@@ -540,3 +540,152 @@ def __init__(self, sink_func: Union[str, JavaObject]):
         :param sink_func: The java SinkFunction object or the full name of the SinkFunction class.
         """
         super(SinkFunction, self).__init__(sink_func)
+
+
+class ProcessFunction(Function):
+    """
+    A function that process elements of a stream.
+
+    For every element in the input stream process_element(value, ctx, out) is invoked. This can
+    produce zero or more elements as output. Implementations can also query the time and set timers
+    through the provided Context. For firing timers on_timer(long, ctx, out) will be invoked. This
+    can again produce zero or more elements as output and register further timers.
+
+    Note that access to keyed state and timers (which are also scoped to a key) is only available if
+    the ProcessFunction is applied on a KeyedStream.
+    """
+
+    @abc.abstractmethod
+    def process_element(self, value, ctx: 'Context', out: 'Collector'):
+        """
+        Process one element from the input stream.
+
+        This function can output zero or more elements using the Collector parameter and also update
+        internal state or set timers using the Context parameter.
+
+        :param value: The input value.
+        :param ctx:  A Context that allows querying the timestamp of the element and getting a
+                     TimerService for registering timers and querying the time. The context is only
+                     valid during the invocation of this method, do not store it.
+        :param out: The collector for returning result values.
+        """
+        pass
+
+    @abc.abstractmethod
+    def on_timer(self, timestamp, ctx: 'OnTimerContext', out: 'Collector'):
+        """
+        Called when a timer set using TimerService fires.
+
+        :param timestamp: The timestamp of the firing timer.
+        :param ctx: An OnTimerContext that allows querying the timestamp of the firing timer,
+                    querying the TimeDomain of the firing timer and getting a TimerService for
+                    registering timers and querying the time. The context is only valid during the
+                    invocation of this method, do not store it.
+        :param out: The collector for returning result values.
+        """
+        pass
+
+    class Context(abc.ABC):
+        """
+        Information available in an invocation of process_element(value, ctx, out) or
+        on_timer(value, ctx, out).
+        """
+
+        @abc.abstractmethod
+        def timer_service(self) -> 'TimerService':
+            """
+            A Timer service for querying time and registering timers.
+            """
+            pass
+
+    class OnTimerContext(abc.ABC):
+        """
+        Information available in an invocation of on_timer(long, OnTimerContext, Collector)
+        """
+        @abc.abstractmethod
+        def timer_service(self):

Review comment:
       Shouldn't this method be time_domain?

##########
File path: flink-python/pyflink/fn_execution/beam/beam_coders.py
##########
@@ -354,3 +355,29 @@ def _create_impl(self):
         HIGHEST_PROTOCOL = pickle.HIGHEST_PROTOCOL
         return coder_impl.CallbackCoderImpl(
             lambda x: dumps(filter_data_views(x), HIGHEST_PROTOCOL), pickle.loads)
+
+
+class BeamDataStreamStatefulMapCoder(FastCoder):

Review comment:
       Is it possible to reuse BeamDataStreamStatelessFlatMapCoder? Its name may need to be refactored a bit.

##########
File path: flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/PythonProcessFunctionOperator.java
##########
@@ -0,0 +1,309 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.python;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
+import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.core.memory.ManagedMemoryUseCase;
+import org.apache.flink.python.PythonFunctionRunner;
+import org.apache.flink.runtime.state.VoidNamespace;
+import org.apache.flink.runtime.state.VoidNamespaceSerializer;
+import org.apache.flink.streaming.api.SimpleTimerService;
+import org.apache.flink.streaming.api.TimerService;
+import org.apache.flink.streaming.api.functions.python.DataStreamPythonFunctionInfo;
+import org.apache.flink.streaming.api.operators.InternalTimer;
+import org.apache.flink.streaming.api.operators.InternalTimerService;
+import org.apache.flink.streaming.api.operators.Triggerable;
+import org.apache.flink.streaming.api.runners.python.beam.BeamDataStreamPythonFunctionRunner;
+import org.apache.flink.streaming.api.utils.PythonOperatorUtils;
+import org.apache.flink.streaming.api.utils.PythonTypeUtils;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.functions.python.PythonEnv;
+import org.apache.flink.table.runtime.util.StreamRecordCollector;
+import org.apache.flink.types.Row;
+
+import java.util.Collections;
+import java.util.Map;
+
+/**
+ * {@link PythonProcessFunctionOperator} is responsible for launching beam runner which will start
+ * a python harness to execute user defined python function. It is also able to handle the timer and
+ * state request from the python stateful user defined function.
+ * */
+@Internal
+public class PythonProcessFunctionOperator<OUT> extends AbstractOneInputPythonFunctionOperator<Row, OUT>
+	implements ResultTypeQueryable<OUT>, Triggerable<Row, VoidNamespace> {
+
+	private static final long serialVersionUID = 1L;
+
+	private static final String DATA_STREAM_STATEFUL_PYTHON_FUNCTION_URN =
+		"flink:transform:datastream_stateful_function:v1";

Review comment:
       ```suggestion
   		"flink:transform:process_function:v1";
   ```

##########
File path: flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/PythonProcessFunctionOperator.java
##########
@@ -0,0 +1,309 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.python;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
+import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.core.memory.ManagedMemoryUseCase;
+import org.apache.flink.python.PythonFunctionRunner;
+import org.apache.flink.runtime.state.VoidNamespace;
+import org.apache.flink.runtime.state.VoidNamespaceSerializer;
+import org.apache.flink.streaming.api.SimpleTimerService;
+import org.apache.flink.streaming.api.TimerService;
+import org.apache.flink.streaming.api.functions.python.DataStreamPythonFunctionInfo;
+import org.apache.flink.streaming.api.operators.InternalTimer;
+import org.apache.flink.streaming.api.operators.InternalTimerService;
+import org.apache.flink.streaming.api.operators.Triggerable;
+import org.apache.flink.streaming.api.runners.python.beam.BeamDataStreamPythonFunctionRunner;
+import org.apache.flink.streaming.api.utils.PythonOperatorUtils;
+import org.apache.flink.streaming.api.utils.PythonTypeUtils;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.functions.python.PythonEnv;
+import org.apache.flink.table.runtime.util.StreamRecordCollector;
+import org.apache.flink.types.Row;
+
+import java.util.Collections;
+import java.util.Map;
+
+/**
+ * {@link PythonProcessFunctionOperator} is responsible for launching beam runner which will start
+ * a python harness to execute user defined python function. It is also able to handle the timer and
+ * state request from the python stateful user defined function.
+ * */
+@Internal
+public class PythonProcessFunctionOperator<OUT> extends AbstractOneInputPythonFunctionOperator<Row, OUT>
+	implements ResultTypeQueryable<OUT>, Triggerable<Row, VoidNamespace> {
+
+	private static final long serialVersionUID = 1L;
+
+	private static final String DATA_STREAM_STATEFUL_PYTHON_FUNCTION_URN =
+		"flink:transform:datastream_stateful_function:v1";
+
+	private static final String DATA_STREAM_STATEFUL_PROCESS_FUNCTION_CODER_URN =
+		"flink:coder:datastream:stateful_process_function:v1";

Review comment:
       ```suggestion
   		"flink:coder:datastream:process_function:v1";
   ```

##########
File path: flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/PythonProcessFunctionOperator.java
##########
@@ -0,0 +1,309 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.python;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
+import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.core.memory.ManagedMemoryUseCase;
+import org.apache.flink.python.PythonFunctionRunner;
+import org.apache.flink.runtime.state.VoidNamespace;
+import org.apache.flink.runtime.state.VoidNamespaceSerializer;
+import org.apache.flink.streaming.api.SimpleTimerService;
+import org.apache.flink.streaming.api.TimerService;
+import org.apache.flink.streaming.api.functions.python.DataStreamPythonFunctionInfo;
+import org.apache.flink.streaming.api.operators.InternalTimer;
+import org.apache.flink.streaming.api.operators.InternalTimerService;
+import org.apache.flink.streaming.api.operators.Triggerable;
+import org.apache.flink.streaming.api.runners.python.beam.BeamDataStreamPythonFunctionRunner;
+import org.apache.flink.streaming.api.utils.PythonOperatorUtils;
+import org.apache.flink.streaming.api.utils.PythonTypeUtils;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.functions.python.PythonEnv;
+import org.apache.flink.table.runtime.util.StreamRecordCollector;
+import org.apache.flink.types.Row;
+
+import java.util.Collections;
+import java.util.Map;
+
+/**
+ * {@link PythonProcessFunctionOperator} is responsible for launching beam runner which will start
+ * a python harness to execute user defined python function. It is also able to handle the timer and
+ * state request from the python stateful user defined function.
+ * */
+@Internal
+public class PythonProcessFunctionOperator<OUT> extends AbstractOneInputPythonFunctionOperator<Row, OUT>
+	implements ResultTypeQueryable<OUT>, Triggerable<Row, VoidNamespace> {
+
+	private static final long serialVersionUID = 1L;
+
+	private static final String DATA_STREAM_STATEFUL_PYTHON_FUNCTION_URN =
+		"flink:transform:datastream_stateful_function:v1";
+
+	private static final String DATA_STREAM_STATEFUL_PROCESS_FUNCTION_CODER_URN =
+		"flink:coder:datastream:stateful_process_function:v1";
+
+	/**
+	 * The python {@link org.apache.flink.streaming.api.functions.ProcessFunction} to be executed.
+	 */
+	private final DataStreamPythonFunctionInfo pythonFunctionInfo;
+
+	/**
+	 * The TypeInformation of python worker input data.
+	 */
+	private final TypeInformation runnerInputTypeInfo;
+
+	/**
+	 * The TypeInformation of python worker output data.
+	 */
+	private final TypeInformation runnerOutputTypeInfo;
+
+	/**
+	 * The TypeInformation of output data or this operator.
+	 */
+	private final TypeInformation<OUT> outputTypeInfo;
+
+	/**
+	 * The TypeInformation of current key.
+	 */
+	private final TypeInformation<Row> keyTypeInfo;
+
+	/**
+	 * Serializer to serialize input data for python worker.
+	 */
+	private transient TypeSerializer runnerInputSerializer;
+
+	/**
+	 * Serializer to deserialize output data from python worker.
+	 */
+	private transient TypeSerializer runnerOutputSerializer;
+
+	/**
+	 * Serializer for current key.
+	 */
+	private transient TypeSerializer keyTypeSerializer;
+
+	/**
+	 * TimerService for current operator to register or fire timer.
+	 */
+	private transient TimerService timerservice;
+
+	/**
+	 * The options used to configure the Python worker process.
+	 */
+	protected final Map<String, String> jobOptions;
+
+	/**
+	 * Reusable InputStream used to holding the execution results to be deserialized.
+	 */
+	protected transient ByteArrayInputStreamWithPos bais;
+
+	/**
+	 * InputStream Wrapper.
+	 */
+	protected transient DataInputViewStreamWrapper baisWrapper;
+
+	/**
+	 * Reusable OutputStream used to holding the serialized input elements.
+	 */
+	protected transient ByteArrayOutputStreamWithPos baos;
+
+	/**
+	 * OutputStream Wrapper.
+	 */
+	protected transient DataOutputViewStreamWrapper baosWrapper;
+
+	/**
+	 * The collector for collecting output data to be emitted.
+	 */
+	private transient StreamRecordCollector streamRecordCollector;
+
+	/**
+	 * Reusable row for normal data runner inputs.
+	 */
+	private transient Row reusableInput;
+
+	/**
+	 * Reusable row for timer data runner inputs.
+	 */
+	private transient Row reusableTimerData;
+
+	public PythonProcessFunctionOperator(
+		Configuration config,
+		RowTypeInfo inputTypeInfo,
+		TypeInformation<OUT> outputTypeInfo,
+		DataStreamPythonFunctionInfo pythonFunctionInfo) {
+		super(config);
+		this.jobOptions = config.toMap();
+		this.pythonFunctionInfo = pythonFunctionInfo;
+		this.outputTypeInfo = outputTypeInfo;
+		this.keyTypeInfo = new RowTypeInfo(inputTypeInfo.getTypeAt(0));
+		// inputType: normal data/ timer data, timerType: proc/event time, currentWatermark, keyData, real data
+		this.runnerInputTypeInfo = Types.ROW(Types.INT, Types.LONG, Types.LONG, this.keyTypeInfo, inputTypeInfo);
+		this.runnerOutputTypeInfo = Types.ROW(Types.INT, Types.LONG, this.keyTypeInfo, outputTypeInfo);
+	}
+
+	@Override
+	public void open() throws Exception {
+		super.open();
+		bais = new ByteArrayInputStreamWithPos();
+		baisWrapper = new DataInputViewStreamWrapper(bais);
+
+		baos = new ByteArrayOutputStreamWithPos();
+		baosWrapper = new DataOutputViewStreamWrapper(baos);
+		keyTypeSerializer = PythonTypeUtils.TypeInfoToSerializerConverter.typeInfoSerializerConverter(keyTypeInfo);
+		runnerInputSerializer = PythonTypeUtils.TypeInfoToSerializerConverter
+			.typeInfoSerializerConverter(runnerInputTypeInfo);
+		runnerOutputSerializer = PythonTypeUtils.TypeInfoToSerializerConverter
+			.typeInfoSerializerConverter(runnerOutputTypeInfo);
+
+		InternalTimerService<VoidNamespace> internalTimerService = getInternalTimerService("user-timers",
+			VoidNamespaceSerializer.INSTANCE, this);
+
+		this.streamRecordCollector = new StreamRecordCollector(output);

Review comment:
       ```suggestion
   		streamRecordCollector = new StreamRecordCollector(output);
   ```
   Nit: drop the `this.` qualifier to keep the code style consistent with the other assignments in `open()`.

##########
File path: flink-end-to-end-tests/test-scripts/test_pyflink.sh
##########
@@ -217,58 +217,73 @@ setup_kafka_dist
 
 start_kafka_cluster
 
-create_data_stream_kafka_source
+# End to end test for DataStream ProcessFunction with timer
+create_kafka_topic 1 1 timer-stream-source
+create_kafka_topic 1 1 timer-stream-sink
 
-create_kafka_topic 1 1 test-python-data-stream-sink
+PAYMENT_MSGS='{"createTime": "2020-10-26 10:30:13", "orderId": 1603679414, "payAmount": 83685.44904332698, "payPlatform": 0, "provinceId": 3}
+{"createTime": "2020-10-26 10:30:26", "orderId": 1603679427, "payAmount": 30092.50657757042, "payPlatform": 0, "provinceId": 1}
+{"createTime": "2020-10-26 10:30:27", "orderId": 1603679428, "payAmount": 62644.01719293056, "payPlatform": 0, "provinceId": 6}
+{"createTime": "2020-10-26 10:30:28", "orderId": 1603679429, "payAmount": 6449.806795118451, "payPlatform": 0, "provinceId": 2}
+{"createTime": "2020-10-26 10:31:31", "orderId": 1603679492, "payAmount": 41108.36128417494, "payPlatform": 0, "provinceId": 0}
+{"createTime": "2020-10-26 10:31:32", "orderId": 1603679493, "payAmount": 64882.44233197067, "payPlatform": 0, "provinceId": 4}
+{"createTime": "2020-10-26 10:32:01", "orderId": 1603679522, "payAmount": 81648.80712644062, "payPlatform": 0, "provinceId": 3}
+{"createTime": "2020-10-26 10:32:02", "orderId": 1603679523, "payAmount": 81861.73063103345, "payPlatform": 0, "provinceId": 4}'
 
-PYFLINK_CLIENT_EXECUTABLE=${PYTHON_EXEC}
+function send_msg_to_kafka {
+
+    while read line
+    do
+	 	send_messages_to_kafka "$line" "timer-stream-source"
+        sleep 3
+    done <<< "$1"
+}
 
 JOB_ID=$(${FLINK_DIR}/bin/flink run \
     -p 2 \
     -pyfs "${FLINK_PYTHON_TEST_DIR}/python/datastream" \
     -pyreq "${REQUIREMENTS_PATH}" \
     -pyarch "${TEST_DATA_DIR}/venv.zip" \
     -pyexec "venv.zip/.conda/bin/python" \
-    -pym "data_stream_job" \
+    -pym "data_stream_timer_job" \
     -j "${KAFKA_SQL_JAR}")
 
 echo "${JOB_ID}"
 JOB_ID=`echo "${JOB_ID}" | sed 's/.* //g'`
 
+wait_job_running ${JOB_ID}
+
+# wait 10s to ensure all tasks are up.

Review comment:
       Doesn't `wait_job_running` already make sure that all the subtasks are up?
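
If an extra wait does turn out to be needed, polling for a condition is usually more robust than a fixed sleep. A minimal sketch, assuming a hypothetical `wait_until` helper (it is not part of the Flink test scripts) that retries a command until it succeeds or the attempts run out:

```shell
#!/usr/bin/env bash

# Hypothetical helper, not part of the Flink test scripts:
# retry a command once per interval until it succeeds or attempts run out.
# Usage: wait_until <max_attempts> <interval_seconds> <command> [args...]
function wait_until {
    local max_attempts=$1
    local interval=$2
    shift 2
    local attempt
    for ((attempt = 1; attempt <= max_attempts; attempt++)); do
        # Run the condition in the current shell; stop as soon as it holds.
        if "$@"; then
            return 0
        fi
        sleep "$interval"
    done
    echo "Condition not met after ${max_attempts} attempts: $*" >&2
    return 1
}
```

The condition passed to `wait_until` would be whatever check confirms that the tasks are up; which check is appropriate here is left open, since the review only asks whether `wait_job_running` already covers it.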

##########
File path: flink-end-to-end-tests/test-scripts/test_pyflink.sh
##########
@@ -217,58 +217,73 @@ setup_kafka_dist
 
 start_kafka_cluster
 
-create_data_stream_kafka_source
+# End to end test for DataStream ProcessFunction with timer
+create_kafka_topic 1 1 timer-stream-source
+create_kafka_topic 1 1 timer-stream-sink
 
-create_kafka_topic 1 1 test-python-data-stream-sink
+PAYMENT_MSGS='{"createTime": "2020-10-26 10:30:13", "orderId": 1603679414, "payAmount": 83685.44904332698, "payPlatform": 0, "provinceId": 3}
+{"createTime": "2020-10-26 10:30:26", "orderId": 1603679427, "payAmount": 30092.50657757042, "payPlatform": 0, "provinceId": 1}
+{"createTime": "2020-10-26 10:30:27", "orderId": 1603679428, "payAmount": 62644.01719293056, "payPlatform": 0, "provinceId": 6}
+{"createTime": "2020-10-26 10:30:28", "orderId": 1603679429, "payAmount": 6449.806795118451, "payPlatform": 0, "provinceId": 2}
+{"createTime": "2020-10-26 10:31:31", "orderId": 1603679492, "payAmount": 41108.36128417494, "payPlatform": 0, "provinceId": 0}
+{"createTime": "2020-10-26 10:31:32", "orderId": 1603679493, "payAmount": 64882.44233197067, "payPlatform": 0, "provinceId": 4}
+{"createTime": "2020-10-26 10:32:01", "orderId": 1603679522, "payAmount": 81648.80712644062, "payPlatform": 0, "provinceId": 3}
+{"createTime": "2020-10-26 10:32:02", "orderId": 1603679523, "payAmount": 81861.73063103345, "payPlatform": 0, "provinceId": 4}'
 
-PYFLINK_CLIENT_EXECUTABLE=${PYTHON_EXEC}
+function send_msg_to_kafka {
+
+    while read line
+    do
+	 	send_messages_to_kafka "$line" "timer-stream-source"

Review comment:
       Please correct the indentation: this line is indented with tabs, while the surrounding lines use spaces.
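
For reference, a sketch of the same helper with four-space indentation throughout. `send_messages_to_kafka` is stubbed here so that the snippet runs on its own (the real helper comes from the end-to-end test scripts). The `SEND_DELAY` variable is an assumption added only to make the pause adjustable, and `IFS= read -r` is an optional hardening rather than something the review asked for:

```shell
#!/usr/bin/env bash

# Stub for illustration only: the real send_messages_to_kafka is provided by
# the Flink end-to-end test scripts. This one just prints what it would send.
function send_messages_to_kafka {
    echo "topic=$2 msg=$1"
}

# Send each line of $1 to the source topic, pausing between messages.
# SEND_DELAY is a hypothetical knob; it defaults to the 3 seconds used in the PR.
function send_msg_to_kafka {
    while IFS= read -r line
    do
        send_messages_to_kafka "$line" "timer-stream-source"
        sleep "${SEND_DELAY:-3}"
    done <<< "$1"
}
```

With the stub in place, calling `send_msg_to_kafka "$PAYMENT_MSGS"` prints one line per message instead of contacting Kafka.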

##########
File path: flink-end-to-end-tests/test-scripts/test_pyflink.sh
##########
@@ -217,58 +217,73 @@ setup_kafka_dist
 
 start_kafka_cluster
 
-create_data_stream_kafka_source
+# End to end test for DataStream ProcessFunction with timer
+create_kafka_topic 1 1 timer-stream-source
+create_kafka_topic 1 1 timer-stream-sink
 
-create_kafka_topic 1 1 test-python-data-stream-sink
+PAYMENT_MSGS='{"createTime": "2020-10-26 10:30:13", "orderId": 1603679414, "payAmount": 83685.44904332698, "payPlatform": 0, "provinceId": 3}
+{"createTime": "2020-10-26 10:30:26", "orderId": 1603679427, "payAmount": 30092.50657757042, "payPlatform": 0, "provinceId": 1}
+{"createTime": "2020-10-26 10:30:27", "orderId": 1603679428, "payAmount": 62644.01719293056, "payPlatform": 0, "provinceId": 6}
+{"createTime": "2020-10-26 10:30:28", "orderId": 1603679429, "payAmount": 6449.806795118451, "payPlatform": 0, "provinceId": 2}
+{"createTime": "2020-10-26 10:31:31", "orderId": 1603679492, "payAmount": 41108.36128417494, "payPlatform": 0, "provinceId": 0}
+{"createTime": "2020-10-26 10:31:32", "orderId": 1603679493, "payAmount": 64882.44233197067, "payPlatform": 0, "provinceId": 4}
+{"createTime": "2020-10-26 10:32:01", "orderId": 1603679522, "payAmount": 81648.80712644062, "payPlatform": 0, "provinceId": 3}
+{"createTime": "2020-10-26 10:32:02", "orderId": 1603679523, "payAmount": 81861.73063103345, "payPlatform": 0, "provinceId": 4}'
 
-PYFLINK_CLIENT_EXECUTABLE=${PYTHON_EXEC}
+function send_msg_to_kafka {
+
+    while read line
+    do
+	 	send_messages_to_kafka "$line" "timer-stream-source"
+        sleep 3
+    done <<< "$1"
+}
 
 JOB_ID=$(${FLINK_DIR}/bin/flink run \
     -p 2 \

Review comment:
       The parallelism has already been set to 1 in the job itself (`env.set_parallelism(1)` in `data_stream_timer_job.py`), so `-p 2` doesn't take effect.







[GitHub] [flink] flinkbot edited a comment on pull request #13803: [FLINK-19821][python] Add ProcessFunction and timer access for Python DataStream API.

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13803:
URL: https://github.com/apache/flink/pull/13803#issuecomment-717145782


   ## CI report:
   
   * 53ce7c5e9bb066d8b92da2905dc51117af4c069b Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8860) 
   * 98330b04f92594dd08c04d806e541205cffd1c06 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8900) 
   



[GitHub] [flink] flinkbot edited a comment on pull request #13803: [FLINK-19821][python] Add ProcessFunction and timer access for Python DataStream API.

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13803:
URL: https://github.com/apache/flink/pull/13803#issuecomment-717145782


   ## CI report:
   
   * c88ea42fe4ce5260b492d592ae8f5d79f54ca829 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8378) 
   * 2ab68a5e26b158ab43b128e0d0dbbc62cb8dea17 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8498) 
   



[GitHub] [flink] flinkbot edited a comment on pull request #13803: [FLINK-19821][python] Add ProcessFunction and timer access for Python DataStream API.

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13803:
URL: https://github.com/apache/flink/pull/13803#issuecomment-717145782


   ## CI report:
   
   * 5099e5b3b31a5978ec4bc6ad0b61f11c83e8206f Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8585) 
   * 99cb17f3cfb36bde9ab0002e7b5d64c37d4db0f1 UNKNOWN
   



[GitHub] [flink] flinkbot edited a comment on pull request #13803: [FLINK-19821][python] Add ProcessFunction and timer access for Python DataStream API.

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13803:
URL: https://github.com/apache/flink/pull/13803#issuecomment-717145782


   ## CI report:
   
   * 2ab68a5e26b158ab43b128e0d0dbbc62cb8dea17 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8498) 
   



[GitHub] [flink] flinkbot edited a comment on pull request #13803: [FLINK-19821][python] Add ProcessFunction and timer access for Python DataStream API.

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13803:
URL: https://github.com/apache/flink/pull/13803#issuecomment-717145782


   ## CI report:
   
   * c88ea42fe4ce5260b492d592ae8f5d79f54ca829 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8378) 
   * 2ab68a5e26b158ab43b128e0d0dbbc62cb8dea17 UNKNOWN
   



[GitHub] [flink] shuiqiangchen commented on a change in pull request #13803: [FLINK-19821][python] Add ProcessFunction and timer access for Python DataStream API.

Posted by GitBox <gi...@apache.org>.
shuiqiangchen commented on a change in pull request #13803:
URL: https://github.com/apache/flink/pull/13803#discussion_r514007289



##########
File path: flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/StatefulPythonFunctionOperator.java
##########
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.python;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
+import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.core.memory.ManagedMemoryUseCase;
+import org.apache.flink.python.PythonFunctionRunner;
+import org.apache.flink.runtime.state.VoidNamespace;
+import org.apache.flink.runtime.state.VoidNamespaceSerializer;
+import org.apache.flink.streaming.api.SimpleTimerService;
+import org.apache.flink.streaming.api.TimerService;
+import org.apache.flink.streaming.api.functions.python.DataStreamPythonFunctionInfo;
+import org.apache.flink.streaming.api.operators.InternalTimer;
+import org.apache.flink.streaming.api.operators.InternalTimerService;
+import org.apache.flink.streaming.api.operators.Triggerable;
+import org.apache.flink.streaming.api.runners.python.beam.BeamDataStreamStatefulPythonFunctionRunner;
+import org.apache.flink.streaming.api.utils.PythonOperatorUtils;
+import org.apache.flink.streaming.api.utils.PythonTypeUtils;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.functions.python.PythonEnv;
+import org.apache.flink.table.runtime.util.StreamRecordCollector;
+import org.apache.flink.types.Row;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.Map;
+
+/**
+ * {@link StatefulPythonFunctionOperator} is responsible for launching beam runner which will start
+ * a python harness to execute user defined python function. It is also able to handle the timer and
+ * state request from the python stateful user defined function.
+ * */
+public class StatefulPythonFunctionOperator<OUT> extends AbstractOneInputPythonFunctionOperator<Row, OUT>

Review comment:
       Currently it is only used for ProcessFunction, so it's OK to rename it to `PythonProcessFunctionOperator`.







[GitHub] [flink] flinkbot edited a comment on pull request #13803: [FLINK-19821][python] Add ProcessFunction and timer access for Python DataStream API.

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13803:
URL: https://github.com/apache/flink/pull/13803#issuecomment-717145782


   ## CI report:
   
   * 2ab68a5e26b158ab43b128e0d0dbbc62cb8dea17 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8498) 
   * a214b00956b850ec3aca2455478353472dd4f44c Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8551) 
   



[GitHub] [flink] dianfu commented on a change in pull request #13803: [FLINK-19821][python] Add ProcessFunction and timer access for Python DataStream API.

Posted by GitBox <gi...@apache.org>.
dianfu commented on a change in pull request #13803:
URL: https://github.com/apache/flink/pull/13803#discussion_r513882412



##########
File path: flink-end-to-end-tests/flink-python-test/python/datastream/data_stream_timer_job.py
##########
@@ -0,0 +1,89 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+from pyflink.common.serialization import SimpleStringSchema
+from pyflink.common.typeinfo import Types
+from pyflink.datastream import StreamExecutionEnvironment, TimeCharacteristic
+from pyflink.datastream.connectors import FlinkKafkaProducer
+from pyflink.datastream.functions import ProcessFunction, Collector
+from pyflink.table import StreamTableEnvironment
+
+
+def test_ds_timer():
+    env = StreamExecutionEnvironment.get_execution_environment()
+    env.set_parallelism(1)
+    env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
+
+    t_env = StreamTableEnvironment.create(stream_execution_environment=env)
+    t_env.get_config().get_configuration().set_boolean("python.fn-execution.memory.managed",

Review comment:
       Setting `python.fn-execution.memory.managed` is no longer necessary.

##########
File path: flink-end-to-end-tests/flink-python-test/python/datastream/data_stream_timer_job.py
##########
@@ -0,0 +1,89 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+from pyflink.common.serialization import SimpleStringSchema
+from pyflink.common.typeinfo import Types
+from pyflink.datastream import StreamExecutionEnvironment, TimeCharacteristic
+from pyflink.datastream.connectors import FlinkKafkaProducer
+from pyflink.datastream.functions import ProcessFunction, Collector
+from pyflink.table import StreamTableEnvironment
+
+
+def test_ds_timer():
+    env = StreamExecutionEnvironment.get_execution_environment()
+    env.set_parallelism(1)

Review comment:
       Why set the parallelism to 1?

##########
File path: flink-end-to-end-tests/flink-python-test/python/datastream/data_stream_timer_job.py
##########
@@ -0,0 +1,89 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+from pyflink.common.serialization import SimpleStringSchema
+from pyflink.common.typeinfo import Types
+from pyflink.datastream import StreamExecutionEnvironment, TimeCharacteristic
+from pyflink.datastream.connectors import FlinkKafkaProducer
+from pyflink.datastream.functions import ProcessFunction, Collector
+from pyflink.table import StreamTableEnvironment
+
+
+def test_ds_timer():

Review comment:
       Could we merge this test case into the existing data stream test instead of adding another test case?

##########
File path: flink-python/pyflink/datastream/functions.py
##########
@@ -540,3 +540,155 @@ def __init__(self, sink_func: Union[str, JavaObject]):
         :param sink_func: The java SinkFunction object or the full name of the SinkFunction class.
         """
         super(SinkFunction, self).__init__(sink_func)
+
+
+class ProcessFunction(Function):
+    """
+    A function that processes elements of a stream.
+
+    For every element in the input stream process_element(value, ctx, out) is invoked. This can
+    produce zero or more elements as output. Implementations can also query the time and set timers
+    through the provided Context. For firing timers on_timer(long, ctx, out) will be invoked. This
+    can again produce zero or more elements as output and register further timers.
+
+    Note that access to keyed state and timers (which are also scoped to a key) is only available if
+    the ProcessFunction is applied on a KeyedStream. Process Function is always a RichFunction.

Review comment:
       ```suggestion
       the ProcessFunction is applied on a KeyedStream.
   ```
   This sentence is unnecessary: all Python functions are rich, not only ProcessFunction.

##########
File path: flink-python/pyflink/datastream/functions.py
##########
@@ -540,3 +540,155 @@ def __init__(self, sink_func: Union[str, JavaObject]):
         :param sink_func: The java SinkFunction object or the full name of the SinkFunction class.
         """
         super(SinkFunction, self).__init__(sink_func)
+
+
+class ProcessFunction(Function):
+    """
+    A function that processes elements of a stream.
+
+    For every element in the input stream process_element(value, ctx, out) is invoked. This can
+    produce zero or more elements as output. Implementations can also query the time and set timers
+    through the provided Context. For firing timers on_timer(long, ctx, out) will be invoked. This
+    can again produce zero or more elements as output and register further timers.
+
+    Note that access to keyed state and timers (which are also scoped to a key) is only available if
+    the ProcessFunction is applied on a KeyedStream. Process Function is always a RichFunction.
+    Therefore, access to the RuntimeContext is always available and setup and teardown methods can
+    be implemented. See Function.open() and Function.close().

Review comment:
       ```suggestion
       
   ```
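
For reference, the contract described in the docstring above can be sketched as follows. This is only an illustration: `ListCollector`, `FakeTimerService`, `FakeContext` and `DelayedEcho` are hypothetical stand-ins written so the snippet runs without pyflink, and only the `process_element`/`on_timer` shapes are taken from the diff.

```python
class ListCollector:
    """Hypothetical collector that keeps emitted records in a list."""

    def __init__(self):
        self.records = []

    def collect(self, value):
        self.records.append(value)


class FakeTimerService:
    """Hypothetical timer service with a fixed clock and a list of pending timers."""

    def __init__(self, now=0):
        self.now = now
        self.timers = []

    def current_processing_time(self):
        return self.now

    def register_processing_time_timer(self, time):
        self.timers.append(time)


class FakeContext:
    """Hypothetical context exposing only timer_service()."""

    def __init__(self, timer_service):
        self._timer_service = timer_service

    def timer_service(self):
        return self._timer_service


class DelayedEcho:
    """Mirrors the process_element/on_timer signatures from the diff."""

    def process_element(self, value, ctx, out):
        # Emit the element and register a timer one second later.
        out.collect(value)
        ts = ctx.timer_service()
        ts.register_processing_time_timer(ts.current_processing_time() + 1000)

    def on_timer(self, timestamp, ctx, out):
        out.collect("timer fired at %d" % timestamp)


def run_demo():
    timers = FakeTimerService(now=5000)
    ctx = FakeContext(timers)
    out = ListCollector()
    fn = DelayedEcho()
    fn.process_element("a", ctx, out)
    # Fire the pending timers by hand, standing in for the runtime.
    for t in list(timers.timers):
        fn.on_timer(t, ctx, out)
    return out.records
```

Both callbacks write to the same collector, which is why each of them can produce zero or more output elements.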

##########
File path: flink-python/pyflink/datastream/functions.py
##########
@@ -540,3 +540,155 @@ def __init__(self, sink_func: Union[str, JavaObject]):
         :param sink_func: The java SinkFunction object or the full name of the SinkFunction class.
         """
         super(SinkFunction, self).__init__(sink_func)
+
+
+class ProcessFunction(Function):
+    """
+    A function that processes elements of a stream.
+
+    For every element in the input stream process_element(value, ctx, out) is invoked. This can
+    produce zero or more elements as output. Implementations can also query the time and set timers
+    through the provided Context. For firing timers on_timer(long, ctx, out) will be invoked. This
+    can again produce zero or more elements as output and register further timers.
+
+    Note that access to keyed state and timers (which are also scoped to a key) is only available if
+    the ProcessFunction is applied on a KeyedStream. Process Function is always a RichFunction.
+    Therefore, access to the RuntimeContext is always available and setup and teardown methods can
+    be implemented. See Function.open() and Function.close().
+    """
+
+    @abc.abstractmethod
+    def process_element(self, value, ctx: 'Context', out: 'Collector'):
+        """
+        Process one element from the input stream.
+
+        This function can output zero or more elements using the Collector parameter and also update
+        internal state or set timers using the Context parameter.
+
+        :param value: The input value.
+        :param ctx:  A Context that allows querying the timestamp of the element and getting a
+                     TimerService for registering timers and querying the time. The context is only
+                     valid during the invocation of this method, do not store it.
+        :param out: The collector for returning result values.
+        """
+        pass
+
+    @abc.abstractmethod
+    def on_timer(self, timestamp, ctx: 'OnTimerContext', out: 'Collector'):
+        """
+        Called when a timer set using TimerService fires.
+
+        :param timestamp: The timestamp of the firing timer.
+        :param ctx: An OnTimerContext that allows querying the timestamp of the firing timer,
+                    querying the TimeDomain of the firing timer and getting a TimerService for
+                    registering timers and querying the time. The context is only valid during the
+                    invocation of this method, do not store it.
+        :param out: The collector for returning result values.
+        """
+        pass
+
+    class Context(abc.ABC):
+        """
+        Information available in an invocation of process_element(value, ctx, out) or
+        on_timer(value, ctx, out).
+        """
+
+        @abc.abstractmethod
+        def timestamp(self):
+            """
+            Timestamp of the element currently being processed or timestamp of a firing timer.
+
+            This might be null, for example if the time characteristic of your program is set to
+            TimeCharacteristic.ProcessingTime.
+            """
+            pass
+
+        @abc.abstractmethod
+        def timer_service(self):
+            """
+            A Timer service for querying time and registering timers.
+            """
+            pass
+
+    class OnTimerContext(abc.ABC):
+        """
+        Information available in an invocation of on_timer(long, OnTimerContext, Collector)
+        """
+        @abc.abstractmethod
+        def time_domain(self):

Review comment:
       ditto

##########
File path: flink-python/pyflink/datastream/functions.py
##########
@@ -540,3 +540,155 @@ def __init__(self, sink_func: Union[str, JavaObject]):
         :param sink_func: The java SinkFunction object or the full name of the SinkFunction class.
         """
         super(SinkFunction, self).__init__(sink_func)
+
+
+class ProcessFunction(Function):
+    """
+    A function that processes elements of a stream.
+
+    For every element in the input stream process_element(value, ctx, out) is invoked. This can
+    produce zero or more elements as output. Implementations can also query the time and set timers
+    through the provided Context. For firing timers on_timer(long, ctx, out) will be invoked. This
+    can again produce zero or more elements as output and register further timers.
+
+    Note that access to keyed state and timers (which are also scoped to a key) is only available if
+    the ProcessFunction is applied on a KeyedStream. Process Function is always a RichFunction.
+    Therefore, access to the RuntimeContext is always available and setup and teardown methods can
+    be implemented. See Function.open() and Function.close().
+    """
+
+    @abc.abstractmethod
+    def process_element(self, value, ctx: 'Context', out: 'Collector'):
+        """
+        Process one element from the input stream.
+
+        This function can output zero or more elements using the Collector parameter and also update
+        internal state or set timers using the Context parameter.
+
+        :param value: The input value.
+        :param ctx:  A Context that allows querying the timestamp of the element and getting a
+                     TimerService for registering timers and querying the time. The context is only
+                     valid during the invocation of this method, do not store it.
+        :param out: The collector for returning result values.
+        """
+        pass
+
+    @abc.abstractmethod
+    def on_timer(self, timestamp, ctx: 'OnTimerContext', out: 'Collector'):
+        """
+        Called when a timer set using TimerService fires.
+
+        :param timestamp: The timestamp of the firing timer.
+        :param ctx: An OnTimerContext that allows querying the timestamp of the firing timer,
+                    querying the TimeDomain of the firing timer and getting a TimerService for
+                    registering timers and querying the time. The context is only valid during the
+                    invocation of this method, do not store it.
+        :param out: The collector for returning result values.
+        """
+        pass
+
+    class Context(abc.ABC):
+        """
+        Information available in an invocation of process_element(value, ctx, out) or
+        on_timer(value, ctx, out).
+        """
+
+        @abc.abstractmethod
+        def timestamp(self):
+            """
+            Timestamp of the element currently being processed or timestamp of a firing timer.
+
+            This might be null, for example if the time characteristic of your program is set to
+            TimeCharacteristic.ProcessingTime.
+            """
+            pass
+
+        @abc.abstractmethod
+        def timer_service(self):
+            """
+            A Timer service for querying time and registering timers.
+            """
+            pass
+
+    class OnTimerContext(abc.ABC):
+        """
+        Information available in an invocation of on_timer(long, OnTimerContext, Collector)
+        """
+        @abc.abstractmethod
+        def time_domain(self):
+            pass
+
+
+class Collector(abc.ABC):
+    """
+    Collects a record and forwards it.
+    """
+    @abc.abstractmethod
+    def collect(self, value):
+        """
+        Emits a record.
+
+        :param value: The record to collect.
+        """
+        pass
+
+
+class TimerService(abc.ABC):
+    """
+    Interface for working with time and timers.
+    """
+
+    @abc.abstractmethod
+    def current_processing_time(self):
+        """
+        Returns the current processing time.
+        """
+        pass
+
+    @abc.abstractmethod
+    def current_watermark(self):
+        """
+        Returns the current event-time watermark.
+        """
+        pass
+
+    @abc.abstractmethod
+    def register_processing_time_timer(self, time: int):
+        """
+        Registers a timer to be fired when processing time passes the given time.
+
+        Timers can internally be scoped to keys and/or windows. When you set a timer in a keyed
+        context, such as in an operation on KeyedStream, that context will also be active when
+        you receive the timer notification.
+
+        :param time: The processing time of the timer to be registered.
+        """
+        pass
+
+    @abc.abstractmethod
+    def register_event_time_timer(self, time: int):
+        """
+        Registers a timer to be fired when the event time watermark passes the given time.
+
+        Timers can internally be scoped to keys and/or windows. When you set a timer in a keyed
+        context, such as in an operation on KeyedStream, that context will also be active when
+        you receive the timer notification.
+
+        :param time: The event time of the timer to be registered.
+        """
+        pass
+
+
+class InternalProcessFunctionContext(ProcessFunction.Context):
+    """
+    Internal implementation of ProcessFunction.Context.
+    """
+
+    def __init__(self, timer_service: 'TimerService'):
+        self._timer_service = timer_service
+
+    def timestamp(self):
+        pass

Review comment:
       Is this API still unavailable? If so, we can remove it for now.
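
To make the concern concrete: a method whose body is only `pass` returns None, so a caller cannot tell an unsupported API from a missing timestamp. Below is a minimal sketch using stand-in classes rather than the real pyflink ones. Raising `NotImplementedError` is shown only as one hypothetical alternative to removing the method.

```python
import abc


class Context(abc.ABC):
    """Stand-in for ProcessFunction.Context from the diff."""

    @abc.abstractmethod
    def timestamp(self):
        pass

    @abc.abstractmethod
    def timer_service(self):
        pass


class StubbedContext(Context):
    """Same shape as InternalProcessFunctionContext in the diff."""

    def __init__(self, timer_service):
        self._timer_service = timer_service

    def timestamp(self):
        pass  # silently returns None

    def timer_service(self):
        return self._timer_service


class ExplicitContext(StubbedContext):
    """Hypothetical variant that fails loudly instead of returning None."""

    def timestamp(self):
        raise NotImplementedError("timestamp() is not supported yet")


def probe(ctx):
    """Return ctx.timestamp(), or the string 'unsupported' if the call raises."""
    try:
        return ctx.timestamp()
    except NotImplementedError:
        return "unsupported"
```

With the stubbed version, `probe` returns None, which matches what the docstring says a legitimately missing timestamp looks like.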

##########
File path: flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/StatefulPythonFunctionOperator.java
##########
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.python;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
+import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.core.memory.ManagedMemoryUseCase;
+import org.apache.flink.python.PythonFunctionRunner;
+import org.apache.flink.runtime.state.VoidNamespace;
+import org.apache.flink.runtime.state.VoidNamespaceSerializer;
+import org.apache.flink.streaming.api.SimpleTimerService;
+import org.apache.flink.streaming.api.TimerService;
+import org.apache.flink.streaming.api.functions.python.DataStreamPythonFunctionInfo;
+import org.apache.flink.streaming.api.operators.InternalTimer;
+import org.apache.flink.streaming.api.operators.InternalTimerService;
+import org.apache.flink.streaming.api.operators.Triggerable;
+import org.apache.flink.streaming.api.runners.python.beam.BeamDataStreamStatefulPythonFunctionRunner;
+import org.apache.flink.streaming.api.utils.PythonOperatorUtils;
+import org.apache.flink.streaming.api.utils.PythonTypeUtils;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.functions.python.PythonEnv;
+import org.apache.flink.table.runtime.util.StreamRecordCollector;
+import org.apache.flink.types.Row;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.Map;
+
+/**
+ * {@link StatefulPythonFunctionOperator} is responsible for launching beam runner which will start
+ * a python harness to execute user defined python function. It is also able to handle the timer and
+ * state request from the python stateful user defined function.
+ * */
+public class StatefulPythonFunctionOperator<OUT> extends AbstractOneInputPythonFunctionOperator<Row, OUT>

Review comment:
       Add the `@Internal` annotation.

##########
File path: flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/StatefulPythonFunctionOperator.java
##########
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.python;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
+import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.core.memory.ManagedMemoryUseCase;
+import org.apache.flink.python.PythonFunctionRunner;
+import org.apache.flink.runtime.state.VoidNamespace;
+import org.apache.flink.runtime.state.VoidNamespaceSerializer;
+import org.apache.flink.streaming.api.SimpleTimerService;
+import org.apache.flink.streaming.api.TimerService;
+import org.apache.flink.streaming.api.functions.python.DataStreamPythonFunctionInfo;
+import org.apache.flink.streaming.api.operators.InternalTimer;
+import org.apache.flink.streaming.api.operators.InternalTimerService;
+import org.apache.flink.streaming.api.operators.Triggerable;
+import org.apache.flink.streaming.api.runners.python.beam.BeamDataStreamStatefulPythonFunctionRunner;
+import org.apache.flink.streaming.api.utils.PythonOperatorUtils;
+import org.apache.flink.streaming.api.utils.PythonTypeUtils;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.functions.python.PythonEnv;
+import org.apache.flink.table.runtime.util.StreamRecordCollector;
+import org.apache.flink.types.Row;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.Map;
+
+/**
+ * {@link StatefulPythonFunctionOperator} is responsible for launching beam runner which will start
+ * a python harness to execute user defined python function. It is also able to handle the timer and
+ * state request from the python stateful user defined function.
+ * */
+public class StatefulPythonFunctionOperator<OUT> extends AbstractOneInputPythonFunctionOperator<Row, OUT>
+	implements ResultTypeQueryable<OUT>, Triggerable<Row, VoidNamespace> {
+
+	private static final Logger LOGGER = LoggerFactory.getLogger(StatefulPythonFunctionOperator.class);
+
+	protected static final String DATA_STREAM_STATEFUL_PYTHON_FUNCTION_URN =
+		"flink:transform:datastream_stateful_function:v1";
+
+	protected static final String DATA_STREAM_STATEFUL_MAP_FUNCTION_CODER_URN =
+		"flink:coder:datastream:stateful_map_function:v1";
+
+	protected final DataStreamPythonFunctionInfo pythonFunctionInfo;

Review comment:
       Add Javadoc for the fields and methods. Currently this class has no Javadoc at all.

##########
File path: flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/StatefulPythonFunctionOperator.java
##########
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.python;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
+import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.core.memory.ManagedMemoryUseCase;
+import org.apache.flink.python.PythonFunctionRunner;
+import org.apache.flink.runtime.state.VoidNamespace;
+import org.apache.flink.runtime.state.VoidNamespaceSerializer;
+import org.apache.flink.streaming.api.SimpleTimerService;
+import org.apache.flink.streaming.api.TimerService;
+import org.apache.flink.streaming.api.functions.python.DataStreamPythonFunctionInfo;
+import org.apache.flink.streaming.api.operators.InternalTimer;
+import org.apache.flink.streaming.api.operators.InternalTimerService;
+import org.apache.flink.streaming.api.operators.Triggerable;
+import org.apache.flink.streaming.api.runners.python.beam.BeamDataStreamStatefulPythonFunctionRunner;
+import org.apache.flink.streaming.api.utils.PythonOperatorUtils;
+import org.apache.flink.streaming.api.utils.PythonTypeUtils;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.functions.python.PythonEnv;
+import org.apache.flink.table.runtime.util.StreamRecordCollector;
+import org.apache.flink.types.Row;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.Map;
+
+/**
+ * {@link StatefulPythonFunctionOperator} is responsible for launching beam runner which will start
+ * a python harness to execute user defined python function. It is also able to handle the timer and
+ * state request from the python stateful user defined function.
+ * */
+public class StatefulPythonFunctionOperator<OUT> extends AbstractOneInputPythonFunctionOperator<Row, OUT>
+	implements ResultTypeQueryable<OUT>, Triggerable<Row, VoidNamespace> {
+
+	private static final Logger LOGGER = LoggerFactory.getLogger(StatefulPythonFunctionOperator.class);
+
+	protected static final String DATA_STREAM_STATEFUL_PYTHON_FUNCTION_URN =
+		"flink:transform:datastream_stateful_function:v1";
+
+	protected static final String DATA_STREAM_STATEFUL_MAP_FUNCTION_CODER_URN =
+		"flink:coder:datastream:stateful_map_function:v1";
+
+	protected final DataStreamPythonFunctionInfo pythonFunctionInfo;
+	private final TypeInformation runnerInputTypeInfo;
+	private final TypeInformation runnerOutputTypeInfo;
+	private final TypeInformation<OUT> outputTypeInfo;
+	private final TypeInformation<Row> keyTypeInfo;
+	private TypeSerializer runnerInputSerializer;
+	private TypeSerializer runnerOutputSerializer;
+	private TypeSerializer keyTypeSerializer;
+
+	private transient TimerService timerservice;
+
+	protected final Map<String, String> jobOptions;
+
+	protected transient ByteArrayInputStreamWithPos bais;
+
+	protected transient DataInputViewStreamWrapper baisWrapper;
+
+	protected transient ByteArrayOutputStreamWithPos baos;
+
+	protected transient DataOutputViewStreamWrapper baosWrapper;
+
+	protected transient StreamRecordCollector streamRecordCollector;
+
+	protected Row reusableInput;

Review comment:
       ```suggestion
   	private transient Row reusableInput;
   ```

##########
File path: flink-python/src/main/java/org/apache/flink/streaming/api/utils/PythonOperatorUtils.java
##########
@@ -132,4 +133,22 @@
 			dataStreamPythonFunctionInfo.getPythonFunction().getSerializedPythonFunction()));
 		return builder.build();
 	}
+
+	public static FlinkFnApi.UserDefinedDataStreamFunction
+	getUserDefinedDataStreamStatefulFunctionProto(
+		DataStreamPythonFunctionInfo dataStreamPythonFunctionInfo,
+		RuntimeContext runtimeContext,
+		Map<String, String> internalParameters,
+		TypeInformation keyTypeInfo) {
+		FlinkFnApi.UserDefinedDataStreamFunction userDefinedDataStreamFunction =
+			getUserDefinedDataStreamFunctionProto(dataStreamPythonFunctionInfo, runtimeContext,
+				internalParameters);
+		FlinkFnApi.TypeInfo.FieldType builtKeyFieldType = PythonTypeUtils.TypeInfoToProtoConverter
+			.getFieldType(keyTypeInfo);
+		return userDefinedDataStreamFunction.toBuilder()
+			.setKeyTypeInfo(PythonTypeUtils.TypeInfoToProtoConverter
+				.toTypeInfoProto(builtKeyFieldType)).build();
+

Review comment:
       Unnecessary empty line.

##########
File path: flink-python/pyflink/fn_execution/stateful_operation_common.py
##########
@@ -0,0 +1,72 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+import time
+
+from typing import Any
+
+from pyflink.datastream.functions import Collector, TimerService
+
+
+class InternalCollector(Collector):
+    """
+    Internal implementation of the Collector. It uses a buffer list to store data to be emitted.
+    There will be a header flag for each data type. 0 means it is a proc time timer registering
+    request, while 1 means it is an event time timer and 2 means it is normal data. When
+    registering a timer, it must take along the corresponding key for it.
+    """
+    def __init__(self):
+        self.buf = []
+
+    def collect_proc_timer(self, a: Any, key: Any):
+        self.buf.append((0, a, key, None))
+
+    def collect_event_timer(self, a: Any, key: Any):
+        self.buf.append((1, a, key, None))
+
+    def collect_data(self, a: Any):
+        self.buf.append((2, a))
+
+    def collect(self, a: Any):
+        self.collect_data(a)
+
+    def clear(self):
+        self.buf.clear()
+
+
+class InternalTimerService(TimerService):
+    """
+    Internal implementation of TimerService.
+    """
+    def __init__(self, collector, keyed_state_backend):
+        self._collector: InternalCollector = collector
+        self._keyed_state_backend = keyed_state_backend
+        self._current_watermark = None
+
+    def current_processing_time(self) -> int:
+        return int(time.time() * 1000)
+
+    def register_processing_time_timer(self, t: int):
+        current_key = self._keyed_state_backend.get_current_key()
+        self._collector.collect_proc_timer(t, current_key)
+
+    def register_event_time_timer(self, t: int):
+        current_key = self._keyed_state_backend.get_current_key()
+        self._collector.collect_event_timer(t, current_key)
+
+    def current_watermark(self) -> int:
+        return self._current_watermark

Review comment:
       Is the watermark always None?
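
To illustrate the concern: in the hunk shown, `_current_watermark` is assigned only in `__init__`, so `current_watermark()` returns None unless something else updates it. A minimal sketch of one possible remedy follows. `SketchCollector`, `SketchTimerService` and `advance_watermark` are hypothetical names invented for illustration, not part of this PR or of PyFlink.

```python
from typing import Any, List, Optional, Tuple


class SketchCollector:
    """Hypothetical stand-in for InternalCollector: buffers flagged tuples."""

    def __init__(self) -> None:
        self.buf: List[Tuple] = []

    def collect_event_timer(self, timestamp: int, key: Any) -> None:
        # Flag 1 marks an event-time timer registration, as in the diff above.
        self.buf.append((1, timestamp, key, None))


class SketchTimerService:
    """Hypothetical stand-in for InternalTimerService with a watermark update."""

    def __init__(self, collector: SketchCollector) -> None:
        self._collector = collector
        self._current_watermark: Optional[int] = None

    def advance_watermark(self, watermark: int) -> None:
        # Assumed hook: whoever drives the function would call this with the
        # watermark that accompanies each input record.
        self._current_watermark = watermark

    def current_watermark(self) -> Optional[int]:
        return self._current_watermark

    def register_event_time_timer(self, t: int, key: Any) -> None:
        self._collector.collect_event_timer(t, key)


collector = SketchCollector()
service = SketchTimerService(collector)

before = service.current_watermark()  # nothing has set the watermark yet
service.advance_watermark(1000)
after = service.current_watermark()
service.register_event_time_timer(after + 500, key="user-1")
```

Whether the watermark should be pushed in per record or fetched on demand is a design choice this sketch does not settle.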

##########
File path: flink-python/pyflink/datastream/functions.py
##########
@@ -540,3 +540,155 @@ def __init__(self, sink_func: Union[str, JavaObject]):
         :param sink_func: The java SinkFunction object or the full name of the SinkFunction class.
         """
         super(SinkFunction, self).__init__(sink_func)
+
+
+class ProcessFunction(Function):
+    """
+    A function that processes elements of a stream.
+
+    For every element in the input stream process_element(value, ctx, out) is invoked. This can
+    produce zero or more elements as output. Implementations can also query the time and set timers
+    through the provided Context. For firing timers on_timer(long, ctx, out) will be invoked. This
+    can again produce zero or more elements as output and register further timers.
+
+    Note that access to keyed state and timers (which are also scoped to a key) is only available if
+    the ProcessFunction is applied on a KeyedStream. ProcessFunction is always a RichFunction.
+    Therefore, access to the RuntimeContext is always available and setup and teardown methods can
+    be implemented. See Function.open() and Function.close().
+    """
+
+    @abc.abstractmethod
+    def process_element(self, value, ctx: 'Context', out: 'Collector'):
+        """
+        Process one element from the input stream.
+
+        This function can output zero or more elements using the Collector parameter and also update
+        internal state or set timers using the Context parameter.
+
+        :param value: The input value.
+        :param ctx:  A Context that allows querying the timestamp of the element and getting a
+                     TimerService for registering timers and querying the time. The context is only
+                     valid during the invocation of this method, do not store it.
+        :param out: The collector for returning result values.
+        """
+        pass
+
+    @abc.abstractmethod
+    def on_timer(self, timestamp, ctx: 'OnTimerContext', out: 'Collector'):
+        """
+        Called when a timer set using TimerService fires.
+
+        :param timestamp: The timestamp of the firing timer.
+        :param ctx: An OnTimerContext that allows querying the timestamp of the firing timer,
+                    querying the TimeDomain of the firing timer and getting a TimerService for
+                    registering timers and querying the time. The context is only valid during the
+                    invocation of this method, do not store it.
+        :param out: The collector for returning result values.
+        """
+        pass
+
+    class Context(abc.ABC):
+        """
+        Information available in an invocation of process_element(value, ctx, out) or
+        on_timer(timestamp, ctx, out).
+        """
+
+        @abc.abstractmethod
+        def timestamp(self):

Review comment:
       Add a type hint for the result type?
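
A minimal sketch of what the requested hints could look like. The return type `Optional[int]` is an assumption based on the docstring, which says the timestamp may be absent. The classes below are standalone stand-ins rather than the real `pyflink.datastream.functions` classes, and `FixedContext` is a hypothetical name.

```python
import abc
from typing import Optional, get_type_hints


class TimerService(abc.ABC):
    """Stand-in for the TimerService interface."""

    @abc.abstractmethod
    def current_processing_time(self) -> int:
        """Current processing time in milliseconds."""


class Context(abc.ABC):
    """Stand-in for ProcessFunction.Context with annotated return types."""

    @abc.abstractmethod
    def timestamp(self) -> Optional[int]:
        """Timestamp of the current element or firing timer, or None."""

    @abc.abstractmethod
    def timer_service(self) -> TimerService:
        """The TimerService used to query time and register timers."""


class FixedTimerService(TimerService):
    def current_processing_time(self) -> int:
        return 42


class FixedContext(Context):
    """Hypothetical concrete context used only to exercise the hints."""

    def timestamp(self) -> Optional[int]:
        return None

    def timer_service(self) -> TimerService:
        return FixedTimerService()


ctx = FixedContext()
timestamp_hints = get_type_hints(Context.timestamp)
timer_service_hints = get_type_hints(Context.timer_service)
```

With the annotations in place, a type checker can flag callers that do arithmetic on `ctx.timestamp()` without first handling None.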

##########
File path: flink-python/pyflink/datastream/functions.py
##########
@@ -540,3 +540,155 @@ def __init__(self, sink_func: Union[str, JavaObject]):
         :param sink_func: The java SinkFunction object or the full name of the SinkFunction class.
         """
         super(SinkFunction, self).__init__(sink_func)
+
+
+class ProcessFunction(Function):
+    """
+    A function that processes elements of a stream.
+
+    For every element in the input stream process_element(value, ctx, out) is invoked. This can
+    produce zero or more elements as output. Implementations can also query the time and set timers
+    through the provided Context. For firing timers on_timer(long, ctx, out) will be invoked. This
+    can again produce zero or more elements as output and register further timers.
+
+    Note that access to keyed state and timers (which are also scoped to a key) is only available if
+    the ProcessFunction is applied on a KeyedStream. ProcessFunction is always a RichFunction.
+    Therefore, access to the RuntimeContext is always available and setup and teardown methods can
+    be implemented. See Function.open() and Function.close().
+    """
+
+    @abc.abstractmethod
+    def process_element(self, value, ctx: 'Context', out: 'Collector'):
+        """
+        Process one element from the input stream.
+
+        This function can output zero or more elements using the Collector parameter and also update
+        internal state or set timers using the Context parameter.
+
+        :param value: The input value.
+        :param ctx:  A Context that allows querying the timestamp of the element and getting a
+                     TimerService for registering timers and querying the time. The context is only
+                     valid during the invocation of this method, do not store it.
+        :param out: The collector for returning result values.
+        """
+        pass
+
+    @abc.abstractmethod
+    def on_timer(self, timestamp, ctx: 'OnTimerContext', out: 'Collector'):
+        """
+        Called when a timer set using TimerService fires.
+
+        :param timestamp: The timestamp of the firing timer.
+        :param ctx: An OnTimerContext that allows querying the timestamp of the firing timer,
+                    querying the TimeDomain of the firing timer and getting a TimerService for
+                    registering timers and querying the time. The context is only valid during the
+                    invocation of this method, do not store it.
+        :param out: The collector for returning result values.
+        """
+        pass
+
+    class Context(abc.ABC):
+        """
+        Information available in an invocation of process_element(value, ctx, out) or
+        on_timer(timestamp, ctx, out).
+        """
+
+        @abc.abstractmethod
+        def timestamp(self):
+            """
+            Timestamp of the element currently being processed or timestamp of a firing timer.
+
+            This might be null, for example if the time characteristic of your program is set to
+            TimeCharacteristic.ProcessingTime.
+            """
+            pass
+
+        @abc.abstractmethod
+        def timer_service(self):

Review comment:
       Add a type hint for the result type?
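
For readers following the thread, the contract described in the ProcessFunction docstring above (emit from `process_element`, register a timer through the context, emit again from `on_timer`) can be sketched without PyFlink. Everything below is a hypothetical stand-in: `ListCollector`, `FakeTimerService`, `FakeContext` and `TimeoutFunction` only mimic the method names from the diff, and the driver loop merely pretends to be the runtime.

```python
from typing import Any, List


class ListCollector:
    """Stand-in collector that keeps emitted values in a list."""

    def __init__(self) -> None:
        self.out: List[Any] = []

    def collect(self, value: Any) -> None:
        self.out.append(value)


class FakeTimerService:
    """Stand-in timer service that only records registered timers."""

    def __init__(self) -> None:
        self.timers: List[int] = []

    def register_event_time_timer(self, t: int) -> None:
        self.timers.append(t)


class FakeContext:
    """Stand-in context exposing timestamp() and timer_service()."""

    def __init__(self, timestamp: int, timer_service: FakeTimerService) -> None:
        self._timestamp = timestamp
        self._timer_service = timer_service

    def timestamp(self) -> int:
        return self._timestamp

    def timer_service(self) -> FakeTimerService:
        return self._timer_service


class TimeoutFunction:
    """Shaped like a ProcessFunction subclass; forwards values, then a marker."""

    def process_element(self, value, ctx, out):
        out.collect(value)
        # Ask to be called back 10 ms (event time) after this element.
        ctx.timer_service().register_event_time_timer(ctx.timestamp() + 10)

    def on_timer(self, timestamp, ctx, out):
        out.collect(("timeout", timestamp))


fn = TimeoutFunction()
out = ListCollector()
timers = FakeTimerService()

fn.process_element("a", FakeContext(100, timers), out)
# Pretend the runtime fires every registered timer once.
for t in list(timers.timers):
    fn.on_timer(t, FakeContext(t, timers), out)
```

In the real API the timers would be scoped to the current key and fired by the Java operator, which this sketch leaves out.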

##########
File path: flink-python/pyflink/datastream/functions.py
##########
@@ -540,3 +540,155 @@ def __init__(self, sink_func: Union[str, JavaObject]):
         :param sink_func: The java SinkFunction object or the full name of the SinkFunction class.
         """
         super(SinkFunction, self).__init__(sink_func)
+
+
+class ProcessFunction(Function):
+    """
+    A function that processes elements of a stream.
+
+    For every element in the input stream process_element(value, ctx, out) is invoked. This can
+    produce zero or more elements as output. Implementations can also query the time and set timers
+    through the provided Context. For firing timers on_timer(long, ctx, out) will be invoked. This
+    can again produce zero or more elements as output and register further timers.
+
+    Note that access to keyed state and timers (which are also scoped to a key) is only available if
+    the ProcessFunction is applied on a KeyedStream. ProcessFunction is always a RichFunction.
+    Therefore, access to the RuntimeContext is always available and setup and teardown methods can

Review comment:
       ```suggestion
       
   ```

##########
File path: flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/StatefulPythonFunctionOperator.java
##########
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.python;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
+import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.core.memory.ManagedMemoryUseCase;
+import org.apache.flink.python.PythonFunctionRunner;
+import org.apache.flink.runtime.state.VoidNamespace;
+import org.apache.flink.runtime.state.VoidNamespaceSerializer;
+import org.apache.flink.streaming.api.SimpleTimerService;
+import org.apache.flink.streaming.api.TimerService;
+import org.apache.flink.streaming.api.functions.python.DataStreamPythonFunctionInfo;
+import org.apache.flink.streaming.api.operators.InternalTimer;
+import org.apache.flink.streaming.api.operators.InternalTimerService;
+import org.apache.flink.streaming.api.operators.Triggerable;
+import org.apache.flink.streaming.api.runners.python.beam.BeamDataStreamStatefulPythonFunctionRunner;
+import org.apache.flink.streaming.api.utils.PythonOperatorUtils;
+import org.apache.flink.streaming.api.utils.PythonTypeUtils;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.functions.python.PythonEnv;
+import org.apache.flink.table.runtime.util.StreamRecordCollector;
+import org.apache.flink.types.Row;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.Map;
+
+/**
+ * {@link StatefulPythonFunctionOperator} is responsible for launching beam runner which will start
+ * a python harness to execute user defined python function. It is also able to handle the timer and
+ * state request from the python stateful user defined function.
+ * */
+public class StatefulPythonFunctionOperator<OUT> extends AbstractOneInputPythonFunctionOperator<Row, OUT>

Review comment:
       Is this operator specific to ProcessFunction? If so, what about renaming it to PythonProcessFunctionOperator?

##########
File path: flink-python/pyflink/datastream/functions.py
##########
@@ -540,3 +540,155 @@ def __init__(self, sink_func: Union[str, JavaObject]):
         :param sink_func: The java SinkFunction object or the full name of the SinkFunction class.
         """
         super(SinkFunction, self).__init__(sink_func)
+
+
+class ProcessFunction(Function):
+    """
+    A function that processes elements of a stream.
+
+    For every element in the input stream process_element(value, ctx, out) is invoked. This can
+    produce zero or more elements as output. Implementations can also query the time and set timers
+    through the provided Context. For firing timers on_timer(long, ctx, out) will be invoked. This
+    can again produce zero or more elements as output and register further timers.
+
+    Note that access to keyed state and timers (which are also scoped to a key) is only available if
+    the ProcessFunction is applied on a KeyedStream. ProcessFunction is always a RichFunction.
+    Therefore, access to the RuntimeContext is always available and setup and teardown methods can
+    be implemented. See Function.open() and Function.close().
+    """
+
+    @abc.abstractmethod
+    def process_element(self, value, ctx: 'Context', out: 'Collector'):
+        """
+        Process one element from the input stream.
+
+        This function can output zero or more elements using the Collector parameter and also update
+        internal state or set timers using the Context parameter.
+
+        :param value: The input value.
+        :param ctx:  A Context that allows querying the timestamp of the element and getting a
+                     TimerService for registering timers and querying the time. The context is only
+                     valid during the invocation of this method, do not store it.
+        :param out: The collector for returning result values.
+        """
+        pass
+
+    @abc.abstractmethod
+    def on_timer(self, timestamp, ctx: 'OnTimerContext', out: 'Collector'):
+        """
+        Called when a timer set using TimerService fires.
+
+        :param timestamp: The timestamp of the firing timer.
+        :param ctx: An OnTimerContext that allows querying the timestamp of the firing timer,
+                    querying the TimeDomain of the firing timer and getting a TimerService for
+                    registering timers and querying the time. The context is only valid during the
+                    invocation of this method, do not store it.
+        :param out: The collector for returning result values.
+        """
+        pass
+
+    class Context(abc.ABC):
+        """
+        Information available in an invocation of process_element(value, ctx, out) or
+        on_timer(timestamp, ctx, out).
+        """
+
+        @abc.abstractmethod
+        def timestamp(self):
+            """
+            Timestamp of the element currently being processed or timestamp of a firing timer.
+
+            This might be null, for example if the time characteristic of your program is set to

Review comment:
       ```suggestion
               This might be None, for example if the time characteristic of your program is set to
   ```

##########
File path: flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/StatefulPythonFunctionOperator.java
##########
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.python;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
+import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.core.memory.ManagedMemoryUseCase;
+import org.apache.flink.python.PythonFunctionRunner;
+import org.apache.flink.runtime.state.VoidNamespace;
+import org.apache.flink.runtime.state.VoidNamespaceSerializer;
+import org.apache.flink.streaming.api.SimpleTimerService;
+import org.apache.flink.streaming.api.TimerService;
+import org.apache.flink.streaming.api.functions.python.DataStreamPythonFunctionInfo;
+import org.apache.flink.streaming.api.operators.InternalTimer;
+import org.apache.flink.streaming.api.operators.InternalTimerService;
+import org.apache.flink.streaming.api.operators.Triggerable;
+import org.apache.flink.streaming.api.runners.python.beam.BeamDataStreamStatefulPythonFunctionRunner;
+import org.apache.flink.streaming.api.utils.PythonOperatorUtils;
+import org.apache.flink.streaming.api.utils.PythonTypeUtils;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.functions.python.PythonEnv;
+import org.apache.flink.table.runtime.util.StreamRecordCollector;
+import org.apache.flink.types.Row;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.Map;
+
+/**
+ * {@link StatefulPythonFunctionOperator} is responsible for launching beam runner which will start
+ * a python harness to execute user defined python function. It is also able to handle the timer and
+ * state request from the python stateful user defined function.
+ * */
+public class StatefulPythonFunctionOperator<OUT> extends AbstractOneInputPythonFunctionOperator<Row, OUT>
+	implements ResultTypeQueryable<OUT>, Triggerable<Row, VoidNamespace> {
+
+	private static final Logger LOGGER = LoggerFactory.getLogger(StatefulPythonFunctionOperator.class);
+
+	protected static final String DATA_STREAM_STATEFUL_PYTHON_FUNCTION_URN =
+		"flink:transform:datastream_stateful_function:v1";
+
+	protected static final String DATA_STREAM_STATEFUL_MAP_FUNCTION_CODER_URN =
+		"flink:coder:datastream:stateful_map_function:v1";

Review comment:
       I'm confused: where does the "map_function" come from?

##########
File path: flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/StatefulPythonFunctionOperator.java
##########
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.python;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
+import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.core.memory.ManagedMemoryUseCase;
+import org.apache.flink.python.PythonFunctionRunner;
+import org.apache.flink.runtime.state.VoidNamespace;
+import org.apache.flink.runtime.state.VoidNamespaceSerializer;
+import org.apache.flink.streaming.api.SimpleTimerService;
+import org.apache.flink.streaming.api.TimerService;
+import org.apache.flink.streaming.api.functions.python.DataStreamPythonFunctionInfo;
+import org.apache.flink.streaming.api.operators.InternalTimer;
+import org.apache.flink.streaming.api.operators.InternalTimerService;
+import org.apache.flink.streaming.api.operators.Triggerable;
+import org.apache.flink.streaming.api.runners.python.beam.BeamDataStreamStatefulPythonFunctionRunner;
+import org.apache.flink.streaming.api.utils.PythonOperatorUtils;
+import org.apache.flink.streaming.api.utils.PythonTypeUtils;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.functions.python.PythonEnv;
+import org.apache.flink.table.runtime.util.StreamRecordCollector;
+import org.apache.flink.types.Row;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.Map;
+
+/**
+ * {@link StatefulPythonFunctionOperator} is responsible for launching beam runner which will start
+ * a python harness to execute user defined python function. It is also able to handle the timer and
+ * state request from the python stateful user defined function.
+ * */
+public class StatefulPythonFunctionOperator<OUT> extends AbstractOneInputPythonFunctionOperator<Row, OUT>
+	implements ResultTypeQueryable<OUT>, Triggerable<Row, VoidNamespace> {
+
+	private static final Logger LOGGER = LoggerFactory.getLogger(StatefulPythonFunctionOperator.class);
+
+	protected static final String DATA_STREAM_STATEFUL_PYTHON_FUNCTION_URN =

Review comment:
       Should this be private? The other fields need to be checked as well.

##########
File path: flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/StatefulPythonFunctionOperator.java
##########
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.python;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
+import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.core.memory.ManagedMemoryUseCase;
+import org.apache.flink.python.PythonFunctionRunner;
+import org.apache.flink.runtime.state.VoidNamespace;
+import org.apache.flink.runtime.state.VoidNamespaceSerializer;
+import org.apache.flink.streaming.api.SimpleTimerService;
+import org.apache.flink.streaming.api.TimerService;
+import org.apache.flink.streaming.api.functions.python.DataStreamPythonFunctionInfo;
+import org.apache.flink.streaming.api.operators.InternalTimer;
+import org.apache.flink.streaming.api.operators.InternalTimerService;
+import org.apache.flink.streaming.api.operators.Triggerable;
+import org.apache.flink.streaming.api.runners.python.beam.BeamDataStreamStatefulPythonFunctionRunner;
+import org.apache.flink.streaming.api.utils.PythonOperatorUtils;
+import org.apache.flink.streaming.api.utils.PythonTypeUtils;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.functions.python.PythonEnv;
+import org.apache.flink.table.runtime.util.StreamRecordCollector;
+import org.apache.flink.types.Row;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.Map;
+
+/**
+ * {@link StatefulPythonFunctionOperator} is responsible for launching beam runner which will start
+ * a python harness to execute user defined python function. It is also able to handle the timer and
+ * state request from the python stateful user defined function.
+ * */
+public class StatefulPythonFunctionOperator<OUT> extends AbstractOneInputPythonFunctionOperator<Row, OUT>
+	implements ResultTypeQueryable<OUT>, Triggerable<Row, VoidNamespace> {
+

Review comment:
       Add `private static final long serialVersionUID = 1L;`

##########
File path: flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/StatefulPythonFunctionOperator.java
##########
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.python;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
+import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.core.memory.ManagedMemoryUseCase;
+import org.apache.flink.python.PythonFunctionRunner;
+import org.apache.flink.runtime.state.VoidNamespace;
+import org.apache.flink.runtime.state.VoidNamespaceSerializer;
+import org.apache.flink.streaming.api.SimpleTimerService;
+import org.apache.flink.streaming.api.TimerService;
+import org.apache.flink.streaming.api.functions.python.DataStreamPythonFunctionInfo;
+import org.apache.flink.streaming.api.operators.InternalTimer;
+import org.apache.flink.streaming.api.operators.InternalTimerService;
+import org.apache.flink.streaming.api.operators.Triggerable;
+import org.apache.flink.streaming.api.runners.python.beam.BeamDataStreamStatefulPythonFunctionRunner;
+import org.apache.flink.streaming.api.utils.PythonOperatorUtils;
+import org.apache.flink.streaming.api.utils.PythonTypeUtils;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.functions.python.PythonEnv;
+import org.apache.flink.table.runtime.util.StreamRecordCollector;
+import org.apache.flink.types.Row;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.Map;
+
+/**
+ * {@link StatefulPythonFunctionOperator} is responsible for launching beam runner which will start
+ * a python harness to execute user defined python function. It is also able to handle the timer and
+ * state request from the python stateful user defined function.
+ * */
+public class StatefulPythonFunctionOperator<OUT> extends AbstractOneInputPythonFunctionOperator<Row, OUT>
+	implements ResultTypeQueryable<OUT>, Triggerable<Row, VoidNamespace> {
+
+	private static final Logger LOGGER = LoggerFactory.getLogger(StatefulPythonFunctionOperator.class);
+
+	protected static final String DATA_STREAM_STATEFUL_PYTHON_FUNCTION_URN =
+		"flink:transform:datastream_stateful_function:v1";
+
+	protected static final String DATA_STREAM_STATEFUL_MAP_FUNCTION_CODER_URN =
+		"flink:coder:datastream:stateful_map_function:v1";
+
+	protected final DataStreamPythonFunctionInfo pythonFunctionInfo;
+	private final TypeInformation runnerInputTypeInfo;
+	private final TypeInformation runnerOutputTypeInfo;
+	private final TypeInformation<OUT> outputTypeInfo;
+	private final TypeInformation<Row> keyTypeInfo;
+	private TypeSerializer runnerInputSerializer;
+	private TypeSerializer runnerOutputSerializer;
+	private TypeSerializer keyTypeSerializer;
+
+	private transient TimerService timerservice;
+
+	protected final Map<String, String> jobOptions;
+
+	protected transient ByteArrayInputStreamWithPos bais;
+
+	protected transient DataInputViewStreamWrapper baisWrapper;
+
+	protected transient ByteArrayOutputStreamWithPos baos;
+
+	protected transient DataOutputViewStreamWrapper baosWrapper;
+
+	protected transient StreamRecordCollector streamRecordCollector;
+
+	protected Row reusableInput;
+	protected Row reusableTimerData;
+
+	public StatefulPythonFunctionOperator(
+		Configuration config,
+		RowTypeInfo inputTypeInfo,
+		TypeInformation<OUT> outputTypeInfo,
+		DataStreamPythonFunctionInfo pythonFunctionInfo) {
+		super(config);
+		this.jobOptions = config.toMap();
+		this.pythonFunctionInfo = pythonFunctionInfo;
+		this.outputTypeInfo = outputTypeInfo;
+		this.keyTypeInfo = new RowTypeInfo(inputTypeInfo.getTypeAt(0));
+		this.keyTypeSerializer = PythonTypeUtils.TypeInfoToSerializerConverter.typeInfoSerializerConverter(keyTypeInfo);
+		// inputType: normal data/ timer data, timerType: proc/event time, currentWatermark, keyData, real data
+		this.runnerInputTypeInfo = Types.ROW(Types.INT, Types.LONG, Types.LONG, this.keyTypeInfo, inputTypeInfo);
+		this.runnerOutputTypeInfo = Types.ROW(Types.INT, Types.LONG, this.keyTypeInfo, outputTypeInfo);
+	}
+
+	@Override
+	public void open() throws Exception {
+		super.open();
+		bais = new ByteArrayInputStreamWithPos();
+		baisWrapper = new DataInputViewStreamWrapper(bais);
+
+		baos = new ByteArrayOutputStreamWithPos();
+		baosWrapper = new DataOutputViewStreamWrapper(baos);
+		runnerInputSerializer = PythonTypeUtils.TypeInfoToSerializerConverter
+			.typeInfoSerializerConverter(runnerInputTypeInfo);
+		runnerOutputSerializer = PythonTypeUtils.TypeInfoToSerializerConverter
+			.typeInfoSerializerConverter(runnerOutputTypeInfo);
+
+		InternalTimerService<VoidNamespace> internalTimerService = getInternalTimerService("user-timers",
+			VoidNamespaceSerializer.INSTANCE, this);
+
+		this.streamRecordCollector = new StreamRecordCollector(output);
+		timerservice = new SimpleTimerService(internalTimerService);
+		reusableInput = new Row(5);
+		reusableTimerData = new Row(5);
+	}
+
+	@Override
+	public TypeInformation<OUT> getProducedType() {
+		return outputTypeInfo;
+	}
+
+	@Override
+	public void onEventTime(InternalTimer<Row, VoidNamespace> timer) throws Exception {
+		processTimer(false, timer);
+		checkInvokeFinishBundleByCount();
+	}
+
+	@Override
+	public void onProcessingTime(InternalTimer<Row, VoidNamespace> timer) throws Exception {
+		processTimer(true, timer);
+		checkInvokeFinishBundleByCount();

Review comment:
       Should emitResults() be called after checkInvokeFinishBundleByCount() here?
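
       A minimal sketch of the ordering in question, assuming results returned by the Python worker sit in a pending queue until something drains it. ToyBundlingOperator and its members are hypothetical stand-ins that borrow the method names from the diff; this is not the Flink operator.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Toy stand-in for a bundling operator. All names are hypothetical, not Flink APIs.
class ToyBundlingOperator {
    // Results handed back by the (simulated) Python worker, not yet sent downstream.
    private final Queue<String> pendingResults = new ArrayDeque<>();
    // What downstream operators have actually received.
    private final List<String> downstream = new ArrayList<>();
    private final int maxBundleSize;
    private int elementCount = 0;

    ToyBundlingOperator(int maxBundleSize) {
        this.maxBundleSize = maxBundleSize;
    }

    // Mirrors the order raised in the comment: process, check bundle, emit.
    void onTimer(long timestamp) {
        processTimer(timestamp);
        checkInvokeFinishBundleByCount();
        emitResults();
    }

    private void processTimer(long timestamp) {
        // Assumption of this sketch: the worker answers immediately.
        pendingResults.add("timer@" + timestamp);
        elementCount++;
    }

    private void checkInvokeFinishBundleByCount() {
        if (elementCount >= maxBundleSize) {
            elementCount = 0; // a real operator would flush the bundle here
        }
    }

    // Drains everything currently pending into the downstream list.
    private void emitResults() {
        String result;
        while ((result = pendingResults.poll()) != null) {
            downstream.add(result);
        }
    }

    List<String> downstream() {
        return downstream;
    }
}
```

       In this toy model, dropping the emitResults() call from onTimer() would leave the timer result in the pending queue until some later call drains it.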

##########
File path: flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/StatefulPythonFunctionOperator.java
##########
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.python;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
+import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.core.memory.ManagedMemoryUseCase;
+import org.apache.flink.python.PythonFunctionRunner;
+import org.apache.flink.runtime.state.VoidNamespace;
+import org.apache.flink.runtime.state.VoidNamespaceSerializer;
+import org.apache.flink.streaming.api.SimpleTimerService;
+import org.apache.flink.streaming.api.TimerService;
+import org.apache.flink.streaming.api.functions.python.DataStreamPythonFunctionInfo;
+import org.apache.flink.streaming.api.operators.InternalTimer;
+import org.apache.flink.streaming.api.operators.InternalTimerService;
+import org.apache.flink.streaming.api.operators.Triggerable;
+import org.apache.flink.streaming.api.runners.python.beam.BeamDataStreamStatefulPythonFunctionRunner;
+import org.apache.flink.streaming.api.utils.PythonOperatorUtils;
+import org.apache.flink.streaming.api.utils.PythonTypeUtils;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.functions.python.PythonEnv;
+import org.apache.flink.table.runtime.util.StreamRecordCollector;
+import org.apache.flink.types.Row;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.Map;
+
+/**
+ * {@link StatefulPythonFunctionOperator} is responsible for launching beam runner which will start
+ * a python harness to execute user defined python function. It is also able to handle the timer and
+ * state request from the python stateful user defined function.
+ * */
+public class StatefulPythonFunctionOperator<OUT> extends AbstractOneInputPythonFunctionOperator<Row, OUT>
+	implements ResultTypeQueryable<OUT>, Triggerable<Row, VoidNamespace> {
+
+	private static final Logger LOGGER = LoggerFactory.getLogger(StatefulPythonFunctionOperator.class);
+
+	protected static final String DATA_STREAM_STATEFUL_PYTHON_FUNCTION_URN =
+		"flink:transform:datastream_stateful_function:v1";
+
+	protected static final String DATA_STREAM_STATEFUL_MAP_FUNCTION_CODER_URN =
+		"flink:coder:datastream:stateful_map_function:v1";
+
+	protected final DataStreamPythonFunctionInfo pythonFunctionInfo;
+	private final TypeInformation runnerInputTypeInfo;
+	private final TypeInformation runnerOutputTypeInfo;
+	private final TypeInformation<OUT> outputTypeInfo;
+	private final TypeInformation<Row> keyTypeInfo;
+	private TypeSerializer runnerInputSerializer;

Review comment:
       ```suggestion
   	private transient TypeSerializer runnerInputSerializer;
   ```

##########
File path: flink-python/src/main/java/org/apache/flink/streaming/api/runners/python/beam/BeamDataStreamStatefulPythonFunctionRunner.java
##########
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.runners.python.beam;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.fnexecution.v1.FlinkFnApi;
+import org.apache.flink.python.env.PythonEnvironmentManager;
+import org.apache.flink.python.metric.FlinkMetricContainer;
+import org.apache.flink.runtime.memory.MemoryManager;
+import org.apache.flink.runtime.state.KeyedStateBackend;
+import org.apache.flink.streaming.api.utils.PythonTypeUtils;
+
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+
+import javax.annotation.Nullable;
+
+import java.util.Map;
+
+/**
+ * {@link BeamDataStreamStatefulPythonFunctionRunner} is responsible for starting a beam python harness to execute user
+ * defined python function.
+ */
+public class BeamDataStreamStatefulPythonFunctionRunner extends BeamPythonFunctionRunner {
+	private static final long serialVersionUID = 1L;

Review comment:
       This field could be removed.

##########
File path: flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/StatefulPythonFunctionOperator.java
##########
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.python;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
+import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.core.memory.ManagedMemoryUseCase;
+import org.apache.flink.python.PythonFunctionRunner;
+import org.apache.flink.runtime.state.VoidNamespace;
+import org.apache.flink.runtime.state.VoidNamespaceSerializer;
+import org.apache.flink.streaming.api.SimpleTimerService;
+import org.apache.flink.streaming.api.TimerService;
+import org.apache.flink.streaming.api.functions.python.DataStreamPythonFunctionInfo;
+import org.apache.flink.streaming.api.operators.InternalTimer;
+import org.apache.flink.streaming.api.operators.InternalTimerService;
+import org.apache.flink.streaming.api.operators.Triggerable;
+import org.apache.flink.streaming.api.runners.python.beam.BeamDataStreamStatefulPythonFunctionRunner;
+import org.apache.flink.streaming.api.utils.PythonOperatorUtils;
+import org.apache.flink.streaming.api.utils.PythonTypeUtils;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.functions.python.PythonEnv;
+import org.apache.flink.table.runtime.util.StreamRecordCollector;
+import org.apache.flink.types.Row;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.Map;
+
+/**
+ * {@link StatefulPythonFunctionOperator} is responsible for launching beam runner which will start
+ * a python harness to execute user defined python function. It is also able to handle the timer and
+ * state request from the python stateful user defined function.
+ * */
+public class StatefulPythonFunctionOperator<OUT> extends AbstractOneInputPythonFunctionOperator<Row, OUT>
+	implements ResultTypeQueryable<OUT>, Triggerable<Row, VoidNamespace> {
+
+	private static final Logger LOGGER = LoggerFactory.getLogger(StatefulPythonFunctionOperator.class);
+
+	protected static final String DATA_STREAM_STATEFUL_PYTHON_FUNCTION_URN =
+		"flink:transform:datastream_stateful_function:v1";
+
+	protected static final String DATA_STREAM_STATEFUL_MAP_FUNCTION_CODER_URN =
+		"flink:coder:datastream:stateful_map_function:v1";
+
+	protected final DataStreamPythonFunctionInfo pythonFunctionInfo;
+	private final TypeInformation runnerInputTypeInfo;
+	private final TypeInformation runnerOutputTypeInfo;
+	private final TypeInformation<OUT> outputTypeInfo;
+	private final TypeInformation<Row> keyTypeInfo;
+	private TypeSerializer runnerInputSerializer;
+	private TypeSerializer runnerOutputSerializer;
+	private TypeSerializer keyTypeSerializer;
+
+	private transient TimerService timerservice;
+
+	protected final Map<String, String> jobOptions;
+
+	protected transient ByteArrayInputStreamWithPos bais;
+
+	protected transient DataInputViewStreamWrapper baisWrapper;
+
+	protected transient ByteArrayOutputStreamWithPos baos;
+
+	protected transient DataOutputViewStreamWrapper baosWrapper;
+
+	protected transient StreamRecordCollector streamRecordCollector;
+
+	protected Row reusableInput;
+	protected Row reusableTimerData;
+
+	public StatefulPythonFunctionOperator(
+		Configuration config,
+		RowTypeInfo inputTypeInfo,
+		TypeInformation<OUT> outputTypeInfo,
+		DataStreamPythonFunctionInfo pythonFunctionInfo) {
+		super(config);
+		this.jobOptions = config.toMap();
+		this.pythonFunctionInfo = pythonFunctionInfo;
+		this.outputTypeInfo = outputTypeInfo;
+		this.keyTypeInfo = new RowTypeInfo(inputTypeInfo.getTypeAt(0));
+		this.keyTypeSerializer = PythonTypeUtils.TypeInfoToSerializerConverter.typeInfoSerializerConverter(keyTypeInfo);
+		// inputType: normal data/ timer data, timerType: proc/event time, currentWatermark, keyData, real data
+		this.runnerInputTypeInfo = Types.ROW(Types.INT, Types.LONG, Types.LONG, this.keyTypeInfo, inputTypeInfo);
+		this.runnerOutputTypeInfo = Types.ROW(Types.INT, Types.LONG, this.keyTypeInfo, outputTypeInfo);
+	}
+
+	@Override
+	public void open() throws Exception {
+		super.open();
+		bais = new ByteArrayInputStreamWithPos();
+		baisWrapper = new DataInputViewStreamWrapper(bais);
+
+		baos = new ByteArrayOutputStreamWithPos();
+		baosWrapper = new DataOutputViewStreamWrapper(baos);
+		runnerInputSerializer = PythonTypeUtils.TypeInfoToSerializerConverter
+			.typeInfoSerializerConverter(runnerInputTypeInfo);
+		runnerOutputSerializer = PythonTypeUtils.TypeInfoToSerializerConverter
+			.typeInfoSerializerConverter(runnerOutputTypeInfo);
+
+		InternalTimerService<VoidNamespace> internalTimerService = getInternalTimerService("user-timers",
+			VoidNamespaceSerializer.INSTANCE, this);
+
+		this.streamRecordCollector = new StreamRecordCollector(output);
+		timerservice = new SimpleTimerService(internalTimerService);
+		reusableInput = new Row(5);
+		reusableTimerData = new Row(5);
+	}
+
+	@Override
+	public TypeInformation<OUT> getProducedType() {
+		return outputTypeInfo;
+	}
+
+	@Override
+	public void onEventTime(InternalTimer<Row, VoidNamespace> timer) throws Exception {
+		processTimer(false, timer);
+		checkInvokeFinishBundleByCount();
+	}
+
+	@Override
+	public void onProcessingTime(InternalTimer<Row, VoidNamespace> timer) throws Exception {
+		processTimer(true, timer);
+		checkInvokeFinishBundleByCount();
+	}
+
+	@Override
+	public PythonFunctionRunner createPythonFunctionRunner() throws Exception {
+		return new BeamDataStreamStatefulPythonFunctionRunner(
+			getRuntimeContext().getTaskName(),
+			createPythonEnvironmentManager(),
+			runnerInputTypeInfo,
+			runnerOutputTypeInfo,
+			DATA_STREAM_STATEFUL_PYTHON_FUNCTION_URN,
+			PythonOperatorUtils.getUserDefinedDataStreamStatefulFunctionProto(
+				pythonFunctionInfo, getRuntimeContext(), Collections.EMPTY_MAP, keyTypeInfo),
+			DATA_STREAM_STATEFUL_MAP_FUNCTION_CODER_URN,
+			jobOptions,
+			getFlinkMetricContainer(),
+			getKeyedStateBackend(),
+			keyTypeSerializer,
+			getContainingTask().getEnvironment().getMemoryManager(),
+			getOperatorConfig().getManagedMemoryFractionOperatorUseCaseOfSlot(
+				ManagedMemoryUseCase.PYTHON,
+				getContainingTask().getEnvironment().getTaskManagerInfo().getConfiguration(),
+				getContainingTask().getEnvironment().getUserCodeClassLoader().asClassLoader()));
+	}
+
+	@Override
+	public PythonEnv getPythonEnv() {
+		return this.pythonFunctionInfo.getPythonFunction().getPythonEnv();
+	}
+
+	@Override
+	public void emitResult(Tuple2<byte[], Integer> resultTuple) throws Exception {
+		byte[] rawResult = resultTuple.f0;
+		int length = resultTuple.f1;
+		bais.setBuffer(rawResult, 0, length);
+		Row runnerOutput = (Row) runnerOutputSerializer.deserialize(baisWrapper);
+		if (runnerOutput.getField(0) != null) {
+			registerTimer(runnerOutput);
+		} else {
+			streamRecordCollector.collect(runnerOutput.getField(3));
+		}
+
+	}
+
+	@Override
+	public void processElement(StreamRecord<Row> element) throws Exception {
+		LOGGER.info("Current watermark: " + timerservice.currentWatermark());

Review comment:
       Remove this logging. It will output a log for each input element.

##########
File path: flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/StatefulPythonFunctionOperator.java
##########
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.python;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
+import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.core.memory.ManagedMemoryUseCase;
+import org.apache.flink.python.PythonFunctionRunner;
+import org.apache.flink.runtime.state.VoidNamespace;
+import org.apache.flink.runtime.state.VoidNamespaceSerializer;
+import org.apache.flink.streaming.api.SimpleTimerService;
+import org.apache.flink.streaming.api.TimerService;
+import org.apache.flink.streaming.api.functions.python.DataStreamPythonFunctionInfo;
+import org.apache.flink.streaming.api.operators.InternalTimer;
+import org.apache.flink.streaming.api.operators.InternalTimerService;
+import org.apache.flink.streaming.api.operators.Triggerable;
+import org.apache.flink.streaming.api.runners.python.beam.BeamDataStreamStatefulPythonFunctionRunner;
+import org.apache.flink.streaming.api.utils.PythonOperatorUtils;
+import org.apache.flink.streaming.api.utils.PythonTypeUtils;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.functions.python.PythonEnv;
+import org.apache.flink.table.runtime.util.StreamRecordCollector;
+import org.apache.flink.types.Row;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.Map;
+
+/**
+ * {@link StatefulPythonFunctionOperator} is responsible for launching beam runner which will start
+ * a python harness to execute user defined python function. It is also able to handle the timer and
+ * state request from the python stateful user defined function.
+ * */
+public class StatefulPythonFunctionOperator<OUT> extends AbstractOneInputPythonFunctionOperator<Row, OUT>
+	implements ResultTypeQueryable<OUT>, Triggerable<Row, VoidNamespace> {
+
+	private static final Logger LOGGER = LoggerFactory.getLogger(StatefulPythonFunctionOperator.class);
+
+	protected static final String DATA_STREAM_STATEFUL_PYTHON_FUNCTION_URN =
+		"flink:transform:datastream_stateful_function:v1";
+
+	protected static final String DATA_STREAM_STATEFUL_MAP_FUNCTION_CODER_URN =
+		"flink:coder:datastream:stateful_map_function:v1";
+
+	protected final DataStreamPythonFunctionInfo pythonFunctionInfo;
+	private final TypeInformation runnerInputTypeInfo;
+	private final TypeInformation runnerOutputTypeInfo;
+	private final TypeInformation<OUT> outputTypeInfo;
+	private final TypeInformation<Row> keyTypeInfo;
+	private TypeSerializer runnerInputSerializer;
+	private TypeSerializer runnerOutputSerializer;
+	private TypeSerializer keyTypeSerializer;
+
+	private transient TimerService timerservice;
+
+	protected final Map<String, String> jobOptions;
+
+	protected transient ByteArrayInputStreamWithPos bais;
+
+	protected transient DataInputViewStreamWrapper baisWrapper;
+
+	protected transient ByteArrayOutputStreamWithPos baos;
+
+	protected transient DataOutputViewStreamWrapper baosWrapper;
+
+	protected transient StreamRecordCollector streamRecordCollector;
+
+	protected Row reusableInput;
+	protected Row reusableTimerData;
+
+	public StatefulPythonFunctionOperator(
+		Configuration config,
+		RowTypeInfo inputTypeInfo,
+		TypeInformation<OUT> outputTypeInfo,
+		DataStreamPythonFunctionInfo pythonFunctionInfo) {
+		super(config);
+		this.jobOptions = config.toMap();
+		this.pythonFunctionInfo = pythonFunctionInfo;
+		this.outputTypeInfo = outputTypeInfo;
+		this.keyTypeInfo = new RowTypeInfo(inputTypeInfo.getTypeAt(0));
+		this.keyTypeSerializer = PythonTypeUtils.TypeInfoToSerializerConverter.typeInfoSerializerConverter(keyTypeInfo);

Review comment:
       The initialization of the following fields could be moved to the open() method; then we can make them transient.
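
       A minimal sketch of the pattern, using plain Java serialization and hypothetical stand-in classes (ToySerializer, ToyOperator) rather than the Flink types. The stand-in helper is deliberately not Serializable so the effect of transient is visible; whether the real fields need this is for the PR author to judge.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical helper that is deliberately NOT Serializable.
class ToySerializer {
    final String typeName;

    ToySerializer(String typeName) {
        this.typeName = typeName;
    }
}

// Hypothetical operator: only configuration is shipped, helpers are rebuilt in open().
class ToyOperator implements Serializable {
    private static final long serialVersionUID = 1L;

    private final String keyTypeName;              // serialized with the operator
    private transient ToySerializer keySerializer; // skipped by serialization

    ToyOperator(String keyTypeName) {
        this.keyTypeName = keyTypeName;
    }

    // Called on the worker after deserialization; rebuilds the transient state.
    void open() {
        keySerializer = new ToySerializer(keyTypeName);
    }

    boolean isOpened() {
        return keySerializer != null;
    }

    // Serializes and deserializes the operator, as shipping it to a worker would.
    static ToyOperator roundTrip(ToyOperator op) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(op);
            }
            try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (ToyOperator) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

       After a round trip the transient field is null until open() runs, which is why the initialization has to live there and not in the constructor.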

##########
File path: flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/StatefulPythonFunctionOperator.java
##########
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.python;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
+import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.core.memory.ManagedMemoryUseCase;
+import org.apache.flink.python.PythonFunctionRunner;
+import org.apache.flink.runtime.state.VoidNamespace;
+import org.apache.flink.runtime.state.VoidNamespaceSerializer;
+import org.apache.flink.streaming.api.SimpleTimerService;
+import org.apache.flink.streaming.api.TimerService;
+import org.apache.flink.streaming.api.functions.python.DataStreamPythonFunctionInfo;
+import org.apache.flink.streaming.api.operators.InternalTimer;
+import org.apache.flink.streaming.api.operators.InternalTimerService;
+import org.apache.flink.streaming.api.operators.Triggerable;
+import org.apache.flink.streaming.api.runners.python.beam.BeamDataStreamStatefulPythonFunctionRunner;
+import org.apache.flink.streaming.api.utils.PythonOperatorUtils;
+import org.apache.flink.streaming.api.utils.PythonTypeUtils;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.functions.python.PythonEnv;
+import org.apache.flink.table.runtime.util.StreamRecordCollector;
+import org.apache.flink.types.Row;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.Map;
+
+/**
+ * {@link StatefulPythonFunctionOperator} is responsible for launching beam runner which will start
+ * a python harness to execute user defined python function. It is also able to handle the timer and
+ * state request from the python stateful user defined function.
+ * */
+public class StatefulPythonFunctionOperator<OUT> extends AbstractOneInputPythonFunctionOperator<Row, OUT>
+	implements ResultTypeQueryable<OUT>, Triggerable<Row, VoidNamespace> {
+
+	private static final Logger LOGGER = LoggerFactory.getLogger(StatefulPythonFunctionOperator.class);
+
+	protected static final String DATA_STREAM_STATEFUL_PYTHON_FUNCTION_URN =
+		"flink:transform:datastream_stateful_function:v1";
+
+	protected static final String DATA_STREAM_STATEFUL_MAP_FUNCTION_CODER_URN =
+		"flink:coder:datastream:stateful_map_function:v1";
+
+	protected final DataStreamPythonFunctionInfo pythonFunctionInfo;
+	private final TypeInformation runnerInputTypeInfo;
+	private final TypeInformation runnerOutputTypeInfo;
+	private final TypeInformation<OUT> outputTypeInfo;
+	private final TypeInformation<Row> keyTypeInfo;
+	private TypeSerializer runnerInputSerializer;
+	private TypeSerializer runnerOutputSerializer;

Review comment:
       ```suggestion
   	private transient TypeSerializer runnerOutputSerializer;
   ```

##########
File path: flink-python/src/main/java/org/apache/flink/streaming/api/runners/python/beam/BeamDataStreamStatefulPythonFunctionRunner.java
##########
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.runners.python.beam;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.fnexecution.v1.FlinkFnApi;
+import org.apache.flink.python.env.PythonEnvironmentManager;
+import org.apache.flink.python.metric.FlinkMetricContainer;
+import org.apache.flink.runtime.memory.MemoryManager;
+import org.apache.flink.runtime.state.KeyedStateBackend;
+import org.apache.flink.streaming.api.utils.PythonTypeUtils;
+
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+
+import javax.annotation.Nullable;
+
+import java.util.Map;
+
+/**
+ * {@link BeamDataStreamStatefulPythonFunctionRunner} is responsible for starting a beam python harness to execute user
+ * defined python function.
+ */
+public class BeamDataStreamStatefulPythonFunctionRunner extends BeamPythonFunctionRunner {

Review comment:
       It seems that the only difference between BeamDataStreamStatelessPythonFunctionRunner and BeamDataStreamStatefulPythonFunctionRunner is that the constructor of BeamDataStreamStatefulPythonFunctionRunner takes a stateBackend and a keySerializer. What about unifying them as BeamDataStreamPythonFunctionRunner?
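
       One possible shape for the unified class, sketched with hypothetical stand-ins (ToyDataStreamRunner, ToyStateBackend, ToyKeySerializer) and not the real Beam or Flink types: a single runner whose state backend and key serializer are optional and are either both present or both absent.

```java
import java.util.Objects;

// Toy stand-ins; none of these classes are Flink or Beam APIs.
class ToyStateBackend {}

class ToyKeySerializer {}

class ToyDataStreamRunner {
    private final String taskName;
    // Both are null for a stateless function and non-null for a stateful one.
    private final ToyStateBackend stateBackend;
    private final ToyKeySerializer keySerializer;

    private ToyDataStreamRunner(
            String taskName, ToyStateBackend stateBackend, ToyKeySerializer keySerializer) {
        if ((stateBackend == null) != (keySerializer == null)) {
            throw new IllegalArgumentException(
                "state backend and key serializer must be set together");
        }
        this.taskName = Objects.requireNonNull(taskName);
        this.stateBackend = stateBackend;
        this.keySerializer = keySerializer;
    }

    // Stateless variant: no keyed state access.
    static ToyDataStreamRunner stateless(String taskName) {
        return new ToyDataStreamRunner(taskName, null, null);
    }

    // Stateful variant: both collaborators are required.
    static ToyDataStreamRunner stateful(
            String taskName, ToyStateBackend stateBackend, ToyKeySerializer keySerializer) {
        return new ToyDataStreamRunner(
            taskName,
            Objects.requireNonNull(stateBackend),
            Objects.requireNonNull(keySerializer));
    }

    boolean isStateful() {
        return stateBackend != null;
    }

    String taskName() {
        return taskName;
    }
}
```

       The two factory methods keep call sites explicit about which mode they want, while the duplicated runner class goes away.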

##########
File path: flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/StatefulPythonFunctionOperator.java
##########
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.python;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
+import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.core.memory.ManagedMemoryUseCase;
+import org.apache.flink.python.PythonFunctionRunner;
+import org.apache.flink.runtime.state.VoidNamespace;
+import org.apache.flink.runtime.state.VoidNamespaceSerializer;
+import org.apache.flink.streaming.api.SimpleTimerService;
+import org.apache.flink.streaming.api.TimerService;
+import org.apache.flink.streaming.api.functions.python.DataStreamPythonFunctionInfo;
+import org.apache.flink.streaming.api.operators.InternalTimer;
+import org.apache.flink.streaming.api.operators.InternalTimerService;
+import org.apache.flink.streaming.api.operators.Triggerable;
+import org.apache.flink.streaming.api.runners.python.beam.BeamDataStreamStatefulPythonFunctionRunner;
+import org.apache.flink.streaming.api.utils.PythonOperatorUtils;
+import org.apache.flink.streaming.api.utils.PythonTypeUtils;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.functions.python.PythonEnv;
+import org.apache.flink.table.runtime.util.StreamRecordCollector;
+import org.apache.flink.types.Row;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.Map;
+
+/**
+ * {@link StatefulPythonFunctionOperator} is responsible for launching beam runner which will start
+ * a python harness to execute user defined python function. It is also able to handle the timer and
+ * state request from the python stateful user defined function.
+ * */
+public class StatefulPythonFunctionOperator<OUT> extends AbstractOneInputPythonFunctionOperator<Row, OUT>
+	implements ResultTypeQueryable<OUT>, Triggerable<Row, VoidNamespace> {
+
+	private static final Logger LOGGER = LoggerFactory.getLogger(StatefulPythonFunctionOperator.class);
+
+	protected static final String DATA_STREAM_STATEFUL_PYTHON_FUNCTION_URN =
+		"flink:transform:datastream_stateful_function:v1";
+
+	protected static final String DATA_STREAM_STATEFUL_MAP_FUNCTION_CODER_URN =
+		"flink:coder:datastream:stateful_map_function:v1";
+
+	protected final DataStreamPythonFunctionInfo pythonFunctionInfo;
+	private final TypeInformation runnerInputTypeInfo;
+	private final TypeInformation runnerOutputTypeInfo;
+	private final TypeInformation<OUT> outputTypeInfo;
+	private final TypeInformation<Row> keyTypeInfo;
+	private TypeSerializer runnerInputSerializer;
+	private TypeSerializer runnerOutputSerializer;
+	private TypeSerializer keyTypeSerializer;
+
+	private transient TimerService timerservice;
+
+	protected final Map<String, String> jobOptions;
+
+	protected transient ByteArrayInputStreamWithPos bais;
+
+	protected transient DataInputViewStreamWrapper baisWrapper;
+
+	protected transient ByteArrayOutputStreamWithPos baos;
+
+	protected transient DataOutputViewStreamWrapper baosWrapper;
+
+	protected transient StreamRecordCollector streamRecordCollector;
+
+	protected Row reusableInput;
+	protected Row reusableTimerData;

Review comment:
       ```suggestion
   	private transient Row reusableTimerData;
   ```

##########
File path: flink-python/pyflink/fn_execution/stateful_operation_common.py
##########
@@ -0,0 +1,72 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+import time
+
+from typing import Any
+
+from pyflink.datastream.functions import Collector, TimerService
+
+
+class InternalCollector(Collector):
+    """
+    Internal implementation of the Collector. It usa a buffer list to store data to be emitted.

Review comment:
       ```suggestion
       Internal implementation of the Collector. It uses a buffer list to store data to be emitted.
   ```
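
The collector's docstring in stateful_operation_common.py goes on to describe a header flag per buffered item: 0 for a processing-time timer registration, 1 for an event-time timer registration, and 2 for normal data. A minimal sketch of that buffer layout follows. `SketchCollector`, its constant names, and the sample key and timestamp are hypothetical illustrations, not the PR's code.

```python
class SketchCollector(object):
    """Hypothetical stand-in that mirrors the flag layout from the docstring."""

    # Header flags as described in the docstring (constant names are assumed).
    PROC_TIMER, EVENT_TIMER, DATA = 0, 1, 2

    def __init__(self):
        self.buf = []

    def collect_proc_timer(self, timestamp, key):
        # Timer registrations carry the key they belong to.
        self.buf.append((self.PROC_TIMER, timestamp, key, None))

    def collect_event_timer(self, timestamp, key):
        self.buf.append((self.EVENT_TIMER, timestamp, key, None))

    def collect(self, value):
        # Normal data only needs the flag and the value.
        self.buf.append((self.DATA, value))


collector = SketchCollector()
collector.collect_event_timer(1603780000000, ("user-1",))
collector.collect("payload")
print(collector.buf)
# prints [(1, 1603780000000, ('user-1',), None), (2, 'payload')]
```

Timer entries and data entries have different tuple lengths in this layout, so whatever consumes the buffer has to branch on the flag first.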

##########
File path: flink-python/src/main/java/org/apache/flink/streaming/api/runners/python/beam/BeamDataStreamStatefulPythonFunctionRunner.java
##########
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.runners.python.beam;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.fnexecution.v1.FlinkFnApi;
+import org.apache.flink.python.env.PythonEnvironmentManager;
+import org.apache.flink.python.metric.FlinkMetricContainer;
+import org.apache.flink.runtime.memory.MemoryManager;
+import org.apache.flink.runtime.state.KeyedStateBackend;
+import org.apache.flink.streaming.api.utils.PythonTypeUtils;
+
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+
+import javax.annotation.Nullable;
+
+import java.util.Map;
+
+/**
+ * {@link BeamDataStreamStatefulPythonFunctionRunner} is responsible for starting a beam python harness to execute user
+ * defined python function.
+ */
+public class BeamDataStreamStatefulPythonFunctionRunner extends BeamPythonFunctionRunner {

Review comment:
       This class should be annotated with `@Internal`.

##########
File path: flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/StatefulPythonFunctionOperator.java
##########
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.python;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
+import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.core.memory.ManagedMemoryUseCase;
+import org.apache.flink.python.PythonFunctionRunner;
+import org.apache.flink.runtime.state.VoidNamespace;
+import org.apache.flink.runtime.state.VoidNamespaceSerializer;
+import org.apache.flink.streaming.api.SimpleTimerService;
+import org.apache.flink.streaming.api.TimerService;
+import org.apache.flink.streaming.api.functions.python.DataStreamPythonFunctionInfo;
+import org.apache.flink.streaming.api.operators.InternalTimer;
+import org.apache.flink.streaming.api.operators.InternalTimerService;
+import org.apache.flink.streaming.api.operators.Triggerable;
+import org.apache.flink.streaming.api.runners.python.beam.BeamDataStreamStatefulPythonFunctionRunner;
+import org.apache.flink.streaming.api.utils.PythonOperatorUtils;
+import org.apache.flink.streaming.api.utils.PythonTypeUtils;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.functions.python.PythonEnv;
+import org.apache.flink.table.runtime.util.StreamRecordCollector;
+import org.apache.flink.types.Row;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.Map;
+
+/**
+ * {@link StatefulPythonFunctionOperator} is responsible for launching beam runner which will start
+ * a python harness to execute user defined python function. It is also able to handle the timer and
+ * state request from the python stateful user defined function.
+ * */
+public class StatefulPythonFunctionOperator<OUT> extends AbstractOneInputPythonFunctionOperator<Row, OUT>
+	implements ResultTypeQueryable<OUT>, Triggerable<Row, VoidNamespace> {
+
+	private static final Logger LOGGER = LoggerFactory.getLogger(StatefulPythonFunctionOperator.class);
+
+	protected static final String DATA_STREAM_STATEFUL_PYTHON_FUNCTION_URN =
+		"flink:transform:datastream_stateful_function:v1";
+
+	protected static final String DATA_STREAM_STATEFUL_MAP_FUNCTION_CODER_URN =
+		"flink:coder:datastream:stateful_map_function:v1";
+
+	protected final DataStreamPythonFunctionInfo pythonFunctionInfo;
+	private final TypeInformation runnerInputTypeInfo;
+	private final TypeInformation runnerOutputTypeInfo;
+	private final TypeInformation<OUT> outputTypeInfo;
+	private final TypeInformation<Row> keyTypeInfo;
+	private TypeSerializer runnerInputSerializer;
+	private TypeSerializer runnerOutputSerializer;
+	private TypeSerializer keyTypeSerializer;
+
+	private transient TimerService timerservice;
+
+	protected final Map<String, String> jobOptions;
+
+	protected transient ByteArrayInputStreamWithPos bais;
+
+	protected transient DataInputViewStreamWrapper baisWrapper;
+
+	protected transient ByteArrayOutputStreamWithPos baos;
+
+	protected transient DataOutputViewStreamWrapper baosWrapper;
+
+	protected transient StreamRecordCollector streamRecordCollector;
+
+	protected Row reusableInput;
+	protected Row reusableTimerData;
+
+	public StatefulPythonFunctionOperator(
+		Configuration config,
+		RowTypeInfo inputTypeInfo,
+		TypeInformation<OUT> outputTypeInfo,
+		DataStreamPythonFunctionInfo pythonFunctionInfo) {
+		super(config);
+		this.jobOptions = config.toMap();
+		this.pythonFunctionInfo = pythonFunctionInfo;
+		this.outputTypeInfo = outputTypeInfo;
+		this.keyTypeInfo = new RowTypeInfo(inputTypeInfo.getTypeAt(0));
+		this.keyTypeSerializer = PythonTypeUtils.TypeInfoToSerializerConverter.typeInfoSerializerConverter(keyTypeInfo);
+		// inputType: normal data/ timer data, timerType: proc/event time, currentWatermark, keyData, real data
+		this.runnerInputTypeInfo = Types.ROW(Types.INT, Types.LONG, Types.LONG, this.keyTypeInfo, inputTypeInfo);
+		this.runnerOutputTypeInfo = Types.ROW(Types.INT, Types.LONG, this.keyTypeInfo, outputTypeInfo);
+	}
+
+	@Override
+	public void open() throws Exception {
+		super.open();
+		bais = new ByteArrayInputStreamWithPos();
+		baisWrapper = new DataInputViewStreamWrapper(bais);
+
+		baos = new ByteArrayOutputStreamWithPos();
+		baosWrapper = new DataOutputViewStreamWrapper(baos);
+		runnerInputSerializer = PythonTypeUtils.TypeInfoToSerializerConverter
+			.typeInfoSerializerConverter(runnerInputTypeInfo);
+		runnerOutputSerializer = PythonTypeUtils.TypeInfoToSerializerConverter
+			.typeInfoSerializerConverter(runnerOutputTypeInfo);
+
+		InternalTimerService<VoidNamespace> internalTimerService = getInternalTimerService("user-timers",
+			VoidNamespaceSerializer.INSTANCE, this);
+
+		this.streamRecordCollector = new StreamRecordCollector(output);
+		timerservice = new SimpleTimerService(internalTimerService);
+		reusableInput = new Row(5);
+		reusableTimerData = new Row(5);
+	}
+
+	@Override
+	public TypeInformation<OUT> getProducedType() {
+		return outputTypeInfo;
+	}
+
+	@Override
+	public void onEventTime(InternalTimer<Row, VoidNamespace> timer) throws Exception {
+		processTimer(false, timer);
+		checkInvokeFinishBundleByCount();

Review comment:
       Move checkInvokeFinishBundleByCount to processTimer?

##########
File path: flink-python/pyflink/fn_execution/stateful_operation_common.py
##########
@@ -0,0 +1,72 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+import time
+
+from typing import Any
+
+from pyflink.datastream.functions import Collector, TimerService
+
+
+class InternalCollector(Collector):

Review comment:
       Move these two classes under class DataStreamStatefulFunctionOperation?

##########
File path: flink-python/pyflink/fn_execution/stateful_operation_common.py
##########
@@ -0,0 +1,72 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+import time
+
+from typing import Any
+
+from pyflink.datastream.functions import Collector, TimerService
+
+
+class InternalCollector(Collector):
+    """
+    Internal implementation of the Collector. It usa a buffer list to store data to be emitted.
+    There will be a header flag for each data type. 0 means it is a proc time timer registering
+    request, while 1 means it is an event time timer and 2 means it is a normal data. When
+    registering a timer, it must take along the corresponding key for it.
+    """
+    def __init__(self):
+        self.buf = []
+
+    def collect_proc_timer(self, a: Any, key: Any):
+        self.buf.append((0, a, key, None))
+
+    def collect_event_timer(self, a: Any, key: Any):
+        self.buf.append((1, a, key, None))
+
+    def collect_data(self, a: Any):
+        self.buf.append((2, a))
+
+    def collect(self, a: Any):
+        self.collect_data(a)
+
+    def clear(self):
+        self.buf.clear()
+
+
+class InternalTimerService(TimerService):
+    """
+    Internal implementation of TimerService.
+    """
+    def __init__(self, collector, keyed_state_backend):
+        self._collector: InternalCollector = collector

Review comment:
       Variable annotations (PEP 526) are not supported in Python 3.5.
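
One possible Python 3.5-compatible form is a PEP 484 type comment in place of the inline annotation. The sketch below assumes that is acceptable for this code base; the two classes are hypothetical stand-ins for the ones in the diff and drop the pyflink base classes so the snippet is self-contained.

```python
class InternalCollector(object):
    """Hypothetical stand-in for the collector in the diff."""

    def __init__(self):
        self.buf = []


class InternalTimerService(object):
    """Hypothetical stand-in; only the constructor is sketched."""

    def __init__(self, collector, keyed_state_backend):
        # `self._collector: InternalCollector = collector` requires Python 3.6+.
        # A type comment carries the same hint and still parses on Python 3.5.
        self._collector = collector  # type: InternalCollector
        self._keyed_state_backend = keyed_state_backend
        self._current_watermark = None


service = InternalTimerService(InternalCollector(), keyed_state_backend=None)
```

Dropping the hint entirely would also work, at the cost of losing the information for type checkers.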







[GitHub] [flink] shuiqiangchen commented on a change in pull request #13803: [FLINK-19821][python] Add ProcessFunction and timer access for Python DataStream API.

Posted by GitBox <gi...@apache.org>.
shuiqiangchen commented on a change in pull request #13803:
URL: https://github.com/apache/flink/pull/13803#discussion_r514027062



##########
File path: flink-python/pyflink/fn_execution/stateful_operation_common.py
##########
@@ -0,0 +1,72 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+import time
+
+from typing import Any
+
+from pyflink.datastream.functions import Collector, TimerService
+
+
+class InternalCollector(Collector):
+    """
+    Internal implementation of the Collector. It usa a buffer list to store data to be emitted.
+    There will be a header flag for each data type. 0 means it is a proc time timer registering
+    request, while 1 means it is an event time timer and 2 means it is a normal data. When
+    registering a timer, it must take along the corresponding key for it.
+    """
+    def __init__(self):
+        self.buf = []
+
+    def collect_proc_timer(self, a: Any, key: Any):
+        self.buf.append((0, a, key, None))
+
+    def collect_event_timer(self, a: Any, key: Any):
+        self.buf.append((1, a, key, None))
+
+    def collect_data(self, a: Any):
+        self.buf.append((2, a))
+
+    def collect(self, a: Any):
+        self.collect_data(a)
+
+    def clear(self):
+        self.buf.clear()
+
+
+class InternalTimerService(TimerService):
+    """
+    Internal implementation of TimerService.
+    """
+    def __init__(self, collector, keyed_state_backend):
+        self._collector: InternalCollector = collector
+        self._keyed_state_backend = keyed_state_backend
+        self._current_watermark = None
+
+    def current_processing_time(self) -> int:
+        return int(time.time() * 1000)
+
+    def register_processing_time_timer(self, t: int):
+        current_key = self._keyed_state_backend.get_current_key()
+        self._collector.collect_proc_timer(t, current_key)
+
+    def register_event_time_timer(self, t: int):
+        current_key = self._keyed_state_backend.get_current_key()
+        self._collector.collect_event_timer(t, current_key)
+
+    def current_watermark(self) -> int:
+        return self._current_watermark

Review comment:
       It is updated in the wrapped process function every time a new record arrives.
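
As a rough illustration of the update described in this comment, the sketch below refreshes the watermark on a timer service before each call to the user function. All names are hypothetical, and the row layout (watermark at index 2, payload at index 4) is an assumption loosely based on the runner input comment in StatefulPythonFunctionOperator, not the PR's actual wrapper.

```python
class SketchTimerService(object):
    """Hypothetical timer service that only tracks the watermark."""

    def __init__(self):
        self._current_watermark = None

    def current_watermark(self):
        return self._current_watermark


def wrap_process_function(process_element, timer_service):
    """Return a function that refreshes the watermark before each call."""

    def wrapped(row):
        # Assumed layout: (flag, timestamp, watermark, key, payload).
        timer_service._current_watermark = row[2]
        return process_element(row[4], timer_service)

    return wrapped


seen = []
timer_service = SketchTimerService()
wrapped = wrap_process_function(
    lambda value, ts: seen.append((value, ts.current_watermark())),
    timer_service)
wrapped((None, 0, 1000, ("k",), "a"))
wrapped((None, 0, 2000, ("k",), "b"))
print(seen)
# prints [('a', 1000), ('b', 2000)]
```

With this shape, `current_watermark()` returns `None` until the first record has been processed, which callers would need to tolerate.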







[GitHub] [flink] shuiqiangchen commented on a change in pull request #13803: [FLINK-19821][python] Add ProcessFunction and timer access for Python DataStream API.

Posted by GitBox <gi...@apache.org>.
shuiqiangchen commented on a change in pull request #13803:
URL: https://github.com/apache/flink/pull/13803#discussion_r514023438



##########
File path: flink-python/src/main/java/org/apache/flink/streaming/api/runners/python/beam/BeamDataStreamStatefulPythonFunctionRunner.java
##########
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.runners.python.beam;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.fnexecution.v1.FlinkFnApi;
+import org.apache.flink.python.env.PythonEnvironmentManager;
+import org.apache.flink.python.metric.FlinkMetricContainer;
+import org.apache.flink.runtime.memory.MemoryManager;
+import org.apache.flink.runtime.state.KeyedStateBackend;
+import org.apache.flink.streaming.api.utils.PythonTypeUtils;
+
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+
+import javax.annotation.Nullable;
+
+import java.util.Map;
+
+/**
+ * {@link BeamDataStreamStatefulPythonFunctionRunner} is responsible for starting a beam python harness to execute user
+ * defined python function.
+ */
+public class BeamDataStreamStatefulPythonFunctionRunner extends BeamPythonFunctionRunner {

Review comment:
       Yes, they could be unified as BeamDataStreamPythonFunctionRunner.







[GitHub] [flink] flinkbot edited a comment on pull request #13803: [FLINK-19821][python] Add ProcessFunction and timer access for Python DataStream API.

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13803:
URL: https://github.com/apache/flink/pull/13803#issuecomment-717145782


   ## CI report:
   
   * a214b00956b850ec3aca2455478353472dd4f44c Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8551) 
   





[GitHub] [flink] shuiqiangchen commented on a change in pull request #13803: [FLINK-19821][python] Add ProcessFunction and timer access for Python DataStream API.

Posted by GitBox <gi...@apache.org>.
shuiqiangchen commented on a change in pull request #13803:
URL: https://github.com/apache/flink/pull/13803#discussion_r514019677



##########
File path: flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/StatefulPythonFunctionOperator.java
##########
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.python;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
+import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.core.memory.ManagedMemoryUseCase;
+import org.apache.flink.python.PythonFunctionRunner;
+import org.apache.flink.runtime.state.VoidNamespace;
+import org.apache.flink.runtime.state.VoidNamespaceSerializer;
+import org.apache.flink.streaming.api.SimpleTimerService;
+import org.apache.flink.streaming.api.TimerService;
+import org.apache.flink.streaming.api.functions.python.DataStreamPythonFunctionInfo;
+import org.apache.flink.streaming.api.operators.InternalTimer;
+import org.apache.flink.streaming.api.operators.InternalTimerService;
+import org.apache.flink.streaming.api.operators.Triggerable;
+import org.apache.flink.streaming.api.runners.python.beam.BeamDataStreamStatefulPythonFunctionRunner;
+import org.apache.flink.streaming.api.utils.PythonOperatorUtils;
+import org.apache.flink.streaming.api.utils.PythonTypeUtils;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.functions.python.PythonEnv;
+import org.apache.flink.table.runtime.util.StreamRecordCollector;
+import org.apache.flink.types.Row;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.Map;
+
+/**
+ * {@link StatefulPythonFunctionOperator} is responsible for launching beam runner which will start
+ * a python harness to execute user defined python function. It is also able to handle the timer and
+ * state request from the python stateful user defined function.
+ * */
+public class StatefulPythonFunctionOperator<OUT> extends AbstractOneInputPythonFunctionOperator<Row, OUT>
+	implements ResultTypeQueryable<OUT>, Triggerable<Row, VoidNamespace> {
+
+	private static final Logger LOGGER = LoggerFactory.getLogger(StatefulPythonFunctionOperator.class);
+
+	protected static final String DATA_STREAM_STATEFUL_PYTHON_FUNCTION_URN =
+		"flink:transform:datastream_stateful_function:v1";
+
+	protected static final String DATA_STREAM_STATEFUL_MAP_FUNCTION_CODER_URN =
+		"flink:coder:datastream:stateful_map_function:v1";
+
+	protected final DataStreamPythonFunctionInfo pythonFunctionInfo;
+	private final TypeInformation runnerInputTypeInfo;
+	private final TypeInformation runnerOutputTypeInfo;
+	private final TypeInformation<OUT> outputTypeInfo;
+	private final TypeInformation<Row> keyTypeInfo;
+	private TypeSerializer runnerInputSerializer;
+	private TypeSerializer runnerOutputSerializer;
+	private TypeSerializer keyTypeSerializer;
+
+	private transient TimerService timerservice;
+
+	protected final Map<String, String> jobOptions;
+
+	protected transient ByteArrayInputStreamWithPos bais;
+
+	protected transient DataInputViewStreamWrapper baisWrapper;
+
+	protected transient ByteArrayOutputStreamWithPos baos;
+
+	protected transient DataOutputViewStreamWrapper baosWrapper;
+
+	protected transient StreamRecordCollector streamRecordCollector;
+
+	protected Row reusableInput;
+	protected Row reusableTimerData;
+
+	public StatefulPythonFunctionOperator(
+		Configuration config,
+		RowTypeInfo inputTypeInfo,
+		TypeInformation<OUT> outputTypeInfo,
+		DataStreamPythonFunctionInfo pythonFunctionInfo) {
+		super(config);
+		this.jobOptions = config.toMap();
+		this.pythonFunctionInfo = pythonFunctionInfo;
+		this.outputTypeInfo = outputTypeInfo;
+		this.keyTypeInfo = new RowTypeInfo(inputTypeInfo.getTypeAt(0));
+		this.keyTypeSerializer = PythonTypeUtils.TypeInfoToSerializerConverter.typeInfoSerializerConverter(keyTypeInfo);
+		// inputType: normal data/ timer data, timerType: proc/event time, currentWatermark, keyData, real data
+		this.runnerInputTypeInfo = Types.ROW(Types.INT, Types.LONG, Types.LONG, this.keyTypeInfo, inputTypeInfo);
+		this.runnerOutputTypeInfo = Types.ROW(Types.INT, Types.LONG, this.keyTypeInfo, outputTypeInfo);
+	}
+
+	@Override
+	public void open() throws Exception {
+		super.open();
+		bais = new ByteArrayInputStreamWithPos();
+		baisWrapper = new DataInputViewStreamWrapper(bais);
+
+		baos = new ByteArrayOutputStreamWithPos();
+		baosWrapper = new DataOutputViewStreamWrapper(baos);
+		runnerInputSerializer = PythonTypeUtils.TypeInfoToSerializerConverter
+			.typeInfoSerializerConverter(runnerInputTypeInfo);
+		runnerOutputSerializer = PythonTypeUtils.TypeInfoToSerializerConverter
+			.typeInfoSerializerConverter(runnerOutputTypeInfo);
+
+		InternalTimerService<VoidNamespace> internalTimerService = getInternalTimerService("user-timers",
+			VoidNamespaceSerializer.INSTANCE, this);
+
+		this.streamRecordCollector = new StreamRecordCollector(output);
+		timerservice = new SimpleTimerService(internalTimerService);
+		reusableInput = new Row(5);
+		reusableTimerData = new Row(5);
+	}
+
+	@Override
+	public TypeInformation<OUT> getProducedType() {
+		return outputTypeInfo;
+	}
+
+	@Override
+	public void onEventTime(InternalTimer<Row, VoidNamespace> timer) throws Exception {
+		processTimer(false, timer);
+		checkInvokeFinishBundleByCount();
+	}
+
+	@Override
+	public void onProcessingTime(InternalTimer<Row, VoidNamespace> timer) throws Exception {
+		processTimer(true, timer);
+		checkInvokeFinishBundleByCount();
+	}
+
+	@Override
+	public PythonFunctionRunner createPythonFunctionRunner() throws Exception {
+		return new BeamDataStreamStatefulPythonFunctionRunner(
+			getRuntimeContext().getTaskName(),
+			createPythonEnvironmentManager(),
+			runnerInputTypeInfo,
+			runnerOutputTypeInfo,
+			DATA_STREAM_STATEFUL_PYTHON_FUNCTION_URN,
+			PythonOperatorUtils.getUserDefinedDataStreamStatefulFunctionProto(
+				pythonFunctionInfo, getRuntimeContext(), Collections.EMPTY_MAP, keyTypeInfo),
+			DATA_STREAM_STATEFUL_MAP_FUNCTION_CODER_URN,
+			jobOptions,
+			getFlinkMetricContainer(),
+			getKeyedStateBackend(),
+			keyTypeSerializer,
+			getContainingTask().getEnvironment().getMemoryManager(),
+			getOperatorConfig().getManagedMemoryFractionOperatorUseCaseOfSlot(
+				ManagedMemoryUseCase.PYTHON,
+				getContainingTask().getEnvironment().getTaskManagerInfo().getConfiguration(),
+				getContainingTask().getEnvironment().getUserCodeClassLoader().asClassLoader()));
+	}
+
+	@Override
+	public PythonEnv getPythonEnv() {
+		return this.pythonFunctionInfo.getPythonFunction().getPythonEnv();
+	}
+
+	@Override
+	public void emitResult(Tuple2<byte[], Integer> resultTuple) throws Exception {
+		byte[] rawResult = resultTuple.f0;
+		int length = resultTuple.f1;
+		bais.setBuffer(rawResult, 0, length);
+		Row runnerOutput = (Row) runnerOutputSerializer.deserialize(baisWrapper);
+		if (runnerOutput.getField(0) != null) {
+			registerTimer(runnerOutput);
+		} else {
+			streamRecordCollector.collect(runnerOutput.getField(3));
+		}
+
+	}
+
+	@Override
+	public void processElement(StreamRecord<Row> element) throws Exception {
+		LOGGER.info("Current watermark: " + timerservice.currentWatermark());

Review comment:
       Sorry, it is for debugging purposes and should be removed.







[GitHub] [flink] shuiqiangchen commented on a change in pull request #13803: [FLINK-19821][python] Add ProcessFunction and timer access for Python DataStream API.

Posted by GitBox <gi...@apache.org>.
shuiqiangchen commented on a change in pull request #13803:
URL: https://github.com/apache/flink/pull/13803#discussion_r517816205



##########
File path: flink-python/pyflink/datastream/time_domain.py
##########
@@ -0,0 +1,42 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+from enum import Enum
+
+from pyflink.java_gateway import get_gateway
+
+
+class TimeDomain(Enum):
+    """
+    TimeDomain specifies whether a firing timer is based on event time or processing time.
+
+    EVENT_TIME: Time is based on timestamp of events.
+    PROCESSING_TIME: Time is based on the current processing-time of a machine where processing
+                     happens.
+    """
+
+    EVENT_TIME = 0
+    PROCESSING_TIME = 1
+
+    @staticmethod

Review comment:
       Yes, they have not been called anywhere.
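
As a rough illustration, the enum from the quoted file works on its own without any static helper methods. The sketch below assumes only the standard-library `enum` module; the `describe` function is a hypothetical example of caller code and is not part of the PR.

```python
from enum import Enum


class TimeDomain(Enum):
    """Whether a firing timer is based on event time or processing time."""

    EVENT_TIME = 0
    PROCESSING_TIME = 1


def describe(domain: TimeDomain) -> str:
    # Hypothetical caller: branches on the enum member directly.
    if domain is TimeDomain.EVENT_TIME:
        return "timer fired on event time"
    return "timer fired on processing time"
```

Callers can compare members by identity, or recover a member from its integer value with `TimeDomain(0)`.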







[GitHub] [flink] flinkbot commented on pull request #13803: [FLINK-19821][python] Add ProcessFunction and timer access for Python DataStream API.

Posted by GitBox <gi...@apache.org>.
flinkbot commented on pull request #13803:
URL: https://github.com/apache/flink/pull/13803#issuecomment-717021527


   Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress of the review.
   
   
   ## Automated Checks
   Last check on commit c88ea42fe4ce5260b492d592ae8f5d79f54ca829 (Tue Oct 27 06:39:53 UTC 2020)
   
   **Warnings:**
    * No documentation files were touched! Remember to keep the Flink docs up to date!
    * **This pull request references an unassigned [Jira ticket](https://issues.apache.org/jira/browse/FLINK-19821).** According to the [code contribution guide](https://flink.apache.org/contributing/contribute-code.html), tickets need to be assigned before starting with the implementation work.
   
   
   <sub>Mention the bot in a comment to re-run the automated checks.</sub>
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full explanation of the review process.<details>
    The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required. <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`)
    - `@flinkbot approve all` to approve all aspects
    - `@flinkbot approve-until architecture` to approve everything until `architecture`
    - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention
    - `@flinkbot disapprove architecture` to remove an approval you gave earlier
   </details>

