You are viewing a plain-text version of this content. The hyperlink to its canonical version (originally labelled "here") is not preserved in this copy.
Posted to commits@flink.apache.org by di...@apache.org on 2020/09/21 11:05:09 UTC

[flink] 02/02: [FLINK-19301][python] Improve the package structure of Python DataStream API

This is an automated email from the ASF dual-hosted git repository.

dianfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit f7ec18630ff9d6fd7a1b3fdb65304e95c34745e1
Author: Dian Fu <di...@apache.org>
AuthorDate: Fri Sep 18 21:41:42 2020 +0800

    [FLINK-19301][python] Improve the package structure of Python DataStream API
    
    This closes #13436
---
 flink-python/pyflink/common/typeinfo.py            |   4 +-
 flink-python/pyflink/datastream/data_stream.py     | 138 +++++++++------------
 flink-python/pyflink/java_gateway.py               |   3 +
 .../apache/flink/python/util/PythonConfigUtil.java |  20 +--
 .../functions/python/DataStreamPythonFunction.java |   7 +-
 .../python/DataStreamPythonFunctionInfo.java       |   4 +-
 .../python/PartitionCustomKeySelector.java         |  13 +-
 .../api}/functions/python/PickledKeySelector.java  |  12 +-
 .../python/PythonPartitionCustomOperator.java}     |  18 +--
 .../operators/python/PythonReduceOperator.java}    |  20 +--
 .../StatelessOneInputPythonFunctionOperator.java}  |  19 +--
 .../StatelessTwoInputPythonFunctionOperator.java}  |  19 +--
 ...amDataStreamStatelessPythonFunctionRunner.java} |  14 +--
 .../python/beam/BeamPythonFunctionRunner.java      |   2 +
 ...java => BeamStatelessPythonFunctionRunner.java} |   4 +-
 .../typeinfo/python/PickledByteArrayTypeInfo.java  |   4 +-
 .../api/typeutils}/PythonTypeUtils.java            |   6 +-
 .../AbstractPythonStatelessFunctionFlatMap.java    |   4 +-
 .../python/AbstractStatelessFunctionOperator.java  |   4 +-
 ...=> BeamTableStatelessPythonFunctionRunner.java} |   8 +-
 .../api/typeutils}/PythonTypeUtilsTest.java        |   5 +-
 .../PassThroughPythonAggregateFunctionRunner.java  |   4 +-
 .../PassThroughPythonScalarFunctionRunner.java     |   4 +-
 .../PassThroughPythonTableFunctionRunner.java      |   6 +-
 24 files changed, 172 insertions(+), 170 deletions(-)

diff --git a/flink-python/pyflink/common/typeinfo.py b/flink-python/pyflink/common/typeinfo.py
index 8b1da5d..8778822 100644
--- a/flink-python/pyflink/common/typeinfo.py
+++ b/flink-python/pyflink/common/typeinfo.py
@@ -217,7 +217,7 @@ class PickledBytesTypeInfo(TypeInformation, ABC):
 
     @staticmethod
     def PICKLED_BYTE_ARRAY_TYPE_INFO():
-        return WrapperTypeInfo(get_gateway().jvm.org.apache.flink.datastream.typeinfo.python
+        return WrapperTypeInfo(get_gateway().jvm.org.apache.flink.streaming.api.typeinfo.python
                                .PickledByteArrayTypeInfo.PICKLED_BYTE_ARRAY_TYPE_INFO)
 
 
@@ -441,7 +441,7 @@ def _from_java_type(j_type_info: JavaObject) -> TypeInformation:
         return Types.PRIMITIVE_ARRAY(Types.CHAR())
 
     JPickledBytesTypeInfo = gateway.jvm \
-        .org.apache.flink.datastream.typeinfo.python.PickledByteArrayTypeInfo\
+        .org.apache.flink.streaming.api.typeinfo.python.PickledByteArrayTypeInfo\
         .PICKLED_BYTE_ARRAY_TYPE_INFO
     if _is_instance_of(j_type_info, JPickledBytesTypeInfo):
         return Types.PICKLED_BYTE_ARRAY()
diff --git a/flink-python/pyflink/datastream/data_stream.py b/flink-python/pyflink/datastream/data_stream.py
index 3527be3..a1c5cb5 100644
--- a/flink-python/pyflink/datastream/data_stream.py
+++ b/flink-python/pyflink/datastream/data_stream.py
@@ -232,18 +232,15 @@ class DataStream(object):
                 func = MapFunctionWrapper(func)
             else:
                 raise TypeError("The input must be a MapFunction or a callable function")
-        func_name = str(func)
         from pyflink.fn_execution import flink_fn_execution_pb2
-        j_python_data_stream_scalar_function_operator, j_output_type_info = \
-            self._get_java_python_function_operator(func,
-                                                    output_type,
-                                                    func_name,
-                                                    flink_fn_execution_pb2
-                                                    .UserDefinedDataStreamFunction.MAP)
+        j_operator, j_output_type_info = self._get_java_python_function_operator(
+            func,
+            output_type,
+            flink_fn_execution_pb2.UserDefinedDataStreamFunction.MAP)
         return DataStream(self._j_data_stream.transform(
             "Map",
             j_output_type_info,
-            j_python_data_stream_scalar_function_operator
+            j_operator
         ))
 
     def flat_map(self, func: Union[Callable, FlatMapFunction],
@@ -263,18 +260,15 @@ class DataStream(object):
                 func = FlatMapFunctionWrapper(func)
             else:
                 raise TypeError("The input must be a FlatMapFunction or a callable function")
-        func_name = str(func)
         from pyflink.fn_execution import flink_fn_execution_pb2
-        j_python_data_stream_scalar_function_operator, j_output_type_info = \
-            self._get_java_python_function_operator(func,
-                                                    result_type,
-                                                    func_name,
-                                                    flink_fn_execution_pb2
-                                                    .UserDefinedDataStreamFunction.FLAT_MAP)
+        j_operator, j_output_type_info = self._get_java_python_function_operator(
+            func,
+            result_type,
+            flink_fn_execution_pb2.UserDefinedDataStreamFunction.FLAT_MAP)
         return DataStream(self._j_data_stream.transform(
             "FLAT_MAP",
             j_output_type_info,
-            j_python_data_stream_scalar_function_operator
+            j_operator
         ))
 
     def key_by(self, key_selector: Union[Callable, KeySelector],
@@ -292,10 +286,9 @@ class DataStream(object):
             raise TypeError("Parameter key_selector should be a type of KeySelector.")
 
         gateway = get_gateway()
-        PickledKeySelector = gateway.jvm \
-            .org.apache.flink.datastream.runtime.functions.python.PickledKeySelector
-        j_output_type_info = self._j_data_stream.getTransformation().getOutputType()
-        output_type_info = typeinfo._from_java_type(j_output_type_info)
+        PickledKeySelector = gateway.jvm.PickledKeySelector
+        output_type_info = typeinfo._from_java_type(
+            self._j_data_stream.getTransformation().getOutputType())
         is_key_pickled_byte_array = False
         if key_type_info is None:
             key_type_info = Types.PICKLED_BYTE_ARRAY()
@@ -305,11 +298,13 @@ class DataStream(object):
                                            output_type=Types.ROW([key_type_info, output_type_info]))
         intermediate_map_stream.name(gateway.jvm.org.apache.flink.python.util.PythonConfigUtil
                                      .STREAM_KEY_BY_MAP_OPERATOR_NAME)
-        generated_key_stream = KeyedStream(intermediate_map_stream._j_data_stream
-                                           .keyBy(PickledKeySelector(is_key_pickled_byte_array),
-                                                  key_type_info.get_java_type_info()), self)
-        generated_key_stream._original_data_type_info = output_type_info
-        return generated_key_stream
+        key_stream = KeyedStream(
+            intermediate_map_stream._j_data_stream.keyBy(
+                PickledKeySelector(is_key_pickled_byte_array),
+                key_type_info.get_java_type_info()),
+            self)
+        key_stream._original_data_type_info = output_type_info
+        return key_stream
 
     def filter(self, func: Union[Callable, FilterFunction]) -> 'DataStream':
         """
@@ -335,8 +330,8 @@ class DataStream(object):
         elif not isinstance(func, FilterFunction):
             raise TypeError("func must be a Callable or instance of FilterFunction.")
 
-        j_input_type = self._j_data_stream.getTransformation().getOutputType()
-        type_info = typeinfo._from_java_type(j_input_type)
+        type_info = typeinfo._from_java_type(
+            self._j_data_stream.getTransformation().getOutputType())
         j_data_stream = self.flat_map(FilterFlatMap(func), result_type=type_info)._j_data_stream
         filtered_stream = DataStream(j_data_stream)
         filtered_stream.name("Filter")
@@ -472,9 +467,8 @@ class DataStream(object):
             raise TypeError("Parameter partitioner should be a type of Partitioner.")
 
         gateway = get_gateway()
-        data_stream_num_partitions_env_key = gateway.jvm\
-            .org.apache.flink.datastream.runtime.operators.python\
-            .DataStreamPythonPartitionCustomFunctionOperator.DATA_STREAM_NUM_PARTITIONS
+        data_stream_num_partitions_env_key = \
+            gateway.jvm.PythonPartitionCustomOperator.DATA_STREAM_NUM_PARTITIONS
 
         class PartitionCustomMapFunction(MapFunction):
             """
@@ -505,10 +499,8 @@ class DataStream(object):
             gateway.jvm.org.apache.flink.python.util.PythonConfigUtil
             .STREAM_PARTITION_CUSTOM_MAP_OPERATOR_NAME)
 
-        JPartitionCustomKeySelector = gateway.jvm\
-            .org.apache.flink.datastream.runtime.functions.python.PartitionCustomKeySelector
-        JIdParitioner = gateway.jvm\
-            .org.apache.flink.api.java.functions.IdPartitioner
+        JPartitionCustomKeySelector = gateway.jvm.PartitionCustomKeySelector
+        JIdParitioner = gateway.jvm.org.apache.flink.api.java.functions.IdPartitioner
         intermediate_map_stream = DataStream(intermediate_map_stream._j_data_stream
                                              .partitionCustom(JIdParitioner(),
                                                               JPartitionCustomKeySelector()))
@@ -518,8 +510,9 @@ class DataStream(object):
                                .KEYED_STREAM_VALUE_OPERATOR_NAME)
         return DataStream(values_map_stream._j_data_stream)
 
-    def _get_java_python_function_operator(self, func: Union[Function, FunctionWrapper],
-                                           type_info: TypeInformation, func_name: str,
+    def _get_java_python_function_operator(self,
+                                           func: Union[Function, FunctionWrapper],
+                                           type_info: TypeInformation,
                                            func_type: int):
         """
         Create a flink operator according to user provided function object, data types,
@@ -547,19 +540,14 @@ class DataStream(object):
             else:
                 output_type_info = type_info
 
-        DataStreamPythonFunction = gateway.jvm.org.apache.flink.datastream.runtime.functions \
-            .python.DataStreamPythonFunction
-        j_python_data_stream_scalar_function = DataStreamPythonFunction(
-            func_name,
+        JDataStreamPythonFunction = gateway.jvm.DataStreamPythonFunction
+        j_data_stream_python_function = JDataStreamPythonFunction(
             bytearray(serialized_func),
             _get_python_env())
 
-        DataStreamPythonFunctionInfo = gateway.jvm. \
-            org.apache.flink.datastream.runtime.functions.python \
-            .DataStreamPythonFunctionInfo
-
-        j_python_data_stream_function_info = DataStreamPythonFunctionInfo(
-            j_python_data_stream_scalar_function,
+        JDataStreamPythonFunctionInfo = gateway.jvm.DataStreamPythonFunctionInfo
+        j_data_stream_python_function_info = JDataStreamPythonFunctionInfo(
+            j_data_stream_python_function,
             func_type)
 
         j_conf = gateway.jvm.org.apache.flink.configuration.Configuration()
@@ -568,31 +556,28 @@ class DataStream(object):
         from pyflink.fn_execution.flink_fn_execution_pb2 import UserDefinedDataStreamFunction
         if func_type == UserDefinedDataStreamFunction.REDUCE:
             j_conf.setInteger(gateway.jvm.org.apache.flink.python.PythonOptions.MAX_BUNDLE_SIZE, 1)
-            DataStreamPythonReduceFunctionOperator = gateway.jvm.org.apache.flink.datastream \
-                .runtime.operators.python.DataStreamPythonReduceFunctionOperator
 
             j_output_type_info = j_input_types.getTypeAt(1)
-            j_python_data_stream_function_operator = DataStreamPythonReduceFunctionOperator(
+            j_python_reduce_operator = gateway.jvm.PythonReduceOperator(
                 j_conf,
                 j_input_types,
                 j_output_type_info,
-                j_python_data_stream_function_info)
-            return j_python_data_stream_function_operator, j_output_type_info
+                j_data_stream_python_function_info)
+            return j_python_reduce_operator, j_output_type_info
         else:
             if str(func) == '_Flink_PartitionCustomMapFunction':
-                DataStreamPythonFunctionOperator = gateway.jvm.org.apache.flink.datastream.runtime \
-                    .operators.python.DataStreamPythonPartitionCustomFunctionOperator
+                JDataStreamPythonFunctionOperator = gateway.jvm.PythonPartitionCustomOperator
             else:
-                DataStreamPythonFunctionOperator = gateway.jvm.org.apache.flink.datastream.runtime \
-                    .operators.python.DataStreamPythonStatelessFunctionOperator
+                JDataStreamPythonFunctionOperator = \
+                    gateway.jvm.StatelessOneInputPythonFunctionOperator
 
-            j_python_data_stream_function_operator = DataStreamPythonFunctionOperator(
+            j_data_stream_python_function_operator = JDataStreamPythonFunctionOperator(
                 j_conf,
                 j_input_types,
                 output_type_info.get_java_type_info(),
-                j_python_data_stream_function_info)
+                j_data_stream_python_function_info)
 
-            return j_python_data_stream_function_operator, output_type_info.get_java_type_info()
+            return j_data_stream_python_function_operator, output_type_info.get_java_type_info()
 
     def add_sink(self, sink_func: SinkFunction) -> 'DataStreamSink':
         """
@@ -795,16 +780,14 @@ class KeyedStream(DataStream):
                 raise TypeError("The input must be a ReduceFunction or a callable function!")
 
         from pyflink.fn_execution.flink_fn_execution_pb2 import UserDefinedDataStreamFunction
-        func_name = "m_reduce_" + str(func)
-        j_python_data_stream_scalar_function_operator, j_output_type_info = \
+        j_operator, j_output_type_info = \
             self._get_java_python_function_operator(func,
                                                     None,
-                                                    func_name,
                                                     UserDefinedDataStreamFunction.REDUCE)
         return DataStream(self._j_data_stream.transform(
             "Keyed Reduce",
             j_output_type_info,
-            j_python_data_stream_scalar_function_operator
+            j_operator
         ))
 
     def filter(self, func: Union[Callable, FilterFunction]) -> 'DataStream':
@@ -947,7 +930,6 @@ class ConnectedStreams(object):
         """
         if not isinstance(func, CoMapFunction):
             raise TypeError("The input function must be a CoMapFunction!")
-        func_name = str(func)
 
         # get connected stream
         j_connected_stream = self.stream1._j_data_stream.connect(self.stream2._j_data_stream)
@@ -955,7 +937,6 @@ class ConnectedStreams(object):
         j_operator, j_output_type = self._get_connected_stream_operator(
             func,
             output_type,
-            func_name,
             flink_fn_execution_pb2.UserDefinedDataStreamFunction.CO_MAP)
         return DataStream(j_connected_stream.transform("Co-Map", j_output_type, j_operator))
 
@@ -974,7 +955,6 @@ class ConnectedStreams(object):
 
         if not isinstance(func, CoFlatMapFunction):
             raise TypeError("The input must be a CoFlatMapFunction!")
-        func_name = str(func)
 
         # get connected stream
         j_connected_stream = self.stream1._j_data_stream.connect(self.stream2._j_data_stream)
@@ -982,12 +962,12 @@ class ConnectedStreams(object):
         j_operator, j_output_type = self._get_connected_stream_operator(
             func,
             output_type,
-            func_name,
             flink_fn_execution_pb2.UserDefinedDataStreamFunction.CO_FLAT_MAP)
         return DataStream(j_connected_stream.transform("Co-Flat Map", j_output_type, j_operator))
 
-    def _get_connected_stream_operator(self, func: Union[Function, FunctionWrapper],
-                                       type_info: TypeInformation, func_name: str,
+    def _get_connected_stream_operator(self,
+                                       func: Union[Function, FunctionWrapper],
+                                       type_info: TypeInformation,
                                        func_type: int):
         gateway = get_gateway()
         import cloudpickle
@@ -1004,30 +984,26 @@ class ConnectedStreams(object):
             else:
                 output_type_info = type_info
 
-        DataStreamPythonFunction = gateway.jvm.org.apache.flink.datastream.runtime.functions \
-            .python.DataStreamPythonFunction
-        j_python_data_stream_scalar_function = DataStreamPythonFunction(
-            func_name,
+        DataStreamPythonFunction = gateway.jvm.DataStreamPythonFunction
+        j_data_stream_python_function = DataStreamPythonFunction(
             bytearray(serialized_func),
             _get_python_env())
 
-        DataStreamPythonFunctionInfo = gateway.jvm. \
-            org.apache.flink.datastream.runtime.functions.python \
-            .DataStreamPythonFunctionInfo
+        DataStreamPythonFunctionInfo = gateway.jvm.DataStreamPythonFunctionInfo
 
-        j_python_data_stream_function_info = DataStreamPythonFunctionInfo(
-            j_python_data_stream_scalar_function,
+        j_data_stream_python_function_info = DataStreamPythonFunctionInfo(
+            j_data_stream_python_function,
             func_type)
 
         j_conf = gateway.jvm.org.apache.flink.configuration.Configuration()
-        DataStreamPythonFunctionOperator = gateway.jvm.org.apache.flink.datastream.runtime \
-            .operators.python.DataStreamTwoInputPythonStatelessFunctionOperator
-        j_python_data_stream_function_operator = DataStreamPythonFunctionOperator(
+        StatelessTwoInputPythonFunctionOperator = \
+            gateway.jvm.StatelessTwoInputPythonFunctionOperator
+        j_python_data_stream_function_operator = StatelessTwoInputPythonFunctionOperator(
             j_conf,
             j_input_types1,
             j_input_types2,
             output_type_info.get_java_type_info(),
-            j_python_data_stream_function_info,
+            j_data_stream_python_function_info,
             self._is_keyed_stream())
 
         return j_python_data_stream_function_operator, output_type_info.get_java_type_info()
diff --git a/flink-python/pyflink/java_gateway.py b/flink-python/pyflink/java_gateway.py
index dc4a693..ba7bbf4 100644
--- a/flink-python/pyflink/java_gateway.py
+++ b/flink-python/pyflink/java_gateway.py
@@ -153,6 +153,9 @@ def import_flink_view(gateway):
     java_import(gateway.jvm, "org.apache.flink.python.util.PythonDependencyUtils")
     java_import(gateway.jvm, "org.apache.flink.python.PythonOptions")
     java_import(gateway.jvm, "org.apache.flink.client.python.PythonGatewayServer")
+    java_import(gateway.jvm, "org.apache.flink.streaming.api.functions.python.*")
+    java_import(gateway.jvm, "org.apache.flink.streaming.api.operators.python.*")
+    java_import(gateway.jvm, "org.apache.flink.streaming.api.typeinfo.python.*")
 
 
 class PythonFunctionFactory(object):
diff --git a/flink-python/src/main/java/org/apache/flink/python/util/PythonConfigUtil.java b/flink-python/src/main/java/org/apache/flink/python/util/PythonConfigUtil.java
index 19c6b4d..f028ed2 100644
--- a/flink-python/src/main/java/org/apache/flink/python/util/PythonConfigUtil.java
+++ b/flink-python/src/main/java/org/apache/flink/python/util/PythonConfigUtil.java
@@ -18,9 +18,6 @@
 package org.apache.flink.python.util;
 
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.datastream.runtime.operators.python.DataStreamPythonPartitionCustomFunctionOperator;
-import org.apache.flink.datastream.runtime.operators.python.DataStreamPythonStatelessFunctionOperator;
-import org.apache.flink.datastream.runtime.operators.python.DataStreamTwoInputPythonStatelessFunctionOperator;
 import org.apache.flink.python.PythonConfig;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.graph.StreamEdge;
@@ -30,6 +27,9 @@ import org.apache.flink.streaming.api.operators.SimpleOperatorFactory;
 import org.apache.flink.streaming.api.operators.StreamOperator;
 import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
 import org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator;
+import org.apache.flink.streaming.api.operators.python.PythonPartitionCustomOperator;
+import org.apache.flink.streaming.api.operators.python.StatelessOneInputPythonFunctionOperator;
+import org.apache.flink.streaming.api.operators.python.StatelessTwoInputPythonFunctionOperator;
 import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner;
 
 import java.lang.reflect.InvocationTargetException;
@@ -84,7 +84,7 @@ public class PythonConfigUtil {
 	}
 
 	/**
-	 * Configure the {@link DataStreamPythonStatelessFunctionOperator} to be chained with the upstream/downstream
+	 * Configure the {@link StatelessOneInputPythonFunctionOperator} to be chained with the upstream/downstream
 	 * operator by setting their parallelism, slot sharing group, co-location group to be the same, and applying a
 	 * {@link ForwardPartitioner}.
 	 * 1. operator with name "_keyed_stream_values_operator" should align with its downstream operator.
@@ -115,7 +115,7 @@ public class PythonConfigUtil {
 
 	/**
 	 * Generate a {@link StreamGraph} for transformations maintained by current {@link StreamExecutionEnvironment}, and
-	 * reset the merged env configurations with dependencies to every {@link DataStreamPythonStatelessFunctionOperator}.
+	 * reset the merged env configurations with dependencies to every {@link StatelessOneInputPythonFunctionOperator}.
 	 * It is an idempotent operation that can be call multiple times. Remember that only when need to execute the
 	 * StreamGraph can we set the clearTransformations to be True.
 	 */
@@ -133,8 +133,8 @@ public class PythonConfigUtil {
 			StreamOperatorFactory streamOperatorFactory = streamNode.getOperatorFactory();
 			if (streamOperatorFactory instanceof SimpleOperatorFactory) {
 				StreamOperator streamOperator = ((SimpleOperatorFactory) streamOperatorFactory).getOperator();
-				if ((streamOperator instanceof DataStreamPythonStatelessFunctionOperator) ||
-					(streamOperator instanceof DataStreamTwoInputPythonStatelessFunctionOperator)) {
+				if ((streamOperator instanceof StatelessOneInputPythonFunctionOperator) ||
+					(streamOperator instanceof StatelessTwoInputPythonFunctionOperator)) {
 					AbstractPythonFunctionOperator abstractPythonFunctionOperator =
 						(AbstractPythonFunctionOperator) streamOperator;
 
@@ -157,9 +157,9 @@ public class PythonConfigUtil {
 			StreamOperatorFactory streamOperatorFactory = streamNode.getOperatorFactory();
 			if (streamOperatorFactory instanceof SimpleOperatorFactory) {
 				StreamOperator streamOperator = ((SimpleOperatorFactory) streamOperatorFactory).getOperator();
-				if (streamOperator instanceof DataStreamPythonPartitionCustomFunctionOperator) {
-					DataStreamPythonPartitionCustomFunctionOperator paritionCustomFunctionOperator =
-						(DataStreamPythonPartitionCustomFunctionOperator) streamOperator;
+				if (streamOperator instanceof PythonPartitionCustomOperator) {
+					PythonPartitionCustomOperator paritionCustomFunctionOperator =
+						(PythonPartitionCustomOperator) streamOperator;
 
 					// Update the numPartitions of PartitionCustomOperator after aligned all operators.
 					paritionCustomFunctionOperator.setNumPartitions(
diff --git a/flink-python/src/main/java/org/apache/flink/datastream/runtime/functions/python/DataStreamPythonFunction.java b/flink-python/src/main/java/org/apache/flink/streaming/api/functions/python/DataStreamPythonFunction.java
similarity index 91%
rename from flink-python/src/main/java/org/apache/flink/datastream/runtime/functions/python/DataStreamPythonFunction.java
rename to flink-python/src/main/java/org/apache/flink/streaming/api/functions/python/DataStreamPythonFunction.java
index ce7a606..4ae6685 100644
--- a/flink-python/src/main/java/org/apache/flink/datastream/runtime/functions/python/DataStreamPythonFunction.java
+++ b/flink-python/src/main/java/org/apache/flink/streaming/api/functions/python/DataStreamPythonFunction.java
@@ -16,22 +16,23 @@
  * limitations under the License.
  */
 
-package org.apache.flink.datastream.runtime.functions.python;
+package org.apache.flink.streaming.api.functions.python;
 
+import org.apache.flink.annotation.Internal;
 import org.apache.flink.table.functions.python.PythonEnv;
 import org.apache.flink.table.functions.python.PythonFunction;
 
 /**
  * {@link DataStreamPythonFunction} maintains the serialized python function and its function type, which will be used in
- * BeamDataStreamPythonStatelessFunctionRunner.
+ * BeamDataStreamStatelessPythonFunctionRunner.
  */
+@Internal
 public class DataStreamPythonFunction implements PythonFunction {
 	private static final long serialVersionUID = 1L;
 	private final byte[] serializedPythonFunction;
 	private final PythonEnv pythonEnv;
 
 	public DataStreamPythonFunction(
-		String funcName,
 		byte[] serializedPythonFunction,
 		PythonEnv pythonEnv) {
 		this.serializedPythonFunction = serializedPythonFunction;
diff --git a/flink-python/src/main/java/org/apache/flink/datastream/runtime/functions/python/DataStreamPythonFunctionInfo.java b/flink-python/src/main/java/org/apache/flink/streaming/api/functions/python/DataStreamPythonFunctionInfo.java
similarity index 93%
rename from flink-python/src/main/java/org/apache/flink/datastream/runtime/functions/python/DataStreamPythonFunctionInfo.java
rename to flink-python/src/main/java/org/apache/flink/streaming/api/functions/python/DataStreamPythonFunctionInfo.java
index ce6bc9e..c7fea86 100644
--- a/flink-python/src/main/java/org/apache/flink/datastream/runtime/functions/python/DataStreamPythonFunctionInfo.java
+++ b/flink-python/src/main/java/org/apache/flink/streaming/api/functions/python/DataStreamPythonFunctionInfo.java
@@ -15,8 +15,9 @@
  * limitations under the License.
  */
 
-package org.apache.flink.datastream.runtime.functions.python;
+package org.apache.flink.streaming.api.functions.python;
 
+import org.apache.flink.annotation.Internal;
 import org.apache.flink.table.functions.python.PythonFunction;
 
 import java.io.Serializable;
@@ -24,6 +25,7 @@ import java.io.Serializable;
 /**
  * {@link DataStreamPythonFunctionInfo} holds a PythonFunction and its function type.
  * */
+@Internal
 public class DataStreamPythonFunctionInfo implements Serializable {
 	private static final long serialVersionUID = 1L;
 
diff --git a/flink-python/src/main/java/org/apache/flink/datastream/runtime/functions/python/PartitionCustomKeySelector.java b/flink-python/src/main/java/org/apache/flink/streaming/api/functions/python/PartitionCustomKeySelector.java
similarity index 78%
rename from flink-python/src/main/java/org/apache/flink/datastream/runtime/functions/python/PartitionCustomKeySelector.java
rename to flink-python/src/main/java/org/apache/flink/streaming/api/functions/python/PartitionCustomKeySelector.java
index 04391b3..995bb28 100644
--- a/flink-python/src/main/java/org/apache/flink/datastream/runtime/functions/python/PartitionCustomKeySelector.java
+++ b/flink-python/src/main/java/org/apache/flink/streaming/api/functions/python/PartitionCustomKeySelector.java
@@ -16,19 +16,24 @@
  * limitations under the License.
  */
 
-package org.apache.flink.datastream.runtime.functions.python;
+package org.apache.flink.streaming.api.functions.python;
 
+import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.streaming.api.operators.python.PythonPartitionCustomOperator;
 import org.apache.flink.types.Row;
 
 /**
- * The {@link PartitionCustomKeySelector} will return the first field of the input row value. The input value is
- * generated by the
- * {@link org.apache.flink.datastream.runtime.operators.python.DataStreamPythonPartitionCustomFunctionOperator} after
+ * The {@link PartitionCustomKeySelector} will return the first field of the input row value.
+ * The input value is generated by the {@link PythonPartitionCustomOperator} after
  * executed user defined partitioner and keySelector function. The value of the first field will be the desired
  * partition index.
  */
+@Internal
 public class PartitionCustomKeySelector implements KeySelector<Row, Integer> {
+
+	private static final long serialVersionUID = 1L;
+
 	@Override
 	public Integer getKey(Row value) throws Exception {
 		return (Integer) value.getField(0);
diff --git a/flink-python/src/main/java/org/apache/flink/datastream/runtime/functions/python/PickledKeySelector.java b/flink-python/src/main/java/org/apache/flink/streaming/api/functions/python/PickledKeySelector.java
similarity index 84%
rename from flink-python/src/main/java/org/apache/flink/datastream/runtime/functions/python/PickledKeySelector.java
rename to flink-python/src/main/java/org/apache/flink/streaming/api/functions/python/PickledKeySelector.java
index 1bc3699..91cff04 100644
--- a/flink-python/src/main/java/org/apache/flink/datastream/runtime/functions/python/PickledKeySelector.java
+++ b/flink-python/src/main/java/org/apache/flink/streaming/api/functions/python/PickledKeySelector.java
@@ -15,22 +15,26 @@
  * limitations under the License.
  */
 
-package org.apache.flink.datastream.runtime.functions.python;
+package org.apache.flink.streaming.api.functions.python;
 
+import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.types.Row;
 
 import net.razorvine.pickle.Unpickler;
 
 /**
- * {@link PickledKeySelector} is responsible for extracting the first filed of the input row as key.
+ * {@link PickledKeySelector} is responsible for extracting the first field of the input row as key.
  * The input row is generated by python DataStream map function in the format of (key_selector.get_key(value), value)
  * tuple2.
  */
+@Internal
 public class PickledKeySelector implements KeySelector<Row, Object> {
 
-	private Unpickler unpickler = null;
-	private boolean isPickledByteArray = false;
+	private static final long serialVersionUID = 1L;
+
+	private final boolean isPickledByteArray;
+	private transient Unpickler unpickler = null;
 
 	public PickledKeySelector(boolean isPickledByteArray) {
 		this.isPickledByteArray = isPickledByteArray;
diff --git a/flink-python/src/main/java/org/apache/flink/datastream/runtime/operators/python/DataStreamPythonPartitionCustomFunctionOperator.java b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/PythonPartitionCustomOperator.java
similarity index 81%
rename from flink-python/src/main/java/org/apache/flink/datastream/runtime/operators/python/DataStreamPythonPartitionCustomFunctionOperator.java
rename to flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/PythonPartitionCustomOperator.java
index 2a39935..d7802a2 100644
--- a/flink-python/src/main/java/org/apache/flink/datastream/runtime/operators/python/DataStreamPythonPartitionCustomFunctionOperator.java
+++ b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/PythonPartitionCustomOperator.java
@@ -16,31 +16,33 @@
  * limitations under the License.
  */
 
-package org.apache.flink.datastream.runtime.operators.python;
+package org.apache.flink.streaming.api.operators.python;
 
+import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.CoreOptions;
-import org.apache.flink.datastream.runtime.functions.python.DataStreamPythonFunctionInfo;
-import org.apache.flink.datastream.runtime.runners.python.beam.BeamDataStreamPythonStatelessFunctionRunner;
 import org.apache.flink.python.PythonFunctionRunner;
 import org.apache.flink.python.env.PythonEnvironmentManager;
 import org.apache.flink.python.env.beam.ProcessPythonEnvironmentManager;
+import org.apache.flink.streaming.api.functions.python.DataStreamPythonFunctionInfo;
+import org.apache.flink.streaming.api.runners.python.beam.BeamDataStreamStatelessPythonFunctionRunner;
 
 /**
- * The {@link DataStreamPythonPartitionCustomFunctionOperator} enables us to set the number of partitions for current
+ * The {@link PythonPartitionCustomOperator} enables us to set the number of partitions for current
  * operator dynamically when generating the {@link org.apache.flink.streaming.api.graph.StreamGraph} before executing
  * the job. The number of partitions will be set in environment variables for python Worker, so that we can obtain the
  * number of partitions when executing user defined partitioner function.
  */
-public class DataStreamPythonPartitionCustomFunctionOperator<IN, OUT> extends
-	DataStreamPythonStatelessFunctionOperator<IN, OUT> {
+@Internal
+public class PythonPartitionCustomOperator<IN, OUT> extends
+	StatelessOneInputPythonFunctionOperator<IN, OUT> {
 
 	public static final String DATA_STREAM_NUM_PARTITIONS = "DATA_STREAM_NUM_PARTITIONS";
 
 	private int numPartitions = CoreOptions.DEFAULT_PARALLELISM.defaultValue();
 
-	public DataStreamPythonPartitionCustomFunctionOperator(
+	public PythonPartitionCustomOperator(
 		Configuration config,
 		TypeInformation<IN> inputTypeInfo,
 		TypeInformation<OUT> outputTypeInfo,
@@ -56,7 +58,7 @@ public class DataStreamPythonPartitionCustomFunctionOperator<IN, OUT> extends
 			envManager.setEnvironmentVariable(DATA_STREAM_NUM_PARTITIONS,
 				String.valueOf(this.numPartitions));
 		}
-		return new BeamDataStreamPythonStatelessFunctionRunner(
+		return new BeamDataStreamStatelessPythonFunctionRunner(
 			getRuntimeContext().getTaskName(),
 			pythonEnvironmentManager,
 			inputTypeInfo,
diff --git a/flink-python/src/main/java/org/apache/flink/datastream/runtime/operators/python/DataStreamPythonReduceFunctionOperator.java b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/PythonReduceOperator.java
similarity index 85%
rename from flink-python/src/main/java/org/apache/flink/datastream/runtime/operators/python/DataStreamPythonReduceFunctionOperator.java
rename to flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/PythonReduceOperator.java
index b73da63..9f2ba13 100644
--- a/flink-python/src/main/java/org/apache/flink/datastream/runtime/operators/python/DataStreamPythonReduceFunctionOperator.java
+++ b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/PythonReduceOperator.java
@@ -16,8 +16,9 @@
  * limitations under the License.
  */
 
-package org.apache.flink.datastream.runtime.operators.python;
+package org.apache.flink.streaming.api.operators.python;
 
+import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.state.ValueState;
 import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
@@ -25,19 +26,20 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.datastream.runtime.functions.python.DataStreamPythonFunctionInfo;
-import org.apache.flink.datastream.runtime.runners.python.beam.BeamDataStreamPythonStatelessFunctionRunner;
-import org.apache.flink.datastream.runtime.typeutils.python.PythonTypeUtils;
 import org.apache.flink.python.PythonFunctionRunner;
+import org.apache.flink.streaming.api.functions.python.DataStreamPythonFunctionInfo;
+import org.apache.flink.streaming.api.runners.python.beam.BeamDataStreamStatelessPythonFunctionRunner;
+import org.apache.flink.streaming.api.typeutils.PythonTypeUtils;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.types.Row;
 
 /**
- * {@link DataStreamPythonReduceFunctionOperator} is responsible for launching beam runner which
+ * {@link PythonReduceOperator} is responsible for launching beam runner which
  * will start a python harness to execute user defined python ReduceFunction.
  */
-public class DataStreamPythonReduceFunctionOperator<OUT>
-	extends DataStreamPythonStatelessFunctionOperator<Row, OUT> {
+@Internal
+public class PythonReduceOperator<OUT>
+	extends StatelessOneInputPythonFunctionOperator<Row, OUT> {
 
 	private static final long serialVersionUID = 1L;
 
@@ -51,7 +53,7 @@ public class DataStreamPythonReduceFunctionOperator<OUT>
 
 	private transient Row reuseRow;
 
-	public DataStreamPythonReduceFunctionOperator(
+	public PythonReduceOperator(
 		Configuration config,
 		TypeInformation<Row> inputTypeInfo,
 		TypeInformation<OUT> outputTypeInfo,
@@ -108,7 +110,7 @@ public class DataStreamPythonReduceFunctionOperator<OUT>
 
 	@Override
 	public PythonFunctionRunner createPythonFunctionRunner() throws Exception {
-		return new BeamDataStreamPythonStatelessFunctionRunner(
+		return new BeamDataStreamStatelessPythonFunctionRunner(
 			getRuntimeContext().getTaskName(),
 			createPythonEnvironmentManager(),
 			runnerInputTypeInfo,
diff --git a/flink-python/src/main/java/org/apache/flink/datastream/runtime/operators/python/DataStreamPythonStatelessFunctionOperator.java b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/StatelessOneInputPythonFunctionOperator.java
similarity index 89%
rename from flink-python/src/main/java/org/apache/flink/datastream/runtime/operators/python/DataStreamPythonStatelessFunctionOperator.java
rename to flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/StatelessOneInputPythonFunctionOperator.java
index 84d43f2..c41cfac 100644
--- a/flink-python/src/main/java/org/apache/flink/datastream/runtime/operators/python/DataStreamPythonStatelessFunctionOperator.java
+++ b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/StatelessOneInputPythonFunctionOperator.java
@@ -15,8 +15,9 @@
  * limitations under the License.
  */
 
-package org.apache.flink.datastream.runtime.operators.python;
+package org.apache.flink.streaming.api.operators.python;
 
+import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
@@ -25,12 +26,11 @@ import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
 import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
-import org.apache.flink.datastream.runtime.functions.python.DataStreamPythonFunctionInfo;
-import org.apache.flink.datastream.runtime.runners.python.beam.BeamDataStreamPythonStatelessFunctionRunner;
-import org.apache.flink.datastream.runtime.typeutils.python.PythonTypeUtils;
 import org.apache.flink.fnexecution.v1.FlinkFnApi;
 import org.apache.flink.python.PythonFunctionRunner;
-import org.apache.flink.streaming.api.operators.python.AbstractOneInputPythonFunctionOperator;
+import org.apache.flink.streaming.api.functions.python.DataStreamPythonFunctionInfo;
+import org.apache.flink.streaming.api.runners.python.beam.BeamDataStreamStatelessPythonFunctionRunner;
+import org.apache.flink.streaming.api.typeutils.PythonTypeUtils;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.table.functions.python.PythonEnv;
 import org.apache.flink.table.runtime.util.StreamRecordCollector;
@@ -40,10 +40,11 @@ import com.google.protobuf.ByteString;
 import java.util.Map;
 
 /**
- * {@link DataStreamPythonStatelessFunctionOperator} is responsible for launching beam runner which will start a python
+ * {@link StatelessOneInputPythonFunctionOperator} is responsible for launching beam runner which will start a python
  * harness to execute user defined python function.
  */
-public class DataStreamPythonStatelessFunctionOperator<IN, OUT>
+@Internal
+public class StatelessOneInputPythonFunctionOperator<IN, OUT>
 	extends AbstractOneInputPythonFunctionOperator<IN, OUT> {
 
 	private static final long serialVersionUID = 1L;
@@ -76,7 +77,7 @@ public class DataStreamPythonStatelessFunctionOperator<IN, OUT>
 
 	protected transient StreamRecordCollector streamRecordCollector;
 
-	public DataStreamPythonStatelessFunctionOperator(
+	public StatelessOneInputPythonFunctionOperator(
 		Configuration config,
 		TypeInformation<IN> inputTypeInfo,
 		TypeInformation<OUT> outputTypeInfo,
@@ -116,7 +117,7 @@ public class DataStreamPythonStatelessFunctionOperator<IN, OUT>
 			coderUrn = DATA_STREAM_FLAT_MAP_FUNCTION_CODER_URN;
 		}
 
-		return new BeamDataStreamPythonStatelessFunctionRunner(
+		return new BeamDataStreamStatelessPythonFunctionRunner(
 			getRuntimeContext().getTaskName(),
 			createPythonEnvironmentManager(),
 			inputTypeInfo,
diff --git a/flink-python/src/main/java/org/apache/flink/datastream/runtime/operators/python/DataStreamTwoInputPythonStatelessFunctionOperator.java b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/StatelessTwoInputPythonFunctionOperator.java
similarity index 92%
rename from flink-python/src/main/java/org/apache/flink/datastream/runtime/operators/python/DataStreamTwoInputPythonStatelessFunctionOperator.java
rename to flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/StatelessTwoInputPythonFunctionOperator.java
index e35f070..8882e1d 100644
--- a/flink-python/src/main/java/org/apache/flink/datastream/runtime/operators/python/DataStreamTwoInputPythonStatelessFunctionOperator.java
+++ b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/StatelessTwoInputPythonFunctionOperator.java
@@ -15,8 +15,9 @@
  * limitations under the License.
  */
 
-package org.apache.flink.datastream.runtime.operators.python;
+package org.apache.flink.streaming.api.operators.python;
 
+import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeinfo.Types;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
@@ -27,12 +28,11 @@ import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
 import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
-import org.apache.flink.datastream.runtime.functions.python.DataStreamPythonFunctionInfo;
-import org.apache.flink.datastream.runtime.runners.python.beam.BeamDataStreamPythonStatelessFunctionRunner;
-import org.apache.flink.datastream.runtime.typeutils.python.PythonTypeUtils;
 import org.apache.flink.fnexecution.v1.FlinkFnApi;
 import org.apache.flink.python.PythonFunctionRunner;
-import org.apache.flink.streaming.api.operators.python.AbstractTwoInputPythonFunctionOperator;
+import org.apache.flink.streaming.api.functions.python.DataStreamPythonFunctionInfo;
+import org.apache.flink.streaming.api.runners.python.beam.BeamDataStreamStatelessPythonFunctionRunner;
+import org.apache.flink.streaming.api.typeutils.PythonTypeUtils;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.table.functions.python.PythonEnv;
 import org.apache.flink.table.runtime.util.StreamRecordCollector;
@@ -43,10 +43,11 @@ import com.google.protobuf.ByteString;
 import java.util.Map;
 
 /**
- * {@link DataStreamTwoInputPythonStatelessFunctionOperator} is responsible for launching beam
+ * {@link StatelessTwoInputPythonFunctionOperator} is responsible for launching beam
  * runner which will start a python harness to execute two-input user defined python function.
  */
-public class DataStreamTwoInputPythonStatelessFunctionOperator<IN1, IN2, OUT>
+@Internal
+public class StatelessTwoInputPythonFunctionOperator<IN1, IN2, OUT>
 	extends AbstractTwoInputPythonFunctionOperator<IN1, IN2, OUT> {
 
 	private static final long serialVersionUID = 1L;
@@ -86,7 +87,7 @@ public class DataStreamTwoInputPythonStatelessFunctionOperator<IN1, IN2, OUT>
 	private final TypeInformation<IN1> inputTypeInfo1;
 	private final TypeInformation<IN2> inputTypeInfo2;
 
-	public DataStreamTwoInputPythonStatelessFunctionOperator(
+	public StatelessTwoInputPythonFunctionOperator(
 		Configuration config,
 		TypeInformation<IN1> inputTypeInfo1,
 		TypeInformation<IN2> inputTypeInfo2,
@@ -148,7 +149,7 @@ public class DataStreamTwoInputPythonStatelessFunctionOperator<IN1, IN2, OUT>
 			throw new RuntimeException("Function Type for ConnectedStream should be Map or FlatMap");
 		}
 
-		return new BeamDataStreamPythonStatelessFunctionRunner(
+		return new BeamDataStreamStatelessPythonFunctionRunner(
 			getRuntimeContext().getTaskName(),
 			createPythonEnvironmentManager(),
 			runnerInputTypeInfo,
diff --git a/flink-python/src/main/java/org/apache/flink/datastream/runtime/runners/python/beam/BeamDataStreamPythonStatelessFunctionRunner.java b/flink-python/src/main/java/org/apache/flink/streaming/api/runners/python/beam/BeamDataStreamStatelessPythonFunctionRunner.java
similarity index 85%
rename from flink-python/src/main/java/org/apache/flink/datastream/runtime/runners/python/beam/BeamDataStreamPythonStatelessFunctionRunner.java
rename to flink-python/src/main/java/org/apache/flink/streaming/api/runners/python/beam/BeamDataStreamStatelessPythonFunctionRunner.java
index 488d8fa..359af18 100644
--- a/flink-python/src/main/java/org/apache/flink/datastream/runtime/runners/python/beam/BeamDataStreamPythonStatelessFunctionRunner.java
+++ b/flink-python/src/main/java/org/apache/flink/streaming/api/runners/python/beam/BeamDataStreamStatelessPythonFunctionRunner.java
@@ -16,14 +16,14 @@
  * limitations under the License.
  */
 
-package org.apache.flink.datastream.runtime.runners.python.beam;
+package org.apache.flink.streaming.api.runners.python.beam;
 
+import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.datastream.runtime.typeutils.python.PythonTypeUtils;
 import org.apache.flink.fnexecution.v1.FlinkFnApi;
 import org.apache.flink.python.env.PythonEnvironmentManager;
 import org.apache.flink.python.metric.FlinkMetricContainer;
-import org.apache.flink.streaming.api.runners.python.beam.BeamPythonStatelessFunctionRunner;
+import org.apache.flink.streaming.api.typeutils.PythonTypeUtils;
 
 import org.apache.beam.model.pipeline.v1.RunnerApi;
 
@@ -32,18 +32,18 @@ import javax.annotation.Nullable;
 import java.util.Map;
 
 /**
- * {@link BeamDataStreamPythonStatelessFunctionRunner} is responsible for starting a beam python harness to execute user
+ * {@link BeamDataStreamStatelessPythonFunctionRunner} is responsible for starting a beam python harness to execute user
  * defined python function.
  */
-public class BeamDataStreamPythonStatelessFunctionRunner extends BeamPythonStatelessFunctionRunner {
-	private static final long serialVersionUID = 1L;
+@Internal
+public class BeamDataStreamStatelessPythonFunctionRunner extends BeamStatelessPythonFunctionRunner {
 
 	private final TypeInformation inputType;
 	private final TypeInformation outputTupe;
 	private final FlinkFnApi.UserDefinedDataStreamFunctions userDefinedDataStreamFunctions;
 	private final String coderUrn;
 
-	public BeamDataStreamPythonStatelessFunctionRunner(
+	public BeamDataStreamStatelessPythonFunctionRunner(
 		String taskName,
 		PythonEnvironmentManager environmentManager,
 		TypeInformation inputType,
diff --git a/flink-python/src/main/java/org/apache/flink/streaming/api/runners/python/beam/BeamPythonFunctionRunner.java b/flink-python/src/main/java/org/apache/flink/streaming/api/runners/python/beam/BeamPythonFunctionRunner.java
index 2516147..27537d6 100644
--- a/flink-python/src/main/java/org/apache/flink/streaming/api/runners/python/beam/BeamPythonFunctionRunner.java
+++ b/flink-python/src/main/java/org/apache/flink/streaming/api/runners/python/beam/BeamPythonFunctionRunner.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.streaming.api.runners.python.beam;
 
+import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.python.PythonConfig;
 import org.apache.flink.python.PythonFunctionRunner;
@@ -57,6 +58,7 @@ import java.util.concurrent.LinkedBlockingQueue;
 /**
  * An base class for {@link PythonFunctionRunner} based on beam.
  */
+@Internal
 public abstract class BeamPythonFunctionRunner implements PythonFunctionRunner {
 	protected static final Logger LOG = LoggerFactory.getLogger(BeamPythonFunctionRunner.class);
 
diff --git a/flink-python/src/main/java/org/apache/flink/streaming/api/runners/python/beam/BeamPythonStatelessFunctionRunner.java b/flink-python/src/main/java/org/apache/flink/streaming/api/runners/python/beam/BeamStatelessPythonFunctionRunner.java
similarity index 98%
rename from flink-python/src/main/java/org/apache/flink/streaming/api/runners/python/beam/BeamPythonStatelessFunctionRunner.java
rename to flink-python/src/main/java/org/apache/flink/streaming/api/runners/python/beam/BeamStatelessPythonFunctionRunner.java
index 91acbc8..a999d0e 100644
--- a/flink-python/src/main/java/org/apache/flink/streaming/api/runners/python/beam/BeamPythonStatelessFunctionRunner.java
+++ b/flink-python/src/main/java/org/apache/flink/streaming/api/runners/python/beam/BeamStatelessPythonFunctionRunner.java
@@ -52,7 +52,7 @@ import static org.apache.beam.runners.core.construction.BeamUrns.getUrn;
  * A {@link BeamPythonFunctionRunner} used to execute Python stateless functions.
  */
 @Internal
-public abstract class BeamPythonStatelessFunctionRunner extends BeamPythonFunctionRunner {
+public abstract class BeamStatelessPythonFunctionRunner extends BeamPythonFunctionRunner {
 	private static final String INPUT_ID = "input";
 	private static final String OUTPUT_ID = "output";
 	private static final String TRANSFORM_ID = "transform";
@@ -68,7 +68,7 @@ public abstract class BeamPythonStatelessFunctionRunner extends BeamPythonFuncti
 
 	private final String functionUrn;
 
-	public BeamPythonStatelessFunctionRunner(
+	public BeamStatelessPythonFunctionRunner(
 		String taskName,
 		PythonEnvironmentManager environmentManager,
 		String functionUrn,
diff --git a/flink-python/src/main/java/org/apache/flink/datastream/typeinfo/python/PickledByteArrayTypeInfo.java b/flink-python/src/main/java/org/apache/flink/streaming/api/typeinfo/python/PickledByteArrayTypeInfo.java
similarity index 95%
rename from flink-python/src/main/java/org/apache/flink/datastream/typeinfo/python/PickledByteArrayTypeInfo.java
rename to flink-python/src/main/java/org/apache/flink/streaming/api/typeinfo/python/PickledByteArrayTypeInfo.java
index 4c9a893..55bcc73 100644
--- a/flink-python/src/main/java/org/apache/flink/datastream/typeinfo/python/PickledByteArrayTypeInfo.java
+++ b/flink-python/src/main/java/org/apache/flink/streaming/api/typeinfo/python/PickledByteArrayTypeInfo.java
@@ -15,8 +15,9 @@
  * limitations under the License.
  */
 
-package org.apache.flink.datastream.typeinfo.python;
+package org.apache.flink.streaming.api.typeinfo.python;
 
+import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
@@ -26,6 +27,7 @@ import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerial
  * A PickledByteArrayTypeInfo indicates that the data of this type is a generated primitive byte
  * array by pickle.
  */
+@Internal
 public class PickledByteArrayTypeInfo extends TypeInformation<byte[]> {
 
 	public static final PickledByteArrayTypeInfo PICKLED_BYTE_ARRAY_TYPE_INFO = new PickledByteArrayTypeInfo();
diff --git a/flink-python/src/main/java/org/apache/flink/datastream/runtime/typeutils/python/PythonTypeUtils.java b/flink-python/src/main/java/org/apache/flink/streaming/api/typeutils/PythonTypeUtils.java
similarity index 98%
rename from flink-python/src/main/java/org/apache/flink/datastream/runtime/typeutils/python/PythonTypeUtils.java
rename to flink-python/src/main/java/org/apache/flink/streaming/api/typeutils/PythonTypeUtils.java
index 8927f1b..c526ab1 100644
--- a/flink-python/src/main/java/org/apache/flink/datastream/runtime/typeutils/python/PythonTypeUtils.java
+++ b/flink-python/src/main/java/org/apache/flink/streaming/api/typeutils/PythonTypeUtils.java
@@ -16,8 +16,9 @@
  * limitations under the License.
  */
 
-package org.apache.flink.datastream.runtime.typeutils.python;
+package org.apache.flink.streaming.api.typeutils;
 
+import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
@@ -44,8 +45,8 @@ import org.apache.flink.api.java.typeutils.RowTypeInfo;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.apache.flink.api.java.typeutils.runtime.RowSerializer;
 import org.apache.flink.api.java.typeutils.runtime.TupleSerializer;
-import org.apache.flink.datastream.typeinfo.python.PickledByteArrayTypeInfo;
 import org.apache.flink.fnexecution.v1.FlinkFnApi;
+import org.apache.flink.streaming.api.typeinfo.python.PickledByteArrayTypeInfo;
 import org.apache.flink.table.runtime.typeutils.serializers.python.BigDecSerializer;
 import org.apache.flink.table.runtime.typeutils.serializers.python.StringSerializer;
 
@@ -56,6 +57,7 @@ import java.util.Map;
 /**
  * A util class for converting the given TypeInformation to other objects.
  */
+@Internal
 public class PythonTypeUtils {
 
 	/**
diff --git a/flink-python/src/main/java/org/apache/flink/table/runtime/functions/python/AbstractPythonStatelessFunctionFlatMap.java b/flink-python/src/main/java/org/apache/flink/table/runtime/functions/python/AbstractPythonStatelessFunctionFlatMap.java
index 12c1bcc..e891acb 100644
--- a/flink-python/src/main/java/org/apache/flink/table/runtime/functions/python/AbstractPythonStatelessFunctionFlatMap.java
+++ b/flink-python/src/main/java/org/apache/flink/table/runtime/functions/python/AbstractPythonStatelessFunctionFlatMap.java
@@ -40,7 +40,7 @@ import org.apache.flink.python.env.beam.ProcessPythonEnvironmentManager;
 import org.apache.flink.python.metric.FlinkMetricContainer;
 import org.apache.flink.table.functions.python.PythonEnv;
 import org.apache.flink.table.functions.python.PythonFunctionInfo;
-import org.apache.flink.table.runtime.runners.python.beam.BeamTablePythonStatelessFunctionRunner;
+import org.apache.flink.table.runtime.runners.python.beam.BeamTableStatelessPythonFunctionRunner;
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.table.types.utils.LegacyTypeInfoDataTypeConverter;
 import org.apache.flink.table.types.utils.LogicalTypeDataTypeConverter;
@@ -329,7 +329,7 @@ public abstract class AbstractPythonStatelessFunctionFlatMap
 	}
 
 	private PythonFunctionRunner createPythonFunctionRunner() throws IOException {
-		return new BeamTablePythonStatelessFunctionRunner(
+		return new BeamTableStatelessPythonFunctionRunner(
 			getRuntimeContext().getTaskName(),
 			createPythonEnvironmentManager(),
 			userDefinedFunctionInputType,
diff --git a/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/AbstractStatelessFunctionOperator.java b/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/AbstractStatelessFunctionOperator.java
index eb46961..35e543c 100644
--- a/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/AbstractStatelessFunctionOperator.java
+++ b/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/AbstractStatelessFunctionOperator.java
@@ -30,7 +30,7 @@ import org.apache.flink.streaming.api.operators.python.AbstractOneInputPythonFun
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.functions.python.PythonFunctionInfo;
-import org.apache.flink.table.runtime.runners.python.beam.BeamTablePythonStatelessFunctionRunner;
+import org.apache.flink.table.runtime.runners.python.beam.BeamTableStatelessPythonFunctionRunner;
 import org.apache.flink.table.runtime.types.CRow;
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.types.Row;
@@ -152,7 +152,7 @@ public abstract class AbstractStatelessFunctionOperator<IN, OUT, UDFIN>
 
 	@Override
 	public PythonFunctionRunner createPythonFunctionRunner() throws IOException {
-		return new BeamTablePythonStatelessFunctionRunner(
+		return new BeamTableStatelessPythonFunctionRunner(
 			getRuntimeContext().getTaskName(),
 			createPythonEnvironmentManager(),
 			userDefinedFunctionInputType,
diff --git a/flink-python/src/main/java/org/apache/flink/table/runtime/runners/python/beam/BeamTablePythonStatelessFunctionRunner.java b/flink-python/src/main/java/org/apache/flink/table/runtime/runners/python/beam/BeamTableStatelessPythonFunctionRunner.java
similarity index 91%
rename from flink-python/src/main/java/org/apache/flink/table/runtime/runners/python/beam/BeamTablePythonStatelessFunctionRunner.java
rename to flink-python/src/main/java/org/apache/flink/table/runtime/runners/python/beam/BeamTableStatelessPythonFunctionRunner.java
index 91ff931..0714161 100644
--- a/flink-python/src/main/java/org/apache/flink/table/runtime/runners/python/beam/BeamTablePythonStatelessFunctionRunner.java
+++ b/flink-python/src/main/java/org/apache/flink/table/runtime/runners/python/beam/BeamTableStatelessPythonFunctionRunner.java
@@ -22,7 +22,7 @@ import org.apache.flink.annotation.Internal;
 import org.apache.flink.fnexecution.v1.FlinkFnApi;
 import org.apache.flink.python.env.PythonEnvironmentManager;
 import org.apache.flink.python.metric.FlinkMetricContainer;
-import org.apache.flink.streaming.api.runners.python.beam.BeamPythonStatelessFunctionRunner;
+import org.apache.flink.streaming.api.runners.python.beam.BeamStatelessPythonFunctionRunner;
 import org.apache.flink.table.runtime.typeutils.PythonTypeUtils;
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.RowType;
@@ -33,17 +33,17 @@ import org.apache.beam.model.pipeline.v1.RunnerApi;
 import java.util.Map;
 
 /**
- * A {@link BeamTablePythonStatelessFunctionRunner} used to execute Python stateless functions.
+ * A {@link BeamTableStatelessPythonFunctionRunner} used to execute Python stateless functions.
  */
 @Internal
-public class BeamTablePythonStatelessFunctionRunner extends BeamPythonStatelessFunctionRunner {
+public class BeamTableStatelessPythonFunctionRunner extends BeamStatelessPythonFunctionRunner {
 
 	private final RowType inputType;
 	private final RowType outputType;
 	private final String coderUrn;
 	private final FlinkFnApi.UserDefinedFunctions userDefinedFunctions;
 
-	public BeamTablePythonStatelessFunctionRunner(
+	public BeamTableStatelessPythonFunctionRunner(
 		String taskName,
 		PythonEnvironmentManager environmentManager,
 		RowType inputType,
diff --git a/flink-python/src/test/java/org/apache/flink/python/util/PythonTypeUtilsTest.java b/flink-python/src/test/java/org/apache/flink/streaming/api/typeutils/PythonTypeUtilsTest.java
similarity index 97%
rename from flink-python/src/test/java/org/apache/flink/python/util/PythonTypeUtilsTest.java
rename to flink-python/src/test/java/org/apache/flink/streaming/api/typeutils/PythonTypeUtilsTest.java
index b37549c..3b1a2a2 100644
--- a/flink-python/src/test/java/org/apache/flink/python/util/PythonTypeUtilsTest.java
+++ b/flink-python/src/test/java/org/apache/flink/streaming/api/typeutils/PythonTypeUtilsTest.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.flink.python.util;
+package org.apache.flink.streaming.api.typeutils;
 
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
@@ -36,9 +36,8 @@ import org.apache.flink.api.common.typeutils.base.array.IntPrimitiveArraySeriali
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.apache.flink.api.java.typeutils.runtime.RowSerializer;
 import org.apache.flink.api.java.typeutils.runtime.TupleSerializer;
-import org.apache.flink.datastream.runtime.typeutils.python.PythonTypeUtils;
-import org.apache.flink.datastream.typeinfo.python.PickledByteArrayTypeInfo;
 import org.apache.flink.fnexecution.v1.FlinkFnApi;
+import org.apache.flink.streaming.api.typeinfo.python.PickledByteArrayTypeInfo;
 import org.apache.flink.table.runtime.typeutils.serializers.python.BigDecSerializer;
 import org.apache.flink.table.runtime.typeutils.serializers.python.StringSerializer;
 
diff --git a/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PassThroughPythonAggregateFunctionRunner.java b/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PassThroughPythonAggregateFunctionRunner.java
index 2f474b9..e4cffa5 100644
--- a/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PassThroughPythonAggregateFunctionRunner.java
+++ b/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PassThroughPythonAggregateFunctionRunner.java
@@ -25,7 +25,7 @@ import org.apache.flink.python.PythonConfig;
 import org.apache.flink.python.env.PythonEnvironmentManager;
 import org.apache.flink.python.metric.FlinkMetricContainer;
 import org.apache.flink.table.runtime.arrow.serializers.RowDataArrowSerializer;
-import org.apache.flink.table.runtime.runners.python.beam.BeamTablePythonStatelessFunctionRunner;
+import org.apache.flink.table.runtime.runners.python.beam.BeamTableStatelessPythonFunctionRunner;
 import org.apache.flink.table.types.logical.RowType;
 
 import org.apache.beam.runners.fnexecution.control.JobBundleFactory;
@@ -39,7 +39,7 @@ import java.util.Map;
  * A {@link PassThroughPythonAggregateFunctionRunner} runner that just return the first input element
  * with the same key as the execution results.
  */
-public class PassThroughPythonAggregateFunctionRunner extends BeamTablePythonStatelessFunctionRunner {
+public class PassThroughPythonAggregateFunctionRunner extends BeamTableStatelessPythonFunctionRunner {
 
 	private final List<byte[]> buffer;
 
diff --git a/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PassThroughPythonScalarFunctionRunner.java b/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PassThroughPythonScalarFunctionRunner.java
index 57dc439..0964717 100644
--- a/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PassThroughPythonScalarFunctionRunner.java
+++ b/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PassThroughPythonScalarFunctionRunner.java
@@ -21,7 +21,7 @@ package org.apache.flink.table.runtime.utils;
 import org.apache.flink.fnexecution.v1.FlinkFnApi;
 import org.apache.flink.python.env.PythonEnvironmentManager;
 import org.apache.flink.python.metric.FlinkMetricContainer;
-import org.apache.flink.table.runtime.runners.python.beam.BeamTablePythonStatelessFunctionRunner;
+import org.apache.flink.table.runtime.runners.python.beam.BeamTableStatelessPythonFunctionRunner;
 import org.apache.flink.table.types.logical.RowType;
 
 import org.apache.beam.runners.fnexecution.control.JobBundleFactory;
@@ -34,7 +34,7 @@ import java.util.Map;
 /**
  * A {@link PassThroughPythonScalarFunctionRunner} runner that just return the input elements as the execution results.
  */
-public class PassThroughPythonScalarFunctionRunner extends BeamTablePythonStatelessFunctionRunner {
+public class PassThroughPythonScalarFunctionRunner extends BeamTableStatelessPythonFunctionRunner {
 
 	private final List<byte[]> buffer;
 
diff --git a/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PassThroughPythonTableFunctionRunner.java b/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PassThroughPythonTableFunctionRunner.java
index 2fc4264..e3fecdd 100644
--- a/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PassThroughPythonTableFunctionRunner.java
+++ b/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PassThroughPythonTableFunctionRunner.java
@@ -21,7 +21,7 @@ package org.apache.flink.table.runtime.utils;
 import org.apache.flink.fnexecution.v1.FlinkFnApi;
 import org.apache.flink.python.env.PythonEnvironmentManager;
 import org.apache.flink.python.metric.FlinkMetricContainer;
-import org.apache.flink.table.runtime.runners.python.beam.BeamTablePythonStatelessFunctionRunner;
+import org.apache.flink.table.runtime.runners.python.beam.BeamTableStatelessPythonFunctionRunner;
 import org.apache.flink.table.types.logical.RowType;
 
 import org.apache.beam.runners.fnexecution.control.JobBundleFactory;
@@ -32,10 +32,10 @@ import java.util.List;
 import java.util.Map;
 
 /**
- * A {@link BeamTablePythonStatelessFunctionRunner} that emit each input element in inner join and emit null in
+ * A {@link BeamTableStatelessPythonFunctionRunner} that emit each input element in inner join and emit null in
  * left join when certain test conditions are met.
  */
-public class PassThroughPythonTableFunctionRunner extends BeamTablePythonStatelessFunctionRunner {
+public class PassThroughPythonTableFunctionRunner extends BeamTableStatelessPythonFunctionRunner {
 
 	private int num = 0;