Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2020/10/29 03:26:11 UTC

[GitHub] [flink] dianfu commented on a change in pull request #13803: [FLINK-19821][python] Add ProcessFunction and timer access for Python DataStream API.

dianfu commented on a change in pull request #13803:
URL: https://github.com/apache/flink/pull/13803#discussion_r513882412



##########
File path: flink-end-to-end-tests/flink-python-test/python/datastream/data_stream_timer_job.py
##########
@@ -0,0 +1,89 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+from pyflink.common.serialization import SimpleStringSchema
+from pyflink.common.typeinfo import Types
+from pyflink.datastream import StreamExecutionEnvironment, TimeCharacteristic
+from pyflink.datastream.connectors import FlinkKafkaProducer
+from pyflink.datastream.functions import ProcessFunction, Collector
+from pyflink.table import StreamTableEnvironment
+
+
+def test_ds_timer():
+    env = StreamExecutionEnvironment.get_execution_environment()
+    env.set_parallelism(1)
+    env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
+
+    t_env = StreamTableEnvironment.create(stream_execution_environment=env)
+    t_env.get_config().get_configuration().set_boolean("python.fn-execution.memory.managed",

Review comment:
       this is not necessary any more
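
       For illustration, a sketch of the setup with that option dropped (assuming the framework default for `python.fn-execution.memory.managed` is already sufficient for this test):

    ```python
    from pyflink.datastream import StreamExecutionEnvironment, TimeCharacteristic
    from pyflink.table import StreamTableEnvironment

    # Same setup as in the test job, just without the explicit managed-memory option.
    env = StreamExecutionEnvironment.get_execution_environment()
    env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
    t_env = StreamTableEnvironment.create(stream_execution_environment=env)
    ```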

##########
File path: flink-end-to-end-tests/flink-python-test/python/datastream/data_stream_timer_job.py
##########
@@ -0,0 +1,89 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+from pyflink.common.serialization import SimpleStringSchema
+from pyflink.common.typeinfo import Types
+from pyflink.datastream import StreamExecutionEnvironment, TimeCharacteristic
+from pyflink.datastream.connectors import FlinkKafkaProducer
+from pyflink.datastream.functions import ProcessFunction, Collector
+from pyflink.table import StreamTableEnvironment
+
+
+def test_ds_timer():
+    env = StreamExecutionEnvironment.get_execution_environment()
+    env.set_parallelism(1)

Review comment:
       Why set the parallelism to 1?

##########
File path: flink-end-to-end-tests/flink-python-test/python/datastream/data_stream_timer_job.py
##########
@@ -0,0 +1,89 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+from pyflink.common.serialization import SimpleStringSchema
+from pyflink.common.typeinfo import Types
+from pyflink.datastream import StreamExecutionEnvironment, TimeCharacteristic
+from pyflink.datastream.connectors import FlinkKafkaProducer
+from pyflink.datastream.functions import ProcessFunction, Collector
+from pyflink.table import StreamTableEnvironment
+
+
+def test_ds_timer():

Review comment:
       Could we merge this test case into the existing data stream test instead of adding another one?

##########
File path: flink-python/pyflink/datastream/functions.py
##########
@@ -540,3 +540,155 @@ def __init__(self, sink_func: Union[str, JavaObject]):
         :param sink_func: The java SinkFunction object or the full name of the SinkFunction class.
         """
         super(SinkFunction, self).__init__(sink_func)
+
+
+class ProcessFunction(Function):
+    """
+    A function that process elements of a stream.
+
+    For every element in the input stream process_element(value, ctx, out) is invoked. This can
+    produce zero or more elements as output. Implementations can also query the time and set timers
+    through the provided Context. For firing timers on_timer(long, ctx, out) will be invoked. This
+    can again produce zero or more elements as output and register further timers.
+
+    Note that access to keyed state and timers (which are also scoped to a key) is only available if
+    the ProcessFunction is applied on a KeyedStream. Process Function is always a RichFunction.

Review comment:
       ```suggestion
       the ProcessFunction is applied on a KeyedStream.
   ```
    Unnecessary comment, as all Python functions are rich, not only ProcessFunction.

##########
File path: flink-python/pyflink/datastream/functions.py
##########
@@ -540,3 +540,155 @@ def __init__(self, sink_func: Union[str, JavaObject]):
         :param sink_func: The java SinkFunction object or the full name of the SinkFunction class.
         """
         super(SinkFunction, self).__init__(sink_func)
+
+
+class ProcessFunction(Function):
+    """
+    A function that process elements of a stream.
+
+    For every element in the input stream process_element(value, ctx, out) is invoked. This can
+    produce zero or more elements as output. Implementations can also query the time and set timers
+    through the provided Context. For firing timers on_timer(long, ctx, out) will be invoked. This
+    can again produce zero or more elements as output and register further timers.
+
+    Note that access to keyed state and timers (which are also scoped to a key) is only available if
+    the ProcessFunction is applied on a KeyedStream. Process Function is always a RichFunction.
+    Therefore, access to the RuntimeContext is always available and setup and teardown methods can
+    be implemented. See Function.open() and Function.close().

Review comment:
       ```suggestion
       
   ```

##########
File path: flink-python/pyflink/datastream/functions.py
##########
@@ -540,3 +540,155 @@ def __init__(self, sink_func: Union[str, JavaObject]):
         :param sink_func: The java SinkFunction object or the full name of the SinkFunction class.
         """
         super(SinkFunction, self).__init__(sink_func)
+
+
+class ProcessFunction(Function):
+    """
+    A function that process elements of a stream.
+
+    For every element in the input stream process_element(value, ctx, out) is invoked. This can
+    produce zero or more elements as output. Implementations can also query the time and set timers
+    through the provided Context. For firing timers on_timer(long, ctx, out) will be invoked. This
+    can again produce zero or more elements as output and register further timers.
+
+    Note that access to keyed state and timers (which are also scoped to a key) is only available if
+    the ProcessFunction is applied on a KeyedStream. Process Function is always a RichFunction.
+    Therefore, access to the RuntimeContext is always available and setup and teardown methods can
+    be implemented. See Function.open() and Function.close().
+    """
+
+    @abc.abstractmethod
+    def process_element(self, value, ctx: 'Context', out: 'Collector'):
+        """
+        Process one element from the input stream.
+
+        This function can output zero or more elements using the Collector parameter and also update
+        internal state or set timers using the Context parameter.
+
+        :param value: The input value.
+        :param ctx:  A Context that allows querying the timestamp of the element and getting a
+                     TimerService for registering timers and querying the time. The context is only
+                     valid during the invocation of this method, do not store it.
+        :param out: The collector for returning result values.
+        """
+        pass
+
+    @abc.abstractmethod
+    def on_timer(self, timestamp, ctx: 'OnTimerContext', out: 'Collector'):
+        """
+        Called when a timer set using TimerService fires.
+
+        :param timestamp: The timestamp of the firing timer.
+        :param ctx: An OnTimerContext that allows querying the timestamp of the firing timer,
+                    querying the TimeDomain of the firing timer and getting a TimerService for
+                    registering timers and querying the time. The context is only valid during the
+                    invocation of this method, do not store it.
+        :param out: The collector for returning result values.
+        """
+        pass
+
+    class Context(abc.ABC):
+        """
+        Information available in an invocation of process_element(value, ctx, out) or
+        on_timer(value, ctx, out).
+        """
+
+        @abc.abstractmethod
+        def timestamp(self):
+            """
+            Timestamp of the element currently being processed or timestamp of a firing timer.
+
+            This might be null, for example if the time characteristic of your program is set to
+            TimeCharacteristic.ProcessingTime.
+            """
+            pass
+
+        @abc.abstractmethod
+        def timer_service(self):
+            """
+            A Timer service for querying time and registering timers.
+            """
+            pass
+
+    class OnTimerContext(abc.ABC):
+        """
+        Information available in an invocation of on_timer(long, OnTimerContext, Collector)
+        """
+        @abc.abstractmethod
+        def time_domain(self):

Review comment:
       ditto

##########
File path: flink-python/pyflink/datastream/functions.py
##########
@@ -540,3 +540,155 @@ def __init__(self, sink_func: Union[str, JavaObject]):
         :param sink_func: The java SinkFunction object or the full name of the SinkFunction class.
         """
         super(SinkFunction, self).__init__(sink_func)
+
+
+class ProcessFunction(Function):
+    """
+    A function that process elements of a stream.
+
+    For every element in the input stream process_element(value, ctx, out) is invoked. This can
+    produce zero or more elements as output. Implementations can also query the time and set timers
+    through the provided Context. For firing timers on_timer(long, ctx, out) will be invoked. This
+    can again produce zero or more elements as output and register further timers.
+
+    Note that access to keyed state and timers (which are also scoped to a key) is only available if
+    the ProcessFunction is applied on a KeyedStream. Process Function is always a RichFunction.
+    Therefore, access to the RuntimeContext is always available and setup and teardown methods can
+    be implemented. See Function.open() and Function.close().
+    """
+
+    @abc.abstractmethod
+    def process_element(self, value, ctx: 'Context', out: 'Collector'):
+        """
+        Process one element from the input stream.
+
+        This function can output zero or more elements using the Collector parameter and also update
+        internal state or set timers using the Context parameter.
+
+        :param value: The input value.
+        :param ctx:  A Context that allows querying the timestamp of the element and getting a
+                     TimerService for registering timers and querying the time. The context is only
+                     valid during the invocation of this method, do not store it.
+        :param out: The collector for returning result values.
+        """
+        pass
+
+    @abc.abstractmethod
+    def on_timer(self, timestamp, ctx: 'OnTimerContext', out: 'Collector'):
+        """
+        Called when a timer set using TimerService fires.
+
+        :param timestamp: The timestamp of the firing timer.
+        :param ctx: An OnTimerContext that allows querying the timestamp of the firing timer,
+                    querying the TimeDomain of the firing timer and getting a TimerService for
+                    registering timers and querying the time. The context is only valid during the
+                    invocation of this method, do not store it.
+        :param out: The collector for returning result values.
+        """
+        pass
+
+    class Context(abc.ABC):
+        """
+        Information available in an invocation of process_element(value, ctx, out) or
+        on_timer(value, ctx, out).
+        """
+
+        @abc.abstractmethod
+        def timestamp(self):
+            """
+            Timestamp of the element currently being processed or timestamp of a firing timer.
+
+            This might be null, for example if the time characteristic of your program is set to
+            TimeCharacteristic.ProcessingTime.
+            """
+            pass
+
+        @abc.abstractmethod
+        def timer_service(self):
+            """
+            A Timer service for querying time and registering timers.
+            """
+            pass
+
+    class OnTimerContext(abc.ABC):
+        """
+        Information available in an invocation of on_timer(long, OnTimerContext, Collector)
+        """
+        @abc.abstractmethod
+        def time_domain(self):
+            pass
+
+
+class Collector(abc.ABC):
+    """
+    Collects a record and forwards it.
+    """
+    @abc.abstractmethod
+    def collect(self, value):
+        """
+        Emits a record.
+
+        :param value: The record to collect.
+        """
+        pass
+
+
+class TimerService(abc.ABC):
+    """
+    Interface for working with time and timers.
+    """
+
+    @abc.abstractmethod
+    def current_processing_time(self):
+        """
+        Returns the current processing time.
+        """
+        pass
+
+    @abc.abstractmethod
+    def current_watermark(self):
+        """
+        Returns the current event-time watermark.
+        """
+        pass
+
+    @abc.abstractmethod
+    def register_processing_time_timer(self, time: int):
+        """
+        Registers a timer to be fired when processing time passes the given time.
+
+        Timers can internally be scoped to keys and/or windows. When you set a timer in a keyed
+        context, such as in an operation on KeyedStream, then that context will also be active when you
+        receive the timer notification.
+
+        :param time: The processing time of the timer to be registered.
+        """
+        pass
+
+    @abc.abstractmethod
+    def register_event_time_timer(self, time: int):
+        """
+        Registers a timer to be fired when the event time watermark passes the given time.
+
+        Timers can internally be scoped to keys and/or windows. When you set a timer in a keyed
+        context, such as in an operation on KeyedStream, then that context will also be active when you
+        receive the timer notification.
+
+        :param time: The event time of the timer to be registered.
+        """
+        pass
+
+
+class InternalProcessFunctionContext(ProcessFunction.Context):
+    """
+    Internal implementation of ProcessFunction.Context.
+    """
+
+    def __init__(self, timer_service: 'TimerService'):
+        self._timer_service = timer_service
+
+    def timestamp(self):
+        pass

Review comment:
       Is this API still not available? If so, we can remove it for now.

##########
File path: flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/StatefulPythonFunctionOperator.java
##########
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.python;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
+import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.core.memory.ManagedMemoryUseCase;
+import org.apache.flink.python.PythonFunctionRunner;
+import org.apache.flink.runtime.state.VoidNamespace;
+import org.apache.flink.runtime.state.VoidNamespaceSerializer;
+import org.apache.flink.streaming.api.SimpleTimerService;
+import org.apache.flink.streaming.api.TimerService;
+import org.apache.flink.streaming.api.functions.python.DataStreamPythonFunctionInfo;
+import org.apache.flink.streaming.api.operators.InternalTimer;
+import org.apache.flink.streaming.api.operators.InternalTimerService;
+import org.apache.flink.streaming.api.operators.Triggerable;
+import org.apache.flink.streaming.api.runners.python.beam.BeamDataStreamStatefulPythonFunctionRunner;
+import org.apache.flink.streaming.api.utils.PythonOperatorUtils;
+import org.apache.flink.streaming.api.utils.PythonTypeUtils;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.functions.python.PythonEnv;
+import org.apache.flink.table.runtime.util.StreamRecordCollector;
+import org.apache.flink.types.Row;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.Map;
+
+/**
+ * {@link StatefulPythonFunctionOperator} is responsible for launching beam runner which will start
+ * a python harness to execute user defined python function. It is also able to handle the timer and
+ * state request from the python stateful user defined function.
+ * */
+public class StatefulPythonFunctionOperator<OUT> extends AbstractOneInputPythonFunctionOperator<Row, OUT>

Review comment:
       Add @Internal

##########
File path: flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/StatefulPythonFunctionOperator.java
##########
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.python;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
+import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.core.memory.ManagedMemoryUseCase;
+import org.apache.flink.python.PythonFunctionRunner;
+import org.apache.flink.runtime.state.VoidNamespace;
+import org.apache.flink.runtime.state.VoidNamespaceSerializer;
+import org.apache.flink.streaming.api.SimpleTimerService;
+import org.apache.flink.streaming.api.TimerService;
+import org.apache.flink.streaming.api.functions.python.DataStreamPythonFunctionInfo;
+import org.apache.flink.streaming.api.operators.InternalTimer;
+import org.apache.flink.streaming.api.operators.InternalTimerService;
+import org.apache.flink.streaming.api.operators.Triggerable;
+import org.apache.flink.streaming.api.runners.python.beam.BeamDataStreamStatefulPythonFunctionRunner;
+import org.apache.flink.streaming.api.utils.PythonOperatorUtils;
+import org.apache.flink.streaming.api.utils.PythonTypeUtils;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.functions.python.PythonEnv;
+import org.apache.flink.table.runtime.util.StreamRecordCollector;
+import org.apache.flink.types.Row;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.Map;
+
+/**
+ * {@link StatefulPythonFunctionOperator} is responsible for launching beam runner which will start
+ * a python harness to execute user defined python function. It is also able to handle the timer and
+ * state request from the python stateful user defined function.
+ * */
+public class StatefulPythonFunctionOperator<OUT> extends AbstractOneInputPythonFunctionOperator<Row, OUT>
+	implements ResultTypeQueryable<OUT>, Triggerable<Row, VoidNamespace> {
+
+	private static final Logger LOGGER = LoggerFactory.getLogger(StatefulPythonFunctionOperator.class);
+
+	protected static final String DATA_STREAM_STATEFUL_PYTHON_FUNCTION_URN =
+		"flink:transform:datastream_stateful_function:v1";
+
+	protected static final String DATA_STREAM_STATEFUL_MAP_FUNCTION_CODER_URN =
+		"flink:coder:datastream:stateful_map_function:v1";
+
+	protected final DataStreamPythonFunctionInfo pythonFunctionInfo;

Review comment:
       Add Javadoc for the fields and methods. Currently there is no Javadoc at all in this class.

##########
File path: flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/StatefulPythonFunctionOperator.java
##########
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.python;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
+import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.core.memory.ManagedMemoryUseCase;
+import org.apache.flink.python.PythonFunctionRunner;
+import org.apache.flink.runtime.state.VoidNamespace;
+import org.apache.flink.runtime.state.VoidNamespaceSerializer;
+import org.apache.flink.streaming.api.SimpleTimerService;
+import org.apache.flink.streaming.api.TimerService;
+import org.apache.flink.streaming.api.functions.python.DataStreamPythonFunctionInfo;
+import org.apache.flink.streaming.api.operators.InternalTimer;
+import org.apache.flink.streaming.api.operators.InternalTimerService;
+import org.apache.flink.streaming.api.operators.Triggerable;
+import org.apache.flink.streaming.api.runners.python.beam.BeamDataStreamStatefulPythonFunctionRunner;
+import org.apache.flink.streaming.api.utils.PythonOperatorUtils;
+import org.apache.flink.streaming.api.utils.PythonTypeUtils;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.functions.python.PythonEnv;
+import org.apache.flink.table.runtime.util.StreamRecordCollector;
+import org.apache.flink.types.Row;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.Map;
+
+/**
+ * {@link StatefulPythonFunctionOperator} is responsible for launching beam runner which will start
+ * a python harness to execute user defined python function. It is also able to handle the timer and
+ * state request from the python stateful user defined function.
+ * */
+public class StatefulPythonFunctionOperator<OUT> extends AbstractOneInputPythonFunctionOperator<Row, OUT>
+	implements ResultTypeQueryable<OUT>, Triggerable<Row, VoidNamespace> {
+
+	private static final Logger LOGGER = LoggerFactory.getLogger(StatefulPythonFunctionOperator.class);
+
+	protected static final String DATA_STREAM_STATEFUL_PYTHON_FUNCTION_URN =
+		"flink:transform:datastream_stateful_function:v1";
+
+	protected static final String DATA_STREAM_STATEFUL_MAP_FUNCTION_CODER_URN =
+		"flink:coder:datastream:stateful_map_function:v1";
+
+	protected final DataStreamPythonFunctionInfo pythonFunctionInfo;
+	private final TypeInformation runnerInputTypeInfo;
+	private final TypeInformation runnerOutputTypeInfo;
+	private final TypeInformation<OUT> outputTypeInfo;
+	private final TypeInformation<Row> keyTypeInfo;
+	private TypeSerializer runnerInputSerializer;
+	private TypeSerializer runnerOutputSerializer;
+	private TypeSerializer keyTypeSerializer;
+
+	private transient TimerService timerservice;
+
+	protected final Map<String, String> jobOptions;
+
+	protected transient ByteArrayInputStreamWithPos bais;
+
+	protected transient DataInputViewStreamWrapper baisWrapper;
+
+	protected transient ByteArrayOutputStreamWithPos baos;
+
+	protected transient DataOutputViewStreamWrapper baosWrapper;
+
+	protected transient StreamRecordCollector streamRecordCollector;
+
+	protected Row reusableInput;

Review comment:
       ```suggestion
   	private transient  Row reusableInput;
   ```

##########
File path: flink-python/src/main/java/org/apache/flink/streaming/api/utils/PythonOperatorUtils.java
##########
@@ -132,4 +133,22 @@
 			dataStreamPythonFunctionInfo.getPythonFunction().getSerializedPythonFunction()));
 		return builder.build();
 	}
+
+	public static FlinkFnApi.UserDefinedDataStreamFunction
+	getUserDefinedDataStreamStatefulFunctionProto(
+		DataStreamPythonFunctionInfo dataStreamPythonFunctionInfo,
+		RuntimeContext runtimeContext,
+		Map<String, String> internalParameters,
+		TypeInformation keyTypeInfo) {
+		FlinkFnApi.UserDefinedDataStreamFunction userDefinedDataStreamFunction =
+			getUserDefinedDataStreamFunctionProto(dataStreamPythonFunctionInfo, runtimeContext,
+				internalParameters);
+		FlinkFnApi.TypeInfo.FieldType builtKeyFieldType = PythonTypeUtils.TypeInfoToProtoConverter
+			.getFieldType(keyTypeInfo);
+		return userDefinedDataStreamFunction.toBuilder()
+			.setKeyTypeInfo(PythonTypeUtils.TypeInfoToProtoConverter
+				.toTypeInfoProto(builtKeyFieldType)).build();
+

Review comment:
       unnecessary empty line

##########
File path: flink-python/pyflink/fn_execution/stateful_operation_common.py
##########
@@ -0,0 +1,72 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+import time
+
+from typing import Any
+
+from pyflink.datastream.functions import Collector, TimerService
+
+
+class InternalCollector(Collector):
+    """
+    Internal implementation of the Collector. It uses a buffer list to store data to be emitted.
+    There will be a header flag for each data type. 0 means it is a proc time timer registering
+    request, while 1 means it is an event time timer and 2 means it is normal data. When
+    registering a timer, it must take along the corresponding key for it.
+    """
+    def __init__(self):
+        self.buf = []
+
+    def collect_proc_timer(self, a: Any, key: Any):
+        self.buf.append((0, a, key, None))
+
+    def collect_event_timer(self, a: Any, key: Any):
+        self.buf.append((1, a, key, None))
+
+    def collect_data(self, a: Any):
+        self.buf.append((2, a))
+
+    def collect(self, a: Any):
+        self.collect_data(a)
+
+    def clear(self):
+        self.buf.clear()
+
+
+class InternalTimerService(TimerService):
+    """
+    Internal implementation of TimerService.
+    """
+    def __init__(self, collector, keyed_state_backend):
+        self._collector: InternalCollector = collector
+        self._keyed_state_backend = keyed_state_backend
+        self._current_watermark = None
+
+    def current_processing_time(self) -> int:
+        return int(time.time() * 1000)
+
+    def register_processing_time_timer(self, t: int):
+        current_key = self._keyed_state_backend.get_current_key()
+        self._collector.collect_proc_timer(t, current_key)
+
+    def register_event_time_timer(self, t: int):
+        current_key = self._keyed_state_backend.get_current_key()
+        self._collector.collect_event_timer(t, current_key)
+
+    def current_watermark(self) -> int:
+        return self._current_watermark

Review comment:
       watermark is always None?
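
       For illustration only, one way the watermark could be kept up to date is sketched below; the `advance_watermark()` hook, and the assumption that the runner calls it whenever a new watermark arrives, are not part of this PR:

    ```python
    import time
    from typing import Optional


    class InternalTimerService:
        """Sketch of a timer service whose watermark is actually advanced."""

        def __init__(self, collector, keyed_state_backend):
            self._collector = collector
            self._keyed_state_backend = keyed_state_backend
            self._current_watermark: Optional[int] = None

        def advance_watermark(self, watermark: int) -> None:
            # Hypothetical hook: would be invoked whenever the operation
            # receives a new watermark from the Java operator.
            self._current_watermark = watermark

        def current_watermark(self) -> Optional[int]:
            return self._current_watermark

        def current_processing_time(self) -> int:
            return int(time.time() * 1000)
    ```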

##########
File path: flink-python/pyflink/datastream/functions.py
##########
@@ -540,3 +540,155 @@ def __init__(self, sink_func: Union[str, JavaObject]):
         :param sink_func: The java SinkFunction object or the full name of the SinkFunction class.
         """
         super(SinkFunction, self).__init__(sink_func)
+
+
+class ProcessFunction(Function):
+    """
+    A function that process elements of a stream.
+
+    For every element in the input stream process_element(value, ctx, out) is invoked. This can
+    produce zero or more elements as output. Implementations can also query the time and set timers
+    through the provided Context. For firing timers on_timer(long, ctx, out) will be invoked. This
+    can again produce zero or more elements as output and register further timers.
+
+    Note that access to keyed state and timers (which are also scoped to a key) is only available if
+    the ProcessFunction is applied on a KeyedStream. Process Function is always a RichFunction.
+    Therefore, access to the RuntimeContext is always available and setup and teardown methods can
+    be implemented. See Function.open() and Function.close().
+    """
+
+    @abc.abstractmethod
+    def process_element(self, value, ctx: 'Context', out: 'Collector'):
+        """
+        Process one element from the input stream.
+
+        This function can output zero or more elements using the Collector parameter and also update
+        internal state or set timers using the Context parameter.
+
+        :param value: The input value.
+        :param ctx:  A Context that allows querying the timestamp of the element and getting a
+                     TimerService for registering timers and querying the time. The context is only
+                     valid during the invocation of this method, do not store it.
+        :param out: The collector for returning result values.
+        """
+        pass
+
+    @abc.abstractmethod
+    def on_timer(self, timestamp, ctx: 'OnTimerContext', out: 'Collector'):
+        """
+        Called when a timer set using TimerService fires.
+
+        :param timestamp: The timestamp of the firing timer.
+        :param ctx: An OnTimerContext that allows querying the timestamp of the firing timer,
+                    querying the TimeDomain of the firing timer and getting a TimerService for
+                    registering timers and querying the time. The context is only valid during the
+                    invocation of this method, do not store it.
+        :param out: The collector for returning result values.
+        """
+        pass
+
+    class Context(abc.ABC):
+        """
+        Information available in an invocation of process_element(value, ctx, out) or
+        on_timer(value, ctx, out).
+        """
+
+        @abc.abstractmethod
+        def timestamp(self):

Review comment:
       add type hint for the result type?
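
       A possible shape for the hinted signature (a sketch; `Optional[int]` is an assumption, since the timestamp can be absent under processing time):

    ```python
    import abc
    from typing import Optional


    class Context(abc.ABC):

        @abc.abstractmethod
        def timestamp(self) -> Optional[int]:
            """
            Timestamp of the element currently being processed or timestamp of
            a firing timer; None e.g. under TimeCharacteristic.ProcessingTime.
            """
            pass
    ```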

##########
File path: flink-python/pyflink/datastream/functions.py
##########
@@ -540,3 +540,155 @@ def __init__(self, sink_func: Union[str, JavaObject]):
         :param sink_func: The java SinkFunction object or the full name of the SinkFunction class.
         """
         super(SinkFunction, self).__init__(sink_func)
+
+
+class ProcessFunction(Function):
+    """
+    A function that process elements of a stream.
+
+    For every element in the input stream process_element(value, ctx, out) is invoked. This can
+    produce zero or more elements as output. Implementations can also query the time and set timers
+    through the provided Context. For firing timers on_timer(long, ctx, out) will be invoked. This
+    can again produce zero or more elements as output and register further timers.
+
+    Note that access to keyed state and timers (which are also scoped to a key) is only available if
+    the ProcessFunction is applied on a KeyedStream. Process Function is always a RichFunction.
+    Therefore, access to the RuntimeContext is always available and setup and teardown methods can
+    be implemented. See Function.open() and Function.close().
+    """
+
+    @abc.abstractmethod
+    def process_element(self, value, ctx: 'Context', out: 'Collector'):
+        """
+        Process one element from the input stream.
+
+        This function can output zero or more elements using the Collector parameter and also update
+        internal state or set timers using the Context parameter.
+
+        :param value: The input value.
+        :param ctx:  A Context that allows querying the timestamp of the element and getting a
+                     TimerService for registering timers and querying the time. The context is only
+                     valid during the invocation of this method, do not store it.
+        :param out: The collector for returning result values.
+        """
+        pass
+
+    @abc.abstractmethod
+    def on_timer(self, timestamp, ctx: 'OnTimerContext', out: 'Collector'):
+        """
+        Called when a timer set using TimerService fires.
+
+        :param timestamp: The timestamp of the firing timer.
+        :param ctx: An OnTimerContext that allows querying the timestamp of the firing timer,
+                    querying the TimeDomain of the firing timer and getting a TimerService for
+                    registering timers and querying the time. The context is only valid during the
+                    invocation of this method, do not store it.
+        :param out: The collector for returning result values.
+        """
+        pass
+
+    class Context(abc.ABC):
+        """
+        Information available in an invocation of process_element(value, ctx, out) or
+        on_timer(value, ctx, out).
+        """
+
+        @abc.abstractmethod
+        def timestamp(self):
+            """
+            Timestamp of the element currently being processed or timestamp of a firing timer.
+
+            This might be null, for example if the time characteristic of your program is set to
+            TimeCharacteristic.ProcessingTime.
+            """
+            pass
+
+        @abc.abstractmethod
+        def timer_service(self):

Review comment:
       Add type hint for the result type?
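
       Likewise for timer_service(); a string forward reference would work here because TimerService is defined later in the same module (sketch, not the final wording):

    ```python
    import abc


    class Context(abc.ABC):

        @abc.abstractmethod
        def timer_service(self) -> 'TimerService':
            """A TimerService for querying time and registering timers."""
            pass


    class TimerService(abc.ABC):
        """Interface for working with time and timers."""
    ```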

##########
File path: flink-python/pyflink/datastream/functions.py
##########
@@ -540,3 +540,155 @@ def __init__(self, sink_func: Union[str, JavaObject]):
         :param sink_func: The java SinkFunction object or the full name of the SinkFunction class.
         """
         super(SinkFunction, self).__init__(sink_func)
+
+
+class ProcessFunction(Function):
+    """
+    A function that process elements of a stream.
+
+    For every element in the input stream process_element(value, ctx, out) is invoked. This can
+    produce zero or more elements as output. Implementations can also query the time and set timers
+    through the provided Context. For firing timers on_timer(long, ctx, out) will be invoked. This
+    can again produce zero or more elements as output and register further timers.
+
+    Note that access to keyed state and timers (which are also scoped to a key) is only available if
+    the ProcessFunction is applied on a KeyedStream. Process Function is always a RichFunction.
+    Therefore, access to the RuntimeContext is always available and setup and teardown methods can

Review comment:
       ```suggestion
       
   ```

##########
File path: flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/StatefulPythonFunctionOperator.java
##########
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.python;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
+import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.core.memory.ManagedMemoryUseCase;
+import org.apache.flink.python.PythonFunctionRunner;
+import org.apache.flink.runtime.state.VoidNamespace;
+import org.apache.flink.runtime.state.VoidNamespaceSerializer;
+import org.apache.flink.streaming.api.SimpleTimerService;
+import org.apache.flink.streaming.api.TimerService;
+import org.apache.flink.streaming.api.functions.python.DataStreamPythonFunctionInfo;
+import org.apache.flink.streaming.api.operators.InternalTimer;
+import org.apache.flink.streaming.api.operators.InternalTimerService;
+import org.apache.flink.streaming.api.operators.Triggerable;
+import org.apache.flink.streaming.api.runners.python.beam.BeamDataStreamStatefulPythonFunctionRunner;
+import org.apache.flink.streaming.api.utils.PythonOperatorUtils;
+import org.apache.flink.streaming.api.utils.PythonTypeUtils;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.functions.python.PythonEnv;
+import org.apache.flink.table.runtime.util.StreamRecordCollector;
+import org.apache.flink.types.Row;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.Map;
+
+/**
+ * {@link StatefulPythonFunctionOperator} is responsible for launching beam runner which will start
+ * a python harness to execute user defined python function. It is also able to handle the timer and
+ * state request from the python stateful user defined function.
+ * */
+public class StatefulPythonFunctionOperator<OUT> extends AbstractOneInputPythonFunctionOperator<Row, OUT>

Review comment:
       Is this operator specific to ProcessFunction? If so, what about renaming it to PythonProcessFunctionOperator?

##########
File path: flink-python/pyflink/datastream/functions.py
##########
@@ -540,3 +540,155 @@ def __init__(self, sink_func: Union[str, JavaObject]):
         :param sink_func: The java SinkFunction object or the full name of the SinkFunction class.
         """
         super(SinkFunction, self).__init__(sink_func)
+
+
+class ProcessFunction(Function):
+    """
+    A function that process elements of a stream.
+
+    For every element in the input stream process_element(value, ctx, out) is invoked. This can
+    produce zero or more elements as output. Implementations can also query the time and set timers
+    through the provided Context. For firing timers on_timer(long, ctx, out) will be invoked. This
+    can again produce zero or more elements as output and register further timers.
+
+    Note that access to keyed state and timers (which are also scoped to a key) is only available if
+    the ProcessFunction is applied on a KeyedStream. Process Function is always a RichFunction.
+    Therefore, access to the RuntimeContext is always available and setup and teardown methods can
+    be implemented. See Function.open() and Function.close().
+    """
+
+    @abc.abstractmethod
+    def process_element(self, value, ctx: 'Context', out: 'Collector'):
+        """
+        Process one element from the input stream.
+
+        This function can output zero or more elements using the Collector parameter and also update
+        internal state or set timers using the Context parameter.
+
+        :param value: The input value.
+        :param ctx:  A Context that allows querying the timestamp of the element and getting a
+                     TimerService for registering timers and querying the time. The context is only
+                     valid during the invocation of this method, do not store it.
+        :param out: The collector for returning result values.
+        """
+        pass
+
+    @abc.abstractmethod
+    def on_timer(self, timestamp, ctx: 'OnTimerContext', out: 'Collector'):
+        """
+        Called when a timer set using TimerService fires.
+
+        :param timestamp: The timestamp of the firing timer.
+        :param ctx: An OnTimerContext that allows querying the timestamp of the firing timer,
+                    querying the TimeDomain of the firing timer and getting a TimerService for
+                    registering timers and querying the time. The context is only valid during the
+                    invocation of this method, do not store it.
+        :param out: The collector for returning result values.
+        """
+        pass
+
+    class Context(abc.ABC):
+        """
+        Information available in an invocation of process_element(value, ctx, out) or
+        on_timer(value, ctx, out).
+        """
+
+        @abc.abstractmethod
+        def timestamp(self):
+            """
+            Timestamp of the element currently being processed or timestamp of a firing timer.
+
+            This might be null, for example if the time characteristic of your program is set to

Review comment:
       ```suggestion
               This might be None, for example if the time characteristic of your program is set to
   ```

##########
File path: flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/StatefulPythonFunctionOperator.java
##########
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.python;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
+import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.core.memory.ManagedMemoryUseCase;
+import org.apache.flink.python.PythonFunctionRunner;
+import org.apache.flink.runtime.state.VoidNamespace;
+import org.apache.flink.runtime.state.VoidNamespaceSerializer;
+import org.apache.flink.streaming.api.SimpleTimerService;
+import org.apache.flink.streaming.api.TimerService;
+import org.apache.flink.streaming.api.functions.python.DataStreamPythonFunctionInfo;
+import org.apache.flink.streaming.api.operators.InternalTimer;
+import org.apache.flink.streaming.api.operators.InternalTimerService;
+import org.apache.flink.streaming.api.operators.Triggerable;
+import org.apache.flink.streaming.api.runners.python.beam.BeamDataStreamStatefulPythonFunctionRunner;
+import org.apache.flink.streaming.api.utils.PythonOperatorUtils;
+import org.apache.flink.streaming.api.utils.PythonTypeUtils;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.functions.python.PythonEnv;
+import org.apache.flink.table.runtime.util.StreamRecordCollector;
+import org.apache.flink.types.Row;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.Map;
+
+/**
+ * {@link StatefulPythonFunctionOperator} is responsible for launching beam runner which will start
+ * a python harness to execute user defined python function. It is also able to handle the timer and
+ * state request from the python stateful user defined function.
+ * */
+public class StatefulPythonFunctionOperator<OUT> extends AbstractOneInputPythonFunctionOperator<Row, OUT>
+	implements ResultTypeQueryable<OUT>, Triggerable<Row, VoidNamespace> {
+
+	private static final Logger LOGGER = LoggerFactory.getLogger(StatefulPythonFunctionOperator.class);
+
+	protected static final String DATA_STREAM_STATEFUL_PYTHON_FUNCTION_URN =
+		"flink:transform:datastream_stateful_function:v1";
+
+	protected static final String DATA_STREAM_STATEFUL_MAP_FUNCTION_CODER_URN =
+		"flink:coder:datastream:stateful_map_function:v1";

Review comment:
       I'm confused: where does the "map_function" come from?

##########
File path: flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/StatefulPythonFunctionOperator.java
##########
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.python;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
+import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.core.memory.ManagedMemoryUseCase;
+import org.apache.flink.python.PythonFunctionRunner;
+import org.apache.flink.runtime.state.VoidNamespace;
+import org.apache.flink.runtime.state.VoidNamespaceSerializer;
+import org.apache.flink.streaming.api.SimpleTimerService;
+import org.apache.flink.streaming.api.TimerService;
+import org.apache.flink.streaming.api.functions.python.DataStreamPythonFunctionInfo;
+import org.apache.flink.streaming.api.operators.InternalTimer;
+import org.apache.flink.streaming.api.operators.InternalTimerService;
+import org.apache.flink.streaming.api.operators.Triggerable;
+import org.apache.flink.streaming.api.runners.python.beam.BeamDataStreamStatefulPythonFunctionRunner;
+import org.apache.flink.streaming.api.utils.PythonOperatorUtils;
+import org.apache.flink.streaming.api.utils.PythonTypeUtils;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.functions.python.PythonEnv;
+import org.apache.flink.table.runtime.util.StreamRecordCollector;
+import org.apache.flink.types.Row;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.Map;
+
+/**
+ * {@link StatefulPythonFunctionOperator} is responsible for launching beam runner which will start
+ * a python harness to execute user defined python function. It is also able to handle the timer and
+ * state request from the python stateful user defined function.
+ * */
+public class StatefulPythonFunctionOperator<OUT> extends AbstractOneInputPythonFunctionOperator<Row, OUT>
+	implements ResultTypeQueryable<OUT>, Triggerable<Row, VoidNamespace> {
+
+	private static final Logger LOGGER = LoggerFactory.getLogger(StatefulPythonFunctionOperator.class);
+
+	protected static final String DATA_STREAM_STATEFUL_PYTHON_FUNCTION_URN =

Review comment:
       private? We also need to check the other fields.

##########
File path: flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/StatefulPythonFunctionOperator.java
##########
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.python;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
+import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.core.memory.ManagedMemoryUseCase;
+import org.apache.flink.python.PythonFunctionRunner;
+import org.apache.flink.runtime.state.VoidNamespace;
+import org.apache.flink.runtime.state.VoidNamespaceSerializer;
+import org.apache.flink.streaming.api.SimpleTimerService;
+import org.apache.flink.streaming.api.TimerService;
+import org.apache.flink.streaming.api.functions.python.DataStreamPythonFunctionInfo;
+import org.apache.flink.streaming.api.operators.InternalTimer;
+import org.apache.flink.streaming.api.operators.InternalTimerService;
+import org.apache.flink.streaming.api.operators.Triggerable;
+import org.apache.flink.streaming.api.runners.python.beam.BeamDataStreamStatefulPythonFunctionRunner;
+import org.apache.flink.streaming.api.utils.PythonOperatorUtils;
+import org.apache.flink.streaming.api.utils.PythonTypeUtils;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.functions.python.PythonEnv;
+import org.apache.flink.table.runtime.util.StreamRecordCollector;
+import org.apache.flink.types.Row;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.Map;
+
+/**
+ * {@link StatefulPythonFunctionOperator} is responsible for launching beam runner which will start
+ * a python harness to execute user defined python function. It is also able to handle the timer and
+ * state request from the python stateful user defined function.
+ * */
+public class StatefulPythonFunctionOperator<OUT> extends AbstractOneInputPythonFunctionOperator<Row, OUT>
+	implements ResultTypeQueryable<OUT>, Triggerable<Row, VoidNamespace> {
+

Review comment:
       Add `private static final long serialVersionUID = 1L;`

##########
File path: flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/StatefulPythonFunctionOperator.java
##########
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.python;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
+import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.core.memory.ManagedMemoryUseCase;
+import org.apache.flink.python.PythonFunctionRunner;
+import org.apache.flink.runtime.state.VoidNamespace;
+import org.apache.flink.runtime.state.VoidNamespaceSerializer;
+import org.apache.flink.streaming.api.SimpleTimerService;
+import org.apache.flink.streaming.api.TimerService;
+import org.apache.flink.streaming.api.functions.python.DataStreamPythonFunctionInfo;
+import org.apache.flink.streaming.api.operators.InternalTimer;
+import org.apache.flink.streaming.api.operators.InternalTimerService;
+import org.apache.flink.streaming.api.operators.Triggerable;
+import org.apache.flink.streaming.api.runners.python.beam.BeamDataStreamStatefulPythonFunctionRunner;
+import org.apache.flink.streaming.api.utils.PythonOperatorUtils;
+import org.apache.flink.streaming.api.utils.PythonTypeUtils;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.functions.python.PythonEnv;
+import org.apache.flink.table.runtime.util.StreamRecordCollector;
+import org.apache.flink.types.Row;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.Map;
+
+/**
+ * {@link StatefulPythonFunctionOperator} is responsible for launching beam runner which will start
+ * a python harness to execute user defined python function. It is also able to handle the timer and
+ * state request from the python stateful user defined function.
+ * */
+public class StatefulPythonFunctionOperator<OUT> extends AbstractOneInputPythonFunctionOperator<Row, OUT>
+	implements ResultTypeQueryable<OUT>, Triggerable<Row, VoidNamespace> {
+
+	private static final Logger LOGGER = LoggerFactory.getLogger(StatefulPythonFunctionOperator.class);
+
+	protected static final String DATA_STREAM_STATEFUL_PYTHON_FUNCTION_URN =
+		"flink:transform:datastream_stateful_function:v1";
+
+	protected static final String DATA_STREAM_STATEFUL_MAP_FUNCTION_CODER_URN =
+		"flink:coder:datastream:stateful_map_function:v1";
+
+	protected final DataStreamPythonFunctionInfo pythonFunctionInfo;
+	private final TypeInformation runnerInputTypeInfo;
+	private final TypeInformation runnerOutputTypeInfo;
+	private final TypeInformation<OUT> outputTypeInfo;
+	private final TypeInformation<Row> keyTypeInfo;
+	private TypeSerializer runnerInputSerializer;
+	private TypeSerializer runnerOutputSerializer;
+	private TypeSerializer keyTypeSerializer;
+
+	private transient TimerService timerservice;
+
+	protected final Map<String, String> jobOptions;
+
+	protected transient ByteArrayInputStreamWithPos bais;
+
+	protected transient DataInputViewStreamWrapper baisWrapper;
+
+	protected transient ByteArrayOutputStreamWithPos baos;
+
+	protected transient DataOutputViewStreamWrapper baosWrapper;
+
+	protected transient StreamRecordCollector streamRecordCollector;
+
+	protected Row reusableInput;
+	protected Row reusableTimerData;
+
+	public StatefulPythonFunctionOperator(
+		Configuration config,
+		RowTypeInfo inputTypeInfo,
+		TypeInformation<OUT> outputTypeInfo,
+		DataStreamPythonFunctionInfo pythonFunctionInfo) {
+		super(config);
+		this.jobOptions = config.toMap();
+		this.pythonFunctionInfo = pythonFunctionInfo;
+		this.outputTypeInfo = outputTypeInfo;
+		this.keyTypeInfo = new RowTypeInfo(inputTypeInfo.getTypeAt(0));
+		this.keyTypeSerializer = PythonTypeUtils.TypeInfoToSerializerConverter.typeInfoSerializerConverter(keyTypeInfo);
+		// inputType: normal data/ timer data, timerType: proc/event time, currentWatermark, keyData, real data
+		this.runnerInputTypeInfo = Types.ROW(Types.INT, Types.LONG, Types.LONG, this.keyTypeInfo, inputTypeInfo);
+		this.runnerOutputTypeInfo = Types.ROW(Types.INT, Types.LONG, this.keyTypeInfo, outputTypeInfo);
+	}
+
+	@Override
+	public void open() throws Exception {
+		super.open();
+		bais = new ByteArrayInputStreamWithPos();
+		baisWrapper = new DataInputViewStreamWrapper(bais);
+
+		baos = new ByteArrayOutputStreamWithPos();
+		baosWrapper = new DataOutputViewStreamWrapper(baos);
+		runnerInputSerializer = PythonTypeUtils.TypeInfoToSerializerConverter
+			.typeInfoSerializerConverter(runnerInputTypeInfo);
+		runnerOutputSerializer = PythonTypeUtils.TypeInfoToSerializerConverter
+			.typeInfoSerializerConverter(runnerOutputTypeInfo);
+
+		InternalTimerService<VoidNamespace> internalTimerService = getInternalTimerService("user-timers",
+			VoidNamespaceSerializer.INSTANCE, this);
+
+		this.streamRecordCollector = new StreamRecordCollector(output);
+		timerservice = new SimpleTimerService(internalTimerService);
+		reusableInput = new Row(5);
+		reusableTimerData = new Row(5);
+	}
+
+	@Override
+	public TypeInformation<OUT> getProducedType() {
+		return outputTypeInfo;
+	}
+
+	@Override
+	public void onEventTime(InternalTimer<Row, VoidNamespace> timer) throws Exception {
+		processTimer(false, timer);
+		checkInvokeFinishBundleByCount();
+	}
+
+	@Override
+	public void onProcessingTime(InternalTimer<Row, VoidNamespace> timer) throws Exception {
+		processTimer(true, timer);
+		checkInvokeFinishBundleByCount();

Review comment:
       Should emitResults() be called after checkInvokeFinishBundleByCount()?
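
       A minimal sketch of what that could look like, assuming emitResults() in the base operator is what drains the buffered results:
       ```java
       @Override
       public void onProcessingTime(InternalTimer<Row, VoidNamespace> timer) throws Exception {
       	processTimer(true, timer);
       	checkInvokeFinishBundleByCount();
       	emitResults();
       }
       ```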

##########
File path: flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/StatefulPythonFunctionOperator.java
##########
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.python;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
+import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.core.memory.ManagedMemoryUseCase;
+import org.apache.flink.python.PythonFunctionRunner;
+import org.apache.flink.runtime.state.VoidNamespace;
+import org.apache.flink.runtime.state.VoidNamespaceSerializer;
+import org.apache.flink.streaming.api.SimpleTimerService;
+import org.apache.flink.streaming.api.TimerService;
+import org.apache.flink.streaming.api.functions.python.DataStreamPythonFunctionInfo;
+import org.apache.flink.streaming.api.operators.InternalTimer;
+import org.apache.flink.streaming.api.operators.InternalTimerService;
+import org.apache.flink.streaming.api.operators.Triggerable;
+import org.apache.flink.streaming.api.runners.python.beam.BeamDataStreamStatefulPythonFunctionRunner;
+import org.apache.flink.streaming.api.utils.PythonOperatorUtils;
+import org.apache.flink.streaming.api.utils.PythonTypeUtils;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.functions.python.PythonEnv;
+import org.apache.flink.table.runtime.util.StreamRecordCollector;
+import org.apache.flink.types.Row;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.Map;
+
+/**
+ * {@link StatefulPythonFunctionOperator} is responsible for launching beam runner which will start
+ * a python harness to execute user defined python function. It is also able to handle the timer and
+ * state request from the python stateful user defined function.
+ * */
+public class StatefulPythonFunctionOperator<OUT> extends AbstractOneInputPythonFunctionOperator<Row, OUT>
+	implements ResultTypeQueryable<OUT>, Triggerable<Row, VoidNamespace> {
+
+	private static final Logger LOGGER = LoggerFactory.getLogger(StatefulPythonFunctionOperator.class);
+
+	protected static final String DATA_STREAM_STATEFUL_PYTHON_FUNCTION_URN =
+		"flink:transform:datastream_stateful_function:v1";
+
+	protected static final String DATA_STREAM_STATEFUL_MAP_FUNCTION_CODER_URN =
+		"flink:coder:datastream:stateful_map_function:v1";
+
+	protected final DataStreamPythonFunctionInfo pythonFunctionInfo;
+	private final TypeInformation runnerInputTypeInfo;
+	private final TypeInformation runnerOutputTypeInfo;
+	private final TypeInformation<OUT> outputTypeInfo;
+	private final TypeInformation<Row> keyTypeInfo;
+	private TypeSerializer runnerInputSerializer;

Review comment:
       ```suggestion
   	private transient TypeSerializer runnerInputSerializer;
   ```

##########
File path: flink-python/src/main/java/org/apache/flink/streaming/api/runners/python/beam/BeamDataStreamStatefulPythonFunctionRunner.java
##########
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.runners.python.beam;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.fnexecution.v1.FlinkFnApi;
+import org.apache.flink.python.env.PythonEnvironmentManager;
+import org.apache.flink.python.metric.FlinkMetricContainer;
+import org.apache.flink.runtime.memory.MemoryManager;
+import org.apache.flink.runtime.state.KeyedStateBackend;
+import org.apache.flink.streaming.api.utils.PythonTypeUtils;
+
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+
+import javax.annotation.Nullable;
+
+import java.util.Map;
+
+/**
+ * {@link BeamDataStreamStatefulPythonFunctionRunner} is responsible for starting a beam python harness to execute user
+ * defined python function.
+ */
+public class BeamDataStreamStatefulPythonFunctionRunner extends BeamPythonFunctionRunner {
+	private static final long serialVersionUID = 1L;

Review comment:
       could be removed

##########
File path: flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/StatefulPythonFunctionOperator.java
##########
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.python;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
+import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.core.memory.ManagedMemoryUseCase;
+import org.apache.flink.python.PythonFunctionRunner;
+import org.apache.flink.runtime.state.VoidNamespace;
+import org.apache.flink.runtime.state.VoidNamespaceSerializer;
+import org.apache.flink.streaming.api.SimpleTimerService;
+import org.apache.flink.streaming.api.TimerService;
+import org.apache.flink.streaming.api.functions.python.DataStreamPythonFunctionInfo;
+import org.apache.flink.streaming.api.operators.InternalTimer;
+import org.apache.flink.streaming.api.operators.InternalTimerService;
+import org.apache.flink.streaming.api.operators.Triggerable;
+import org.apache.flink.streaming.api.runners.python.beam.BeamDataStreamStatefulPythonFunctionRunner;
+import org.apache.flink.streaming.api.utils.PythonOperatorUtils;
+import org.apache.flink.streaming.api.utils.PythonTypeUtils;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.functions.python.PythonEnv;
+import org.apache.flink.table.runtime.util.StreamRecordCollector;
+import org.apache.flink.types.Row;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.Map;
+
+/**
+ * {@link StatefulPythonFunctionOperator} is responsible for launching beam runner which will start
+ * a python harness to execute user defined python function. It is also able to handle the timer and
+ * state request from the python stateful user defined function.
+ * */
+public class StatefulPythonFunctionOperator<OUT> extends AbstractOneInputPythonFunctionOperator<Row, OUT>
+	implements ResultTypeQueryable<OUT>, Triggerable<Row, VoidNamespace> {
+
+	private static final Logger LOGGER = LoggerFactory.getLogger(StatefulPythonFunctionOperator.class);
+
+	protected static final String DATA_STREAM_STATEFUL_PYTHON_FUNCTION_URN =
+		"flink:transform:datastream_stateful_function:v1";
+
+	protected static final String DATA_STREAM_STATEFUL_MAP_FUNCTION_CODER_URN =
+		"flink:coder:datastream:stateful_map_function:v1";
+
+	protected final DataStreamPythonFunctionInfo pythonFunctionInfo;
+	private final TypeInformation runnerInputTypeInfo;
+	private final TypeInformation runnerOutputTypeInfo;
+	private final TypeInformation<OUT> outputTypeInfo;
+	private final TypeInformation<Row> keyTypeInfo;
+	private TypeSerializer runnerInputSerializer;
+	private TypeSerializer runnerOutputSerializer;
+	private TypeSerializer keyTypeSerializer;
+
+	private transient TimerService timerservice;
+
+	protected final Map<String, String> jobOptions;
+
+	protected transient ByteArrayInputStreamWithPos bais;
+
+	protected transient DataInputViewStreamWrapper baisWrapper;
+
+	protected transient ByteArrayOutputStreamWithPos baos;
+
+	protected transient DataOutputViewStreamWrapper baosWrapper;
+
+	protected transient StreamRecordCollector streamRecordCollector;
+
+	protected Row reusableInput;
+	protected Row reusableTimerData;
+
+	public StatefulPythonFunctionOperator(
+		Configuration config,
+		RowTypeInfo inputTypeInfo,
+		TypeInformation<OUT> outputTypeInfo,
+		DataStreamPythonFunctionInfo pythonFunctionInfo) {
+		super(config);
+		this.jobOptions = config.toMap();
+		this.pythonFunctionInfo = pythonFunctionInfo;
+		this.outputTypeInfo = outputTypeInfo;
+		this.keyTypeInfo = new RowTypeInfo(inputTypeInfo.getTypeAt(0));
+		this.keyTypeSerializer = PythonTypeUtils.TypeInfoToSerializerConverter.typeInfoSerializerConverter(keyTypeInfo);
+		// inputType: normal data/ timer data, timerType: proc/event time, currentWatermark, keyData, real data
+		this.runnerInputTypeInfo = Types.ROW(Types.INT, Types.LONG, Types.LONG, this.keyTypeInfo, inputTypeInfo);
+		this.runnerOutputTypeInfo = Types.ROW(Types.INT, Types.LONG, this.keyTypeInfo, outputTypeInfo);
+	}
+
+	@Override
+	public void open() throws Exception {
+		super.open();
+		bais = new ByteArrayInputStreamWithPos();
+		baisWrapper = new DataInputViewStreamWrapper(bais);
+
+		baos = new ByteArrayOutputStreamWithPos();
+		baosWrapper = new DataOutputViewStreamWrapper(baos);
+		runnerInputSerializer = PythonTypeUtils.TypeInfoToSerializerConverter
+			.typeInfoSerializerConverter(runnerInputTypeInfo);
+		runnerOutputSerializer = PythonTypeUtils.TypeInfoToSerializerConverter
+			.typeInfoSerializerConverter(runnerOutputTypeInfo);
+
+		InternalTimerService<VoidNamespace> internalTimerService = getInternalTimerService("user-timers",
+			VoidNamespaceSerializer.INSTANCE, this);
+
+		this.streamRecordCollector = new StreamRecordCollector(output);
+		timerservice = new SimpleTimerService(internalTimerService);
+		reusableInput = new Row(5);
+		reusableTimerData = new Row(5);
+	}
+
+	@Override
+	public TypeInformation<OUT> getProducedType() {
+		return outputTypeInfo;
+	}
+
+	@Override
+	public void onEventTime(InternalTimer<Row, VoidNamespace> timer) throws Exception {
+		processTimer(false, timer);
+		checkInvokeFinishBundleByCount();
+	}
+
+	@Override
+	public void onProcessingTime(InternalTimer<Row, VoidNamespace> timer) throws Exception {
+		processTimer(true, timer);
+		checkInvokeFinishBundleByCount();
+	}
+
+	@Override
+	public PythonFunctionRunner createPythonFunctionRunner() throws Exception {
+		return new BeamDataStreamStatefulPythonFunctionRunner(
+			getRuntimeContext().getTaskName(),
+			createPythonEnvironmentManager(),
+			runnerInputTypeInfo,
+			runnerOutputTypeInfo,
+			DATA_STREAM_STATEFUL_PYTHON_FUNCTION_URN,
+			PythonOperatorUtils.getUserDefinedDataStreamStatefulFunctionProto(
+				pythonFunctionInfo, getRuntimeContext(), Collections.EMPTY_MAP, keyTypeInfo),
+			DATA_STREAM_STATEFUL_MAP_FUNCTION_CODER_URN,
+			jobOptions,
+			getFlinkMetricContainer(),
+			getKeyedStateBackend(),
+			keyTypeSerializer,
+			getContainingTask().getEnvironment().getMemoryManager(),
+			getOperatorConfig().getManagedMemoryFractionOperatorUseCaseOfSlot(
+				ManagedMemoryUseCase.PYTHON,
+				getContainingTask().getEnvironment().getTaskManagerInfo().getConfiguration(),
+				getContainingTask().getEnvironment().getUserCodeClassLoader().asClassLoader()));
+	}
+
+	@Override
+	public PythonEnv getPythonEnv() {
+		return this.pythonFunctionInfo.getPythonFunction().getPythonEnv();
+	}
+
+	@Override
+	public void emitResult(Tuple2<byte[], Integer> resultTuple) throws Exception {
+		byte[] rawResult = resultTuple.f0;
+		int length = resultTuple.f1;
+		bais.setBuffer(rawResult, 0, length);
+		Row runnerOutput = (Row) runnerOutputSerializer.deserialize(baisWrapper);
+		if (runnerOutput.getField(0) != null) {
+			registerTimer(runnerOutput);
+		} else {
+			streamRecordCollector.collect(runnerOutput.getField(3));
+		}
+
+	}
+
+	@Override
+	public void processElement(StreamRecord<Row> element) throws Exception {
+		LOGGER.info("Current watermark: " + timerservice.currentWatermark());

Review comment:
       Remove this logging. It will output a log for each input element.
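
       If the watermark is still useful for troubleshooting, a guarded debug-level line would be a possible alternative (sketch):
       ```java
       if (LOGGER.isDebugEnabled()) {
       	LOGGER.debug("Current watermark: {}", timerservice.currentWatermark());
       }
       ```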

##########
File path: flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/StatefulPythonFunctionOperator.java
##########
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.python;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
+import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.core.memory.ManagedMemoryUseCase;
+import org.apache.flink.python.PythonFunctionRunner;
+import org.apache.flink.runtime.state.VoidNamespace;
+import org.apache.flink.runtime.state.VoidNamespaceSerializer;
+import org.apache.flink.streaming.api.SimpleTimerService;
+import org.apache.flink.streaming.api.TimerService;
+import org.apache.flink.streaming.api.functions.python.DataStreamPythonFunctionInfo;
+import org.apache.flink.streaming.api.operators.InternalTimer;
+import org.apache.flink.streaming.api.operators.InternalTimerService;
+import org.apache.flink.streaming.api.operators.Triggerable;
+import org.apache.flink.streaming.api.runners.python.beam.BeamDataStreamStatefulPythonFunctionRunner;
+import org.apache.flink.streaming.api.utils.PythonOperatorUtils;
+import org.apache.flink.streaming.api.utils.PythonTypeUtils;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.functions.python.PythonEnv;
+import org.apache.flink.table.runtime.util.StreamRecordCollector;
+import org.apache.flink.types.Row;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.Map;
+
+/**
+ * {@link StatefulPythonFunctionOperator} is responsible for launching beam runner which will start
+ * a python harness to execute user defined python function. It is also able to handle the timer and
+ * state request from the python stateful user defined function.
+ * */
+public class StatefulPythonFunctionOperator<OUT> extends AbstractOneInputPythonFunctionOperator<Row, OUT>
+	implements ResultTypeQueryable<OUT>, Triggerable<Row, VoidNamespace> {
+
+	private static final Logger LOGGER = LoggerFactory.getLogger(StatefulPythonFunctionOperator.class);
+
+	protected static final String DATA_STREAM_STATEFUL_PYTHON_FUNCTION_URN =
+		"flink:transform:datastream_stateful_function:v1";
+
+	protected static final String DATA_STREAM_STATEFUL_MAP_FUNCTION_CODER_URN =
+		"flink:coder:datastream:stateful_map_function:v1";
+
+	protected final DataStreamPythonFunctionInfo pythonFunctionInfo;
+	private final TypeInformation runnerInputTypeInfo;
+	private final TypeInformation runnerOutputTypeInfo;
+	private final TypeInformation<OUT> outputTypeInfo;
+	private final TypeInformation<Row> keyTypeInfo;
+	private TypeSerializer runnerInputSerializer;
+	private TypeSerializer runnerOutputSerializer;
+	private TypeSerializer keyTypeSerializer;
+
+	private transient TimerService timerservice;
+
+	protected final Map<String, String> jobOptions;
+
+	protected transient ByteArrayInputStreamWithPos bais;
+
+	protected transient DataInputViewStreamWrapper baisWrapper;
+
+	protected transient ByteArrayOutputStreamWithPos baos;
+
+	protected transient DataOutputViewStreamWrapper baosWrapper;
+
+	protected transient StreamRecordCollector streamRecordCollector;
+
+	protected Row reusableInput;
+	protected Row reusableTimerData;
+
+	public StatefulPythonFunctionOperator(
+		Configuration config,
+		RowTypeInfo inputTypeInfo,
+		TypeInformation<OUT> outputTypeInfo,
+		DataStreamPythonFunctionInfo pythonFunctionInfo) {
+		super(config);
+		this.jobOptions = config.toMap();
+		this.pythonFunctionInfo = pythonFunctionInfo;
+		this.outputTypeInfo = outputTypeInfo;
+		this.keyTypeInfo = new RowTypeInfo(inputTypeInfo.getTypeAt(0));
+		this.keyTypeSerializer = PythonTypeUtils.TypeInfoToSerializerConverter.typeInfoSerializerConverter(keyTypeInfo);

Review comment:
       We could move the initialization of these fields to the open() method; then they could be made transient.
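
       For example, a rough sketch for keyTypeSerializer (the same pattern applies to the other non-transient fields):
       ```java
       // field declaration
       private transient TypeSerializer keyTypeSerializer;

       // in open(), next to the other serializer initialization
       keyTypeSerializer = PythonTypeUtils.TypeInfoToSerializerConverter
       	.typeInfoSerializerConverter(keyTypeInfo);
       ```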

##########
File path: flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/StatefulPythonFunctionOperator.java
##########
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.python;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
+import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.core.memory.ManagedMemoryUseCase;
+import org.apache.flink.python.PythonFunctionRunner;
+import org.apache.flink.runtime.state.VoidNamespace;
+import org.apache.flink.runtime.state.VoidNamespaceSerializer;
+import org.apache.flink.streaming.api.SimpleTimerService;
+import org.apache.flink.streaming.api.TimerService;
+import org.apache.flink.streaming.api.functions.python.DataStreamPythonFunctionInfo;
+import org.apache.flink.streaming.api.operators.InternalTimer;
+import org.apache.flink.streaming.api.operators.InternalTimerService;
+import org.apache.flink.streaming.api.operators.Triggerable;
+import org.apache.flink.streaming.api.runners.python.beam.BeamDataStreamStatefulPythonFunctionRunner;
+import org.apache.flink.streaming.api.utils.PythonOperatorUtils;
+import org.apache.flink.streaming.api.utils.PythonTypeUtils;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.functions.python.PythonEnv;
+import org.apache.flink.table.runtime.util.StreamRecordCollector;
+import org.apache.flink.types.Row;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.Map;
+
+/**
+ * {@link StatefulPythonFunctionOperator} is responsible for launching beam runner which will start
+ * a python harness to execute user defined python function. It is also able to handle the timer and
+ * state request from the python stateful user defined function.
+ * */
+public class StatefulPythonFunctionOperator<OUT> extends AbstractOneInputPythonFunctionOperator<Row, OUT>
+	implements ResultTypeQueryable<OUT>, Triggerable<Row, VoidNamespace> {
+
+	private static final Logger LOGGER = LoggerFactory.getLogger(StatefulPythonFunctionOperator.class);
+
+	protected static final String DATA_STREAM_STATEFUL_PYTHON_FUNCTION_URN =
+		"flink:transform:datastream_stateful_function:v1";
+
+	protected static final String DATA_STREAM_STATEFUL_MAP_FUNCTION_CODER_URN =
+		"flink:coder:datastream:stateful_map_function:v1";
+
+	protected final DataStreamPythonFunctionInfo pythonFunctionInfo;
+	private final TypeInformation runnerInputTypeInfo;
+	private final TypeInformation runnerOutputTypeInfo;
+	private final TypeInformation<OUT> outputTypeInfo;
+	private final TypeInformation<Row> keyTypeInfo;
+	private TypeSerializer runnerInputSerializer;
+	private TypeSerializer runnerOutputSerializer;

Review comment:
       ```suggestion
   	private transient TypeSerializer runnerOutputSerializer;
   ```

##########
File path: flink-python/src/main/java/org/apache/flink/streaming/api/runners/python/beam/BeamDataStreamStatefulPythonFunctionRunner.java
##########
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.runners.python.beam;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.fnexecution.v1.FlinkFnApi;
+import org.apache.flink.python.env.PythonEnvironmentManager;
+import org.apache.flink.python.metric.FlinkMetricContainer;
+import org.apache.flink.runtime.memory.MemoryManager;
+import org.apache.flink.runtime.state.KeyedStateBackend;
+import org.apache.flink.streaming.api.utils.PythonTypeUtils;
+
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+
+import javax.annotation.Nullable;
+
+import java.util.Map;
+
+/**
+ * {@link BeamDataStreamStatefulPythonFunctionRunner} is responsible for starting a beam python harness to execute user
+ * defined python function.
+ */
+public class BeamDataStreamStatefulPythonFunctionRunner extends BeamPythonFunctionRunner {

Review comment:
       It seems that the only difference between BeamDataStreamStatelessPythonFunctionRunner and BeamDataStreamStatefulPythonFunctionRunner is that the constructor of BeamDataStreamStatefulPythonFunctionRunner also takes a stateBackend and a keySerializer. What about unifying them into a single BeamDataStreamPythonFunctionRunner?
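
       A rough sketch of the idea, assuming the unified runner keeps the stateful-only collaborators as nullable fields:
       ```java
       @Nullable
       private final KeyedStateBackend stateBackend;

       @Nullable
       private final TypeSerializer keySerializer;
       ```
       The stateless operator could then simply pass null for both arguments when constructing the runner.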

##########
File path: flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/StatefulPythonFunctionOperator.java
##########
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.python;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
+import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.core.memory.ManagedMemoryUseCase;
+import org.apache.flink.python.PythonFunctionRunner;
+import org.apache.flink.runtime.state.VoidNamespace;
+import org.apache.flink.runtime.state.VoidNamespaceSerializer;
+import org.apache.flink.streaming.api.SimpleTimerService;
+import org.apache.flink.streaming.api.TimerService;
+import org.apache.flink.streaming.api.functions.python.DataStreamPythonFunctionInfo;
+import org.apache.flink.streaming.api.operators.InternalTimer;
+import org.apache.flink.streaming.api.operators.InternalTimerService;
+import org.apache.flink.streaming.api.operators.Triggerable;
+import org.apache.flink.streaming.api.runners.python.beam.BeamDataStreamStatefulPythonFunctionRunner;
+import org.apache.flink.streaming.api.utils.PythonOperatorUtils;
+import org.apache.flink.streaming.api.utils.PythonTypeUtils;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.functions.python.PythonEnv;
+import org.apache.flink.table.runtime.util.StreamRecordCollector;
+import org.apache.flink.types.Row;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.Map;
+
+/**
+ * {@link StatefulPythonFunctionOperator} is responsible for launching beam runner which will start
+ * a python harness to execute user defined python function. It is also able to handle the timer and
+ * state request from the python stateful user defined function.
+ * */
+public class StatefulPythonFunctionOperator<OUT> extends AbstractOneInputPythonFunctionOperator<Row, OUT>
+	implements ResultTypeQueryable<OUT>, Triggerable<Row, VoidNamespace> {
+
+	private static final Logger LOGGER = LoggerFactory.getLogger(StatefulPythonFunctionOperator.class);
+
+	protected static final String DATA_STREAM_STATEFUL_PYTHON_FUNCTION_URN =
+		"flink:transform:datastream_stateful_function:v1";
+
+	protected static final String DATA_STREAM_STATEFUL_MAP_FUNCTION_CODER_URN =
+		"flink:coder:datastream:stateful_map_function:v1";
+
+	protected final DataStreamPythonFunctionInfo pythonFunctionInfo;
+	private final TypeInformation runnerInputTypeInfo;
+	private final TypeInformation runnerOutputTypeInfo;
+	private final TypeInformation<OUT> outputTypeInfo;
+	private final TypeInformation<Row> keyTypeInfo;
+	private TypeSerializer runnerInputSerializer;
+	private TypeSerializer runnerOutputSerializer;
+	private TypeSerializer keyTypeSerializer;
+
+	private transient TimerService timerservice;
+
+	protected final Map<String, String> jobOptions;
+
+	protected transient ByteArrayInputStreamWithPos bais;
+
+	protected transient DataInputViewStreamWrapper baisWrapper;
+
+	protected transient ByteArrayOutputStreamWithPos baos;
+
+	protected transient DataOutputViewStreamWrapper baosWrapper;
+
+	protected transient StreamRecordCollector streamRecordCollector;
+
+	protected Row reusableInput;
+	protected Row reusableTimerData;

Review comment:
       ```suggestion
   	private transient Row reusableTimerData;
   ```

##########
File path: flink-python/pyflink/fn_execution/stateful_operation_common.py
##########
@@ -0,0 +1,72 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+import time
+
+from typing import Any
+
+from pyflink.datastream.functions import Collector, TimerService
+
+
+class InternalCollector(Collector):
+    """
+    Internal implementation of the Collector. It usa a buffer list to store data to be emitted.

Review comment:
       ```suggestion
       Internal implementation of the Collector. It uses a buffer list to store data to be emitted.
   ```

##########
File path: flink-python/src/main/java/org/apache/flink/streaming/api/runners/python/beam/BeamDataStreamStatefulPythonFunctionRunner.java
##########
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.runners.python.beam;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.fnexecution.v1.FlinkFnApi;
+import org.apache.flink.python.env.PythonEnvironmentManager;
+import org.apache.flink.python.metric.FlinkMetricContainer;
+import org.apache.flink.runtime.memory.MemoryManager;
+import org.apache.flink.runtime.state.KeyedStateBackend;
+import org.apache.flink.streaming.api.utils.PythonTypeUtils;
+
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+
+import javax.annotation.Nullable;
+
+import java.util.Map;
+
+/**
+ * {@link BeamDataStreamStatefulPythonFunctionRunner} is responsible for starting a beam python harness to execute user
+ * defined python function.
+ */
+public class BeamDataStreamStatefulPythonFunctionRunner extends BeamPythonFunctionRunner {

Review comment:
       Add the @Internal annotation to this class.
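
       i.e. annotate the class with org.apache.flink.annotation.Internal (sketch):
       ```java
       @Internal
       public class BeamDataStreamStatefulPythonFunctionRunner extends BeamPythonFunctionRunner {
       ```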

##########
File path: flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/StatefulPythonFunctionOperator.java
##########
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.python;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
+import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.core.memory.ManagedMemoryUseCase;
+import org.apache.flink.python.PythonFunctionRunner;
+import org.apache.flink.runtime.state.VoidNamespace;
+import org.apache.flink.runtime.state.VoidNamespaceSerializer;
+import org.apache.flink.streaming.api.SimpleTimerService;
+import org.apache.flink.streaming.api.TimerService;
+import org.apache.flink.streaming.api.functions.python.DataStreamPythonFunctionInfo;
+import org.apache.flink.streaming.api.operators.InternalTimer;
+import org.apache.flink.streaming.api.operators.InternalTimerService;
+import org.apache.flink.streaming.api.operators.Triggerable;
+import org.apache.flink.streaming.api.runners.python.beam.BeamDataStreamStatefulPythonFunctionRunner;
+import org.apache.flink.streaming.api.utils.PythonOperatorUtils;
+import org.apache.flink.streaming.api.utils.PythonTypeUtils;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.functions.python.PythonEnv;
+import org.apache.flink.table.runtime.util.StreamRecordCollector;
+import org.apache.flink.types.Row;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.Map;
+
+/**
+ * {@link StatefulPythonFunctionOperator} is responsible for launching beam runner which will start
+ * a python harness to execute user defined python function. It is also able to handle the timer and
+ * state request from the python stateful user defined function.
+ * */
+public class StatefulPythonFunctionOperator<OUT> extends AbstractOneInputPythonFunctionOperator<Row, OUT>
+	implements ResultTypeQueryable<OUT>, Triggerable<Row, VoidNamespace> {
+
+	private static final Logger LOGGER = LoggerFactory.getLogger(StatefulPythonFunctionOperator.class);
+
+	protected static final String DATA_STREAM_STATEFUL_PYTHON_FUNCTION_URN =
+		"flink:transform:datastream_stateful_function:v1";
+
+	protected static final String DATA_STREAM_STATEFUL_MAP_FUNCTION_CODER_URN =
+		"flink:coder:datastream:stateful_map_function:v1";
+
+	protected final DataStreamPythonFunctionInfo pythonFunctionInfo;
+	private final TypeInformation runnerInputTypeInfo;
+	private final TypeInformation runnerOutputTypeInfo;
+	private final TypeInformation<OUT> outputTypeInfo;
+	private final TypeInformation<Row> keyTypeInfo;
+	private TypeSerializer runnerInputSerializer;
+	private TypeSerializer runnerOutputSerializer;
+	private TypeSerializer keyTypeSerializer;
+
+	private transient TimerService timerservice;
+
+	protected final Map<String, String> jobOptions;
+
+	protected transient ByteArrayInputStreamWithPos bais;
+
+	protected transient DataInputViewStreamWrapper baisWrapper;
+
+	protected transient ByteArrayOutputStreamWithPos baos;
+
+	protected transient DataOutputViewStreamWrapper baosWrapper;
+
+	protected transient StreamRecordCollector streamRecordCollector;
+
+	protected Row reusableInput;
+	protected Row reusableTimerData;
+
+	public StatefulPythonFunctionOperator(
+		Configuration config,
+		RowTypeInfo inputTypeInfo,
+		TypeInformation<OUT> outputTypeInfo,
+		DataStreamPythonFunctionInfo pythonFunctionInfo) {
+		super(config);
+		this.jobOptions = config.toMap();
+		this.pythonFunctionInfo = pythonFunctionInfo;
+		this.outputTypeInfo = outputTypeInfo;
+		this.keyTypeInfo = new RowTypeInfo(inputTypeInfo.getTypeAt(0));
+		this.keyTypeSerializer = PythonTypeUtils.TypeInfoToSerializerConverter.typeInfoSerializerConverter(keyTypeInfo);
+		// inputType: normal data/ timer data, timerType: proc/event time, currentWatermark, keyData, real data
+		this.runnerInputTypeInfo = Types.ROW(Types.INT, Types.LONG, Types.LONG, this.keyTypeInfo, inputTypeInfo);
+		this.runnerOutputTypeInfo = Types.ROW(Types.INT, Types.LONG, this.keyTypeInfo, outputTypeInfo);
+	}
+
+	@Override
+	public void open() throws Exception {
+		super.open();
+		bais = new ByteArrayInputStreamWithPos();
+		baisWrapper = new DataInputViewStreamWrapper(bais);
+
+		baos = new ByteArrayOutputStreamWithPos();
+		baosWrapper = new DataOutputViewStreamWrapper(baos);
+		runnerInputSerializer = PythonTypeUtils.TypeInfoToSerializerConverter
+			.typeInfoSerializerConverter(runnerInputTypeInfo);
+		runnerOutputSerializer = PythonTypeUtils.TypeInfoToSerializerConverter
+			.typeInfoSerializerConverter(runnerOutputTypeInfo);
+
+		InternalTimerService<VoidNamespace> internalTimerService = getInternalTimerService("user-timers",
+			VoidNamespaceSerializer.INSTANCE, this);
+
+		this.streamRecordCollector = new StreamRecordCollector(output);
+		timerservice = new SimpleTimerService(internalTimerService);
+		reusableInput = new Row(5);
+		reusableTimerData = new Row(5);
+	}
+
+	@Override
+	public TypeInformation<OUT> getProducedType() {
+		return outputTypeInfo;
+	}
+
+	@Override
+	public void onEventTime(InternalTimer<Row, VoidNamespace> timer) throws Exception {
+		processTimer(false, timer);
+		checkInvokeFinishBundleByCount();

Review comment:
       Move checkInvokeFinishBundleByCount to processTimer?
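
       A minimal sketch of that refactoring (the body of processTimer is not part of this hunk and the parameter names are illustrative, so only the placement of the call is shown):
       ```java
       private void processTimer(boolean procTime, InternalTimer<Row, VoidNamespace> timer) throws Exception {
       	// existing timer serialization / forwarding logic (not shown in this hunk)
       	checkInvokeFinishBundleByCount();
       }

       @Override
       public void onEventTime(InternalTimer<Row, VoidNamespace> timer) throws Exception {
       	processTimer(false, timer);
       }

       @Override
       public void onProcessingTime(InternalTimer<Row, VoidNamespace> timer) throws Exception {
       	processTimer(true, timer);
       }
       ```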

##########
File path: flink-python/pyflink/fn_execution/stateful_operation_common.py
##########
@@ -0,0 +1,72 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+import time
+
+from typing import Any
+
+from pyflink.datastream.functions import Collector, TimerService
+
+
+class InternalCollector(Collector):

Review comment:
       Could these two classes be moved under DataStreamStatefulFunctionOperation?

##########
File path: flink-python/pyflink/fn_execution/stateful_operation_common.py
##########
@@ -0,0 +1,72 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+import time
+
+from typing import Any
+
+from pyflink.datastream.functions import Collector, TimerService
+
+
+class InternalCollector(Collector):
+    """
+    Internal implementation of the Collector. It usa a buffer list to store data to be emitted.
+    There will be a header flag for each data type. 0 means it is a proc time timer registering
+    request, while 1 means it is an event time timer and 2 means it is a normal data. When
+    registering a timer, it must take along the corresponding key for it.
+    """
+    def __init__(self):
+        self.buf = []
+
+    def collect_proc_timer(self, a: Any, key: Any):
+        self.buf.append((0, a, key, None))
+
+    def collect_event_timer(self, a: Any, key: Any):
+        self.buf.append((1, a, key, None))
+
+    def collect_data(self, a: Any):
+        self.buf.append((2, a))
+
+    def collect(self, a: Any):
+        self.collect_data(a)
+
+    def clear(self):
+        self.buf.clear()
+
+
+class InternalTimerService(TimerService):
+    """
+    Internal implementation of TimerService.
+    """
+    def __init__(self, collector, keyed_state_backend):
+        self._collector: InternalCollector = collector

Review comment:
       Variable annotations are not supported in Python 3.5.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org