You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by di...@apache.org on 2020/09/21 11:05:09 UTC
[flink] 02/02: [FLINK-19301][python] Improve the package structure of Python DataStream API
This is an automated email from the ASF dual-hosted git repository.
dianfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit f7ec18630ff9d6fd7a1b3fdb65304e95c34745e1
Author: Dian Fu <di...@apache.org>
AuthorDate: Fri Sep 18 21:41:42 2020 +0800
[FLINK-19301][python] Improve the package structure of Python DataStream API
This closes #13436
---
flink-python/pyflink/common/typeinfo.py | 4 +-
flink-python/pyflink/datastream/data_stream.py | 138 +++++++++------------
flink-python/pyflink/java_gateway.py | 3 +
.../apache/flink/python/util/PythonConfigUtil.java | 20 +--
.../functions/python/DataStreamPythonFunction.java | 7 +-
.../python/DataStreamPythonFunctionInfo.java | 4 +-
.../python/PartitionCustomKeySelector.java | 13 +-
.../api}/functions/python/PickledKeySelector.java | 12 +-
.../python/PythonPartitionCustomOperator.java} | 18 +--
.../operators/python/PythonReduceOperator.java} | 20 +--
.../StatelessOneInputPythonFunctionOperator.java} | 19 +--
.../StatelessTwoInputPythonFunctionOperator.java} | 19 +--
...amDataStreamStatelessPythonFunctionRunner.java} | 14 +--
.../python/beam/BeamPythonFunctionRunner.java | 2 +
...java => BeamStatelessPythonFunctionRunner.java} | 4 +-
.../typeinfo/python/PickledByteArrayTypeInfo.java | 4 +-
.../api/typeutils}/PythonTypeUtils.java | 6 +-
.../AbstractPythonStatelessFunctionFlatMap.java | 4 +-
.../python/AbstractStatelessFunctionOperator.java | 4 +-
...=> BeamTableStatelessPythonFunctionRunner.java} | 8 +-
.../api/typeutils}/PythonTypeUtilsTest.java | 5 +-
.../PassThroughPythonAggregateFunctionRunner.java | 4 +-
.../PassThroughPythonScalarFunctionRunner.java | 4 +-
.../PassThroughPythonTableFunctionRunner.java | 6 +-
24 files changed, 172 insertions(+), 170 deletions(-)
diff --git a/flink-python/pyflink/common/typeinfo.py b/flink-python/pyflink/common/typeinfo.py
index 8b1da5d..8778822 100644
--- a/flink-python/pyflink/common/typeinfo.py
+++ b/flink-python/pyflink/common/typeinfo.py
@@ -217,7 +217,7 @@ class PickledBytesTypeInfo(TypeInformation, ABC):
@staticmethod
def PICKLED_BYTE_ARRAY_TYPE_INFO():
- return WrapperTypeInfo(get_gateway().jvm.org.apache.flink.datastream.typeinfo.python
+ return WrapperTypeInfo(get_gateway().jvm.org.apache.flink.streaming.api.typeinfo.python
.PickledByteArrayTypeInfo.PICKLED_BYTE_ARRAY_TYPE_INFO)
@@ -441,7 +441,7 @@ def _from_java_type(j_type_info: JavaObject) -> TypeInformation:
return Types.PRIMITIVE_ARRAY(Types.CHAR())
JPickledBytesTypeInfo = gateway.jvm \
- .org.apache.flink.datastream.typeinfo.python.PickledByteArrayTypeInfo\
+ .org.apache.flink.streaming.api.typeinfo.python.PickledByteArrayTypeInfo\
.PICKLED_BYTE_ARRAY_TYPE_INFO
if _is_instance_of(j_type_info, JPickledBytesTypeInfo):
return Types.PICKLED_BYTE_ARRAY()
diff --git a/flink-python/pyflink/datastream/data_stream.py b/flink-python/pyflink/datastream/data_stream.py
index 3527be3..a1c5cb5 100644
--- a/flink-python/pyflink/datastream/data_stream.py
+++ b/flink-python/pyflink/datastream/data_stream.py
@@ -232,18 +232,15 @@ class DataStream(object):
func = MapFunctionWrapper(func)
else:
raise TypeError("The input must be a MapFunction or a callable function")
- func_name = str(func)
from pyflink.fn_execution import flink_fn_execution_pb2
- j_python_data_stream_scalar_function_operator, j_output_type_info = \
- self._get_java_python_function_operator(func,
- output_type,
- func_name,
- flink_fn_execution_pb2
- .UserDefinedDataStreamFunction.MAP)
+ j_operator, j_output_type_info = self._get_java_python_function_operator(
+ func,
+ output_type,
+ flink_fn_execution_pb2.UserDefinedDataStreamFunction.MAP)
return DataStream(self._j_data_stream.transform(
"Map",
j_output_type_info,
- j_python_data_stream_scalar_function_operator
+ j_operator
))
def flat_map(self, func: Union[Callable, FlatMapFunction],
@@ -263,18 +260,15 @@ class DataStream(object):
func = FlatMapFunctionWrapper(func)
else:
raise TypeError("The input must be a FlatMapFunction or a callable function")
- func_name = str(func)
from pyflink.fn_execution import flink_fn_execution_pb2
- j_python_data_stream_scalar_function_operator, j_output_type_info = \
- self._get_java_python_function_operator(func,
- result_type,
- func_name,
- flink_fn_execution_pb2
- .UserDefinedDataStreamFunction.FLAT_MAP)
+ j_operator, j_output_type_info = self._get_java_python_function_operator(
+ func,
+ result_type,
+ flink_fn_execution_pb2.UserDefinedDataStreamFunction.FLAT_MAP)
return DataStream(self._j_data_stream.transform(
"FLAT_MAP",
j_output_type_info,
- j_python_data_stream_scalar_function_operator
+ j_operator
))
def key_by(self, key_selector: Union[Callable, KeySelector],
@@ -292,10 +286,9 @@ class DataStream(object):
raise TypeError("Parameter key_selector should be a type of KeySelector.")
gateway = get_gateway()
- PickledKeySelector = gateway.jvm \
- .org.apache.flink.datastream.runtime.functions.python.PickledKeySelector
- j_output_type_info = self._j_data_stream.getTransformation().getOutputType()
- output_type_info = typeinfo._from_java_type(j_output_type_info)
+ PickledKeySelector = gateway.jvm.PickledKeySelector
+ output_type_info = typeinfo._from_java_type(
+ self._j_data_stream.getTransformation().getOutputType())
is_key_pickled_byte_array = False
if key_type_info is None:
key_type_info = Types.PICKLED_BYTE_ARRAY()
@@ -305,11 +298,13 @@ class DataStream(object):
output_type=Types.ROW([key_type_info, output_type_info]))
intermediate_map_stream.name(gateway.jvm.org.apache.flink.python.util.PythonConfigUtil
.STREAM_KEY_BY_MAP_OPERATOR_NAME)
- generated_key_stream = KeyedStream(intermediate_map_stream._j_data_stream
- .keyBy(PickledKeySelector(is_key_pickled_byte_array),
- key_type_info.get_java_type_info()), self)
- generated_key_stream._original_data_type_info = output_type_info
- return generated_key_stream
+ key_stream = KeyedStream(
+ intermediate_map_stream._j_data_stream.keyBy(
+ PickledKeySelector(is_key_pickled_byte_array),
+ key_type_info.get_java_type_info()),
+ self)
+ key_stream._original_data_type_info = output_type_info
+ return key_stream
def filter(self, func: Union[Callable, FilterFunction]) -> 'DataStream':
"""
@@ -335,8 +330,8 @@ class DataStream(object):
elif not isinstance(func, FilterFunction):
raise TypeError("func must be a Callable or instance of FilterFunction.")
- j_input_type = self._j_data_stream.getTransformation().getOutputType()
- type_info = typeinfo._from_java_type(j_input_type)
+ type_info = typeinfo._from_java_type(
+ self._j_data_stream.getTransformation().getOutputType())
j_data_stream = self.flat_map(FilterFlatMap(func), result_type=type_info)._j_data_stream
filtered_stream = DataStream(j_data_stream)
filtered_stream.name("Filter")
@@ -472,9 +467,8 @@ class DataStream(object):
raise TypeError("Parameter partitioner should be a type of Partitioner.")
gateway = get_gateway()
- data_stream_num_partitions_env_key = gateway.jvm\
- .org.apache.flink.datastream.runtime.operators.python\
- .DataStreamPythonPartitionCustomFunctionOperator.DATA_STREAM_NUM_PARTITIONS
+ data_stream_num_partitions_env_key = \
+ gateway.jvm.PythonPartitionCustomOperator.DATA_STREAM_NUM_PARTITIONS
class PartitionCustomMapFunction(MapFunction):
"""
@@ -505,10 +499,8 @@ class DataStream(object):
gateway.jvm.org.apache.flink.python.util.PythonConfigUtil
.STREAM_PARTITION_CUSTOM_MAP_OPERATOR_NAME)
- JPartitionCustomKeySelector = gateway.jvm\
- .org.apache.flink.datastream.runtime.functions.python.PartitionCustomKeySelector
- JIdParitioner = gateway.jvm\
- .org.apache.flink.api.java.functions.IdPartitioner
+ JPartitionCustomKeySelector = gateway.jvm.PartitionCustomKeySelector
+ JIdParitioner = gateway.jvm.org.apache.flink.api.java.functions.IdPartitioner
intermediate_map_stream = DataStream(intermediate_map_stream._j_data_stream
.partitionCustom(JIdParitioner(),
JPartitionCustomKeySelector()))
@@ -518,8 +510,9 @@ class DataStream(object):
.KEYED_STREAM_VALUE_OPERATOR_NAME)
return DataStream(values_map_stream._j_data_stream)
- def _get_java_python_function_operator(self, func: Union[Function, FunctionWrapper],
- type_info: TypeInformation, func_name: str,
+ def _get_java_python_function_operator(self,
+ func: Union[Function, FunctionWrapper],
+ type_info: TypeInformation,
func_type: int):
"""
Create a flink operator according to user provided function object, data types,
@@ -547,19 +540,14 @@ class DataStream(object):
else:
output_type_info = type_info
- DataStreamPythonFunction = gateway.jvm.org.apache.flink.datastream.runtime.functions \
- .python.DataStreamPythonFunction
- j_python_data_stream_scalar_function = DataStreamPythonFunction(
- func_name,
+ JDataStreamPythonFunction = gateway.jvm.DataStreamPythonFunction
+ j_data_stream_python_function = JDataStreamPythonFunction(
bytearray(serialized_func),
_get_python_env())
- DataStreamPythonFunctionInfo = gateway.jvm. \
- org.apache.flink.datastream.runtime.functions.python \
- .DataStreamPythonFunctionInfo
-
- j_python_data_stream_function_info = DataStreamPythonFunctionInfo(
- j_python_data_stream_scalar_function,
+ JDataStreamPythonFunctionInfo = gateway.jvm.DataStreamPythonFunctionInfo
+ j_data_stream_python_function_info = JDataStreamPythonFunctionInfo(
+ j_data_stream_python_function,
func_type)
j_conf = gateway.jvm.org.apache.flink.configuration.Configuration()
@@ -568,31 +556,28 @@ class DataStream(object):
from pyflink.fn_execution.flink_fn_execution_pb2 import UserDefinedDataStreamFunction
if func_type == UserDefinedDataStreamFunction.REDUCE:
j_conf.setInteger(gateway.jvm.org.apache.flink.python.PythonOptions.MAX_BUNDLE_SIZE, 1)
- DataStreamPythonReduceFunctionOperator = gateway.jvm.org.apache.flink.datastream \
- .runtime.operators.python.DataStreamPythonReduceFunctionOperator
j_output_type_info = j_input_types.getTypeAt(1)
- j_python_data_stream_function_operator = DataStreamPythonReduceFunctionOperator(
+ j_python_reduce_operator = gateway.jvm.PythonReduceOperator(
j_conf,
j_input_types,
j_output_type_info,
- j_python_data_stream_function_info)
- return j_python_data_stream_function_operator, j_output_type_info
+ j_data_stream_python_function_info)
+ return j_python_reduce_operator, j_output_type_info
else:
if str(func) == '_Flink_PartitionCustomMapFunction':
- DataStreamPythonFunctionOperator = gateway.jvm.org.apache.flink.datastream.runtime \
- .operators.python.DataStreamPythonPartitionCustomFunctionOperator
+ JDataStreamPythonFunctionOperator = gateway.jvm.PythonPartitionCustomOperator
else:
- DataStreamPythonFunctionOperator = gateway.jvm.org.apache.flink.datastream.runtime \
- .operators.python.DataStreamPythonStatelessFunctionOperator
+ JDataStreamPythonFunctionOperator = \
+ gateway.jvm.StatelessOneInputPythonFunctionOperator
- j_python_data_stream_function_operator = DataStreamPythonFunctionOperator(
+ j_data_stream_python_function_operator = JDataStreamPythonFunctionOperator(
j_conf,
j_input_types,
output_type_info.get_java_type_info(),
- j_python_data_stream_function_info)
+ j_data_stream_python_function_info)
- return j_python_data_stream_function_operator, output_type_info.get_java_type_info()
+ return j_data_stream_python_function_operator, output_type_info.get_java_type_info()
def add_sink(self, sink_func: SinkFunction) -> 'DataStreamSink':
"""
@@ -795,16 +780,14 @@ class KeyedStream(DataStream):
raise TypeError("The input must be a ReduceFunction or a callable function!")
from pyflink.fn_execution.flink_fn_execution_pb2 import UserDefinedDataStreamFunction
- func_name = "m_reduce_" + str(func)
- j_python_data_stream_scalar_function_operator, j_output_type_info = \
+ j_operator, j_output_type_info = \
self._get_java_python_function_operator(func,
None,
- func_name,
UserDefinedDataStreamFunction.REDUCE)
return DataStream(self._j_data_stream.transform(
"Keyed Reduce",
j_output_type_info,
- j_python_data_stream_scalar_function_operator
+ j_operator
))
def filter(self, func: Union[Callable, FilterFunction]) -> 'DataStream':
@@ -947,7 +930,6 @@ class ConnectedStreams(object):
"""
if not isinstance(func, CoMapFunction):
raise TypeError("The input function must be a CoMapFunction!")
- func_name = str(func)
# get connected stream
j_connected_stream = self.stream1._j_data_stream.connect(self.stream2._j_data_stream)
@@ -955,7 +937,6 @@ class ConnectedStreams(object):
j_operator, j_output_type = self._get_connected_stream_operator(
func,
output_type,
- func_name,
flink_fn_execution_pb2.UserDefinedDataStreamFunction.CO_MAP)
return DataStream(j_connected_stream.transform("Co-Map", j_output_type, j_operator))
@@ -974,7 +955,6 @@ class ConnectedStreams(object):
if not isinstance(func, CoFlatMapFunction):
raise TypeError("The input must be a CoFlatMapFunction!")
- func_name = str(func)
# get connected stream
j_connected_stream = self.stream1._j_data_stream.connect(self.stream2._j_data_stream)
@@ -982,12 +962,12 @@ class ConnectedStreams(object):
j_operator, j_output_type = self._get_connected_stream_operator(
func,
output_type,
- func_name,
flink_fn_execution_pb2.UserDefinedDataStreamFunction.CO_FLAT_MAP)
return DataStream(j_connected_stream.transform("Co-Flat Map", j_output_type, j_operator))
- def _get_connected_stream_operator(self, func: Union[Function, FunctionWrapper],
- type_info: TypeInformation, func_name: str,
+ def _get_connected_stream_operator(self,
+ func: Union[Function, FunctionWrapper],
+ type_info: TypeInformation,
func_type: int):
gateway = get_gateway()
import cloudpickle
@@ -1004,30 +984,26 @@ class ConnectedStreams(object):
else:
output_type_info = type_info
- DataStreamPythonFunction = gateway.jvm.org.apache.flink.datastream.runtime.functions \
- .python.DataStreamPythonFunction
- j_python_data_stream_scalar_function = DataStreamPythonFunction(
- func_name,
+ DataStreamPythonFunction = gateway.jvm.DataStreamPythonFunction
+ j_data_stream_python_function = DataStreamPythonFunction(
bytearray(serialized_func),
_get_python_env())
- DataStreamPythonFunctionInfo = gateway.jvm. \
- org.apache.flink.datastream.runtime.functions.python \
- .DataStreamPythonFunctionInfo
+ DataStreamPythonFunctionInfo = gateway.jvm.DataStreamPythonFunctionInfo
- j_python_data_stream_function_info = DataStreamPythonFunctionInfo(
- j_python_data_stream_scalar_function,
+ j_data_stream_python_function_info = DataStreamPythonFunctionInfo(
+ j_data_stream_python_function,
func_type)
j_conf = gateway.jvm.org.apache.flink.configuration.Configuration()
- DataStreamPythonFunctionOperator = gateway.jvm.org.apache.flink.datastream.runtime \
- .operators.python.DataStreamTwoInputPythonStatelessFunctionOperator
- j_python_data_stream_function_operator = DataStreamPythonFunctionOperator(
+ StatelessTwoInputPythonFunctionOperator = \
+ gateway.jvm.StatelessTwoInputPythonFunctionOperator
+ j_python_data_stream_function_operator = StatelessTwoInputPythonFunctionOperator(
j_conf,
j_input_types1,
j_input_types2,
output_type_info.get_java_type_info(),
- j_python_data_stream_function_info,
+ j_data_stream_python_function_info,
self._is_keyed_stream())
return j_python_data_stream_function_operator, output_type_info.get_java_type_info()
diff --git a/flink-python/pyflink/java_gateway.py b/flink-python/pyflink/java_gateway.py
index dc4a693..ba7bbf4 100644
--- a/flink-python/pyflink/java_gateway.py
+++ b/flink-python/pyflink/java_gateway.py
@@ -153,6 +153,9 @@ def import_flink_view(gateway):
java_import(gateway.jvm, "org.apache.flink.python.util.PythonDependencyUtils")
java_import(gateway.jvm, "org.apache.flink.python.PythonOptions")
java_import(gateway.jvm, "org.apache.flink.client.python.PythonGatewayServer")
+ java_import(gateway.jvm, "org.apache.flink.streaming.api.functions.python.*")
+ java_import(gateway.jvm, "org.apache.flink.streaming.api.operators.python.*")
+ java_import(gateway.jvm, "org.apache.flink.streaming.api.typeinfo.python.*")
class PythonFunctionFactory(object):
diff --git a/flink-python/src/main/java/org/apache/flink/python/util/PythonConfigUtil.java b/flink-python/src/main/java/org/apache/flink/python/util/PythonConfigUtil.java
index 19c6b4d..f028ed2 100644
--- a/flink-python/src/main/java/org/apache/flink/python/util/PythonConfigUtil.java
+++ b/flink-python/src/main/java/org/apache/flink/python/util/PythonConfigUtil.java
@@ -18,9 +18,6 @@
package org.apache.flink.python.util;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.datastream.runtime.operators.python.DataStreamPythonPartitionCustomFunctionOperator;
-import org.apache.flink.datastream.runtime.operators.python.DataStreamPythonStatelessFunctionOperator;
-import org.apache.flink.datastream.runtime.operators.python.DataStreamTwoInputPythonStatelessFunctionOperator;
import org.apache.flink.python.PythonConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.graph.StreamEdge;
@@ -30,6 +27,9 @@ import org.apache.flink.streaming.api.operators.SimpleOperatorFactory;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
import org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator;
+import org.apache.flink.streaming.api.operators.python.PythonPartitionCustomOperator;
+import org.apache.flink.streaming.api.operators.python.StatelessOneInputPythonFunctionOperator;
+import org.apache.flink.streaming.api.operators.python.StatelessTwoInputPythonFunctionOperator;
import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner;
import java.lang.reflect.InvocationTargetException;
@@ -84,7 +84,7 @@ public class PythonConfigUtil {
}
/**
- * Configure the {@link DataStreamPythonStatelessFunctionOperator} to be chained with the upstream/downstream
+ * Configure the {@link StatelessOneInputPythonFunctionOperator} to be chained with the upstream/downstream
* operator by setting their parallelism, slot sharing group, co-location group to be the same, and applying a
* {@link ForwardPartitioner}.
* 1. operator with name "_keyed_stream_values_operator" should align with its downstream operator.
@@ -115,7 +115,7 @@ public class PythonConfigUtil {
/**
* Generate a {@link StreamGraph} for transformations maintained by current {@link StreamExecutionEnvironment}, and
- * reset the merged env configurations with dependencies to every {@link DataStreamPythonStatelessFunctionOperator}.
+ * reset the merged env configurations with dependencies to every {@link StatelessOneInputPythonFunctionOperator}.
* It is an idempotent operation that can be call multiple times. Remember that only when need to execute the
* StreamGraph can we set the clearTransformations to be True.
*/
@@ -133,8 +133,8 @@ public class PythonConfigUtil {
StreamOperatorFactory streamOperatorFactory = streamNode.getOperatorFactory();
if (streamOperatorFactory instanceof SimpleOperatorFactory) {
StreamOperator streamOperator = ((SimpleOperatorFactory) streamOperatorFactory).getOperator();
- if ((streamOperator instanceof DataStreamPythonStatelessFunctionOperator) ||
- (streamOperator instanceof DataStreamTwoInputPythonStatelessFunctionOperator)) {
+ if ((streamOperator instanceof StatelessOneInputPythonFunctionOperator) ||
+ (streamOperator instanceof StatelessTwoInputPythonFunctionOperator)) {
AbstractPythonFunctionOperator abstractPythonFunctionOperator =
(AbstractPythonFunctionOperator) streamOperator;
@@ -157,9 +157,9 @@ public class PythonConfigUtil {
StreamOperatorFactory streamOperatorFactory = streamNode.getOperatorFactory();
if (streamOperatorFactory instanceof SimpleOperatorFactory) {
StreamOperator streamOperator = ((SimpleOperatorFactory) streamOperatorFactory).getOperator();
- if (streamOperator instanceof DataStreamPythonPartitionCustomFunctionOperator) {
- DataStreamPythonPartitionCustomFunctionOperator paritionCustomFunctionOperator =
- (DataStreamPythonPartitionCustomFunctionOperator) streamOperator;
+ if (streamOperator instanceof PythonPartitionCustomOperator) {
+ PythonPartitionCustomOperator paritionCustomFunctionOperator =
+ (PythonPartitionCustomOperator) streamOperator;
// Update the numPartitions of PartitionCustomOperator after aligned all operators.
paritionCustomFunctionOperator.setNumPartitions(
diff --git a/flink-python/src/main/java/org/apache/flink/datastream/runtime/functions/python/DataStreamPythonFunction.java b/flink-python/src/main/java/org/apache/flink/streaming/api/functions/python/DataStreamPythonFunction.java
similarity index 91%
rename from flink-python/src/main/java/org/apache/flink/datastream/runtime/functions/python/DataStreamPythonFunction.java
rename to flink-python/src/main/java/org/apache/flink/streaming/api/functions/python/DataStreamPythonFunction.java
index ce7a606..4ae6685 100644
--- a/flink-python/src/main/java/org/apache/flink/datastream/runtime/functions/python/DataStreamPythonFunction.java
+++ b/flink-python/src/main/java/org/apache/flink/streaming/api/functions/python/DataStreamPythonFunction.java
@@ -16,22 +16,23 @@
* limitations under the License.
*/
-package org.apache.flink.datastream.runtime.functions.python;
+package org.apache.flink.streaming.api.functions.python;
+import org.apache.flink.annotation.Internal;
import org.apache.flink.table.functions.python.PythonEnv;
import org.apache.flink.table.functions.python.PythonFunction;
/**
* {@link DataStreamPythonFunction} maintains the serialized python function and its function type, which will be used in
- * BeamDataStreamPythonStatelessFunctionRunner.
+ * BeamDataStreamStatelessPythonFunctionRunner.
*/
+@Internal
public class DataStreamPythonFunction implements PythonFunction {
private static final long serialVersionUID = 1L;
private final byte[] serializedPythonFunction;
private final PythonEnv pythonEnv;
public DataStreamPythonFunction(
- String funcName,
byte[] serializedPythonFunction,
PythonEnv pythonEnv) {
this.serializedPythonFunction = serializedPythonFunction;
diff --git a/flink-python/src/main/java/org/apache/flink/datastream/runtime/functions/python/DataStreamPythonFunctionInfo.java b/flink-python/src/main/java/org/apache/flink/streaming/api/functions/python/DataStreamPythonFunctionInfo.java
similarity index 93%
rename from flink-python/src/main/java/org/apache/flink/datastream/runtime/functions/python/DataStreamPythonFunctionInfo.java
rename to flink-python/src/main/java/org/apache/flink/streaming/api/functions/python/DataStreamPythonFunctionInfo.java
index ce6bc9e..c7fea86 100644
--- a/flink-python/src/main/java/org/apache/flink/datastream/runtime/functions/python/DataStreamPythonFunctionInfo.java
+++ b/flink-python/src/main/java/org/apache/flink/streaming/api/functions/python/DataStreamPythonFunctionInfo.java
@@ -15,8 +15,9 @@
* limitations under the License.
*/
-package org.apache.flink.datastream.runtime.functions.python;
+package org.apache.flink.streaming.api.functions.python;
+import org.apache.flink.annotation.Internal;
import org.apache.flink.table.functions.python.PythonFunction;
import java.io.Serializable;
@@ -24,6 +25,7 @@ import java.io.Serializable;
/**
* {@link DataStreamPythonFunctionInfo} holds a PythonFunction and its function type.
* */
+@Internal
public class DataStreamPythonFunctionInfo implements Serializable {
private static final long serialVersionUID = 1L;
diff --git a/flink-python/src/main/java/org/apache/flink/datastream/runtime/functions/python/PartitionCustomKeySelector.java b/flink-python/src/main/java/org/apache/flink/streaming/api/functions/python/PartitionCustomKeySelector.java
similarity index 78%
rename from flink-python/src/main/java/org/apache/flink/datastream/runtime/functions/python/PartitionCustomKeySelector.java
rename to flink-python/src/main/java/org/apache/flink/streaming/api/functions/python/PartitionCustomKeySelector.java
index 04391b3..995bb28 100644
--- a/flink-python/src/main/java/org/apache/flink/datastream/runtime/functions/python/PartitionCustomKeySelector.java
+++ b/flink-python/src/main/java/org/apache/flink/streaming/api/functions/python/PartitionCustomKeySelector.java
@@ -16,19 +16,24 @@
* limitations under the License.
*/
-package org.apache.flink.datastream.runtime.functions.python;
+package org.apache.flink.streaming.api.functions.python;
+import org.apache.flink.annotation.Internal;
import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.streaming.api.operators.python.PythonPartitionCustomOperator;
import org.apache.flink.types.Row;
/**
- * The {@link PartitionCustomKeySelector} will return the first field of the input row value. The input value is
- * generated by the
- * {@link org.apache.flink.datastream.runtime.operators.python.DataStreamPythonPartitionCustomFunctionOperator} after
+ * The {@link PartitionCustomKeySelector} will return the first field of the input row value.
+ * The input value is generated by the {@link PythonPartitionCustomOperator} after
* executed user defined partitioner and keySelector function. The value of the first field will be the desired
* partition index.
*/
+@Internal
public class PartitionCustomKeySelector implements KeySelector<Row, Integer> {
+
+ private static final long serialVersionUID = 1L;
+
@Override
public Integer getKey(Row value) throws Exception {
return (Integer) value.getField(0);
diff --git a/flink-python/src/main/java/org/apache/flink/datastream/runtime/functions/python/PickledKeySelector.java b/flink-python/src/main/java/org/apache/flink/streaming/api/functions/python/PickledKeySelector.java
similarity index 84%
rename from flink-python/src/main/java/org/apache/flink/datastream/runtime/functions/python/PickledKeySelector.java
rename to flink-python/src/main/java/org/apache/flink/streaming/api/functions/python/PickledKeySelector.java
index 1bc3699..91cff04 100644
--- a/flink-python/src/main/java/org/apache/flink/datastream/runtime/functions/python/PickledKeySelector.java
+++ b/flink-python/src/main/java/org/apache/flink/streaming/api/functions/python/PickledKeySelector.java
@@ -15,22 +15,26 @@
* limitations under the License.
*/
-package org.apache.flink.datastream.runtime.functions.python;
+package org.apache.flink.streaming.api.functions.python;
+import org.apache.flink.annotation.Internal;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.types.Row;
import net.razorvine.pickle.Unpickler;
/**
- * {@link PickledKeySelector} is responsible for extracting the first filed of the input row as key.
+ * {@link PickledKeySelector} is responsible for extracting the first field of the input row as key.
* The input row is generated by python DataStream map function in the format of (key_selector.get_key(value), value)
* tuple2.
*/
+@Internal
public class PickledKeySelector implements KeySelector<Row, Object> {
- private Unpickler unpickler = null;
- private boolean isPickledByteArray = false;
+ private static final long serialVersionUID = 1L;
+
+ private final boolean isPickledByteArray;
+ private transient Unpickler unpickler = null;
public PickledKeySelector(boolean isPickledByteArray) {
this.isPickledByteArray = isPickledByteArray;
diff --git a/flink-python/src/main/java/org/apache/flink/datastream/runtime/operators/python/DataStreamPythonPartitionCustomFunctionOperator.java b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/PythonPartitionCustomOperator.java
similarity index 81%
rename from flink-python/src/main/java/org/apache/flink/datastream/runtime/operators/python/DataStreamPythonPartitionCustomFunctionOperator.java
rename to flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/PythonPartitionCustomOperator.java
index 2a39935..d7802a2 100644
--- a/flink-python/src/main/java/org/apache/flink/datastream/runtime/operators/python/DataStreamPythonPartitionCustomFunctionOperator.java
+++ b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/PythonPartitionCustomOperator.java
@@ -16,31 +16,33 @@
* limitations under the License.
*/
-package org.apache.flink.datastream.runtime.operators.python;
+package org.apache.flink.streaming.api.operators.python;
+import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
-import org.apache.flink.datastream.runtime.functions.python.DataStreamPythonFunctionInfo;
-import org.apache.flink.datastream.runtime.runners.python.beam.BeamDataStreamPythonStatelessFunctionRunner;
import org.apache.flink.python.PythonFunctionRunner;
import org.apache.flink.python.env.PythonEnvironmentManager;
import org.apache.flink.python.env.beam.ProcessPythonEnvironmentManager;
+import org.apache.flink.streaming.api.functions.python.DataStreamPythonFunctionInfo;
+import org.apache.flink.streaming.api.runners.python.beam.BeamDataStreamStatelessPythonFunctionRunner;
/**
- * The {@link DataStreamPythonPartitionCustomFunctionOperator} enables us to set the number of partitions for current
+ * The {@link PythonPartitionCustomOperator} enables us to set the number of partitions for current
* operator dynamically when generating the {@link org.apache.flink.streaming.api.graph.StreamGraph} before executing
* the job. The number of partitions will be set in environment variables for python Worker, so that we can obtain the
* number of partitions when executing user defined partitioner function.
*/
-public class DataStreamPythonPartitionCustomFunctionOperator<IN, OUT> extends
- DataStreamPythonStatelessFunctionOperator<IN, OUT> {
+@Internal
+public class PythonPartitionCustomOperator<IN, OUT> extends
+ StatelessOneInputPythonFunctionOperator<IN, OUT> {
public static final String DATA_STREAM_NUM_PARTITIONS = "DATA_STREAM_NUM_PARTITIONS";
private int numPartitions = CoreOptions.DEFAULT_PARALLELISM.defaultValue();
- public DataStreamPythonPartitionCustomFunctionOperator(
+ public PythonPartitionCustomOperator(
Configuration config,
TypeInformation<IN> inputTypeInfo,
TypeInformation<OUT> outputTypeInfo,
@@ -56,7 +58,7 @@ public class DataStreamPythonPartitionCustomFunctionOperator<IN, OUT> extends
envManager.setEnvironmentVariable(DATA_STREAM_NUM_PARTITIONS,
String.valueOf(this.numPartitions));
}
- return new BeamDataStreamPythonStatelessFunctionRunner(
+ return new BeamDataStreamStatelessPythonFunctionRunner(
getRuntimeContext().getTaskName(),
pythonEnvironmentManager,
inputTypeInfo,
diff --git a/flink-python/src/main/java/org/apache/flink/datastream/runtime/operators/python/DataStreamPythonReduceFunctionOperator.java b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/PythonReduceOperator.java
similarity index 85%
rename from flink-python/src/main/java/org/apache/flink/datastream/runtime/operators/python/DataStreamPythonReduceFunctionOperator.java
rename to flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/PythonReduceOperator.java
index b73da63..9f2ba13 100644
--- a/flink-python/src/main/java/org/apache/flink/datastream/runtime/operators/python/DataStreamPythonReduceFunctionOperator.java
+++ b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/PythonReduceOperator.java
@@ -16,8 +16,9 @@
* limitations under the License.
*/
-package org.apache.flink.datastream.runtime.operators.python;
+package org.apache.flink.streaming.api.operators.python;
+import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
@@ -25,19 +26,20 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.datastream.runtime.functions.python.DataStreamPythonFunctionInfo;
-import org.apache.flink.datastream.runtime.runners.python.beam.BeamDataStreamPythonStatelessFunctionRunner;
-import org.apache.flink.datastream.runtime.typeutils.python.PythonTypeUtils;
import org.apache.flink.python.PythonFunctionRunner;
+import org.apache.flink.streaming.api.functions.python.DataStreamPythonFunctionInfo;
+import org.apache.flink.streaming.api.runners.python.beam.BeamDataStreamStatelessPythonFunctionRunner;
+import org.apache.flink.streaming.api.typeutils.PythonTypeUtils;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.types.Row;
/**
- * {@link DataStreamPythonReduceFunctionOperator} is responsible for launching beam runner which
+ * {@link PythonReduceOperator} is responsible for launching beam runner which
* will start a python harness to execute user defined python ReduceFunction.
*/
-public class DataStreamPythonReduceFunctionOperator<OUT>
- extends DataStreamPythonStatelessFunctionOperator<Row, OUT> {
+@Internal
+public class PythonReduceOperator<OUT>
+ extends StatelessOneInputPythonFunctionOperator<Row, OUT> {
private static final long serialVersionUID = 1L;
@@ -51,7 +53,7 @@ public class DataStreamPythonReduceFunctionOperator<OUT>
private transient Row reuseRow;
- public DataStreamPythonReduceFunctionOperator(
+ public PythonReduceOperator(
Configuration config,
TypeInformation<Row> inputTypeInfo,
TypeInformation<OUT> outputTypeInfo,
@@ -108,7 +110,7 @@ public class DataStreamPythonReduceFunctionOperator<OUT>
@Override
public PythonFunctionRunner createPythonFunctionRunner() throws Exception {
- return new BeamDataStreamPythonStatelessFunctionRunner(
+ return new BeamDataStreamStatelessPythonFunctionRunner(
getRuntimeContext().getTaskName(),
createPythonEnvironmentManager(),
runnerInputTypeInfo,
diff --git a/flink-python/src/main/java/org/apache/flink/datastream/runtime/operators/python/DataStreamPythonStatelessFunctionOperator.java b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/StatelessOneInputPythonFunctionOperator.java
similarity index 89%
rename from flink-python/src/main/java/org/apache/flink/datastream/runtime/operators/python/DataStreamPythonStatelessFunctionOperator.java
rename to flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/StatelessOneInputPythonFunctionOperator.java
index 84d43f2..c41cfac 100644
--- a/flink-python/src/main/java/org/apache/flink/datastream/runtime/operators/python/DataStreamPythonStatelessFunctionOperator.java
+++ b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/StatelessOneInputPythonFunctionOperator.java
@@ -15,8 +15,9 @@
* limitations under the License.
*/
-package org.apache.flink.datastream.runtime.operators.python;
+package org.apache.flink.streaming.api.operators.python;
+import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
@@ -25,12 +26,11 @@ import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
-import org.apache.flink.datastream.runtime.functions.python.DataStreamPythonFunctionInfo;
-import org.apache.flink.datastream.runtime.runners.python.beam.BeamDataStreamPythonStatelessFunctionRunner;
-import org.apache.flink.datastream.runtime.typeutils.python.PythonTypeUtils;
import org.apache.flink.fnexecution.v1.FlinkFnApi;
import org.apache.flink.python.PythonFunctionRunner;
-import org.apache.flink.streaming.api.operators.python.AbstractOneInputPythonFunctionOperator;
+import org.apache.flink.streaming.api.functions.python.DataStreamPythonFunctionInfo;
+import org.apache.flink.streaming.api.runners.python.beam.BeamDataStreamStatelessPythonFunctionRunner;
+import org.apache.flink.streaming.api.typeutils.PythonTypeUtils;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.table.functions.python.PythonEnv;
import org.apache.flink.table.runtime.util.StreamRecordCollector;
@@ -40,10 +40,11 @@ import com.google.protobuf.ByteString;
import java.util.Map;
/**
- * {@link DataStreamPythonStatelessFunctionOperator} is responsible for launching beam runner which will start a python
+ * {@link StatelessOneInputPythonFunctionOperator} is responsible for launching beam runner which will start a python
* harness to execute user defined python function.
*/
-public class DataStreamPythonStatelessFunctionOperator<IN, OUT>
+@Internal
+public class StatelessOneInputPythonFunctionOperator<IN, OUT>
extends AbstractOneInputPythonFunctionOperator<IN, OUT> {
private static final long serialVersionUID = 1L;
@@ -76,7 +77,7 @@ public class DataStreamPythonStatelessFunctionOperator<IN, OUT>
protected transient StreamRecordCollector streamRecordCollector;
- public DataStreamPythonStatelessFunctionOperator(
+ public StatelessOneInputPythonFunctionOperator(
Configuration config,
TypeInformation<IN> inputTypeInfo,
TypeInformation<OUT> outputTypeInfo,
@@ -116,7 +117,7 @@ public class DataStreamPythonStatelessFunctionOperator<IN, OUT>
coderUrn = DATA_STREAM_FLAT_MAP_FUNCTION_CODER_URN;
}
- return new BeamDataStreamPythonStatelessFunctionRunner(
+ return new BeamDataStreamStatelessPythonFunctionRunner(
getRuntimeContext().getTaskName(),
createPythonEnvironmentManager(),
inputTypeInfo,
diff --git a/flink-python/src/main/java/org/apache/flink/datastream/runtime/operators/python/DataStreamTwoInputPythonStatelessFunctionOperator.java b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/StatelessTwoInputPythonFunctionOperator.java
similarity index 92%
rename from flink-python/src/main/java/org/apache/flink/datastream/runtime/operators/python/DataStreamTwoInputPythonStatelessFunctionOperator.java
rename to flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/StatelessTwoInputPythonFunctionOperator.java
index e35f070..8882e1d 100644
--- a/flink-python/src/main/java/org/apache/flink/datastream/runtime/operators/python/DataStreamTwoInputPythonStatelessFunctionOperator.java
+++ b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/StatelessTwoInputPythonFunctionOperator.java
@@ -15,8 +15,9 @@
* limitations under the License.
*/
-package org.apache.flink.datastream.runtime.operators.python;
+package org.apache.flink.streaming.api.operators.python;
+import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.common.typeutils.TypeSerializer;
@@ -27,12 +28,11 @@ import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
-import org.apache.flink.datastream.runtime.functions.python.DataStreamPythonFunctionInfo;
-import org.apache.flink.datastream.runtime.runners.python.beam.BeamDataStreamPythonStatelessFunctionRunner;
-import org.apache.flink.datastream.runtime.typeutils.python.PythonTypeUtils;
import org.apache.flink.fnexecution.v1.FlinkFnApi;
import org.apache.flink.python.PythonFunctionRunner;
-import org.apache.flink.streaming.api.operators.python.AbstractTwoInputPythonFunctionOperator;
+import org.apache.flink.streaming.api.functions.python.DataStreamPythonFunctionInfo;
+import org.apache.flink.streaming.api.runners.python.beam.BeamDataStreamStatelessPythonFunctionRunner;
+import org.apache.flink.streaming.api.typeutils.PythonTypeUtils;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.table.functions.python.PythonEnv;
import org.apache.flink.table.runtime.util.StreamRecordCollector;
@@ -43,10 +43,11 @@ import com.google.protobuf.ByteString;
import java.util.Map;
/**
- * {@link DataStreamTwoInputPythonStatelessFunctionOperator} is responsible for launching beam
+ * {@link StatelessTwoInputPythonFunctionOperator} is responsible for launching beam
* runner which will start a python harness to execute two-input user defined python function.
*/
-public class DataStreamTwoInputPythonStatelessFunctionOperator<IN1, IN2, OUT>
+@Internal
+public class StatelessTwoInputPythonFunctionOperator<IN1, IN2, OUT>
extends AbstractTwoInputPythonFunctionOperator<IN1, IN2, OUT> {
private static final long serialVersionUID = 1L;
@@ -86,7 +87,7 @@ public class DataStreamTwoInputPythonStatelessFunctionOperator<IN1, IN2, OUT>
private final TypeInformation<IN1> inputTypeInfo1;
private final TypeInformation<IN2> inputTypeInfo2;
- public DataStreamTwoInputPythonStatelessFunctionOperator(
+ public StatelessTwoInputPythonFunctionOperator(
Configuration config,
TypeInformation<IN1> inputTypeInfo1,
TypeInformation<IN2> inputTypeInfo2,
@@ -148,7 +149,7 @@ public class DataStreamTwoInputPythonStatelessFunctionOperator<IN1, IN2, OUT>
throw new RuntimeException("Function Type for ConnectedStream should be Map or FlatMap");
}
- return new BeamDataStreamPythonStatelessFunctionRunner(
+ return new BeamDataStreamStatelessPythonFunctionRunner(
getRuntimeContext().getTaskName(),
createPythonEnvironmentManager(),
runnerInputTypeInfo,
diff --git a/flink-python/src/main/java/org/apache/flink/datastream/runtime/runners/python/beam/BeamDataStreamPythonStatelessFunctionRunner.java b/flink-python/src/main/java/org/apache/flink/streaming/api/runners/python/beam/BeamDataStreamStatelessPythonFunctionRunner.java
similarity index 85%
rename from flink-python/src/main/java/org/apache/flink/datastream/runtime/runners/python/beam/BeamDataStreamPythonStatelessFunctionRunner.java
rename to flink-python/src/main/java/org/apache/flink/streaming/api/runners/python/beam/BeamDataStreamStatelessPythonFunctionRunner.java
index 488d8fa..359af18 100644
--- a/flink-python/src/main/java/org/apache/flink/datastream/runtime/runners/python/beam/BeamDataStreamPythonStatelessFunctionRunner.java
+++ b/flink-python/src/main/java/org/apache/flink/streaming/api/runners/python/beam/BeamDataStreamStatelessPythonFunctionRunner.java
@@ -16,14 +16,14 @@
* limitations under the License.
*/
-package org.apache.flink.datastream.runtime.runners.python.beam;
+package org.apache.flink.streaming.api.runners.python.beam;
+import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.datastream.runtime.typeutils.python.PythonTypeUtils;
import org.apache.flink.fnexecution.v1.FlinkFnApi;
import org.apache.flink.python.env.PythonEnvironmentManager;
import org.apache.flink.python.metric.FlinkMetricContainer;
-import org.apache.flink.streaming.api.runners.python.beam.BeamPythonStatelessFunctionRunner;
+import org.apache.flink.streaming.api.typeutils.PythonTypeUtils;
import org.apache.beam.model.pipeline.v1.RunnerApi;
@@ -32,18 +32,18 @@ import javax.annotation.Nullable;
import java.util.Map;
/**
- * {@link BeamDataStreamPythonStatelessFunctionRunner} is responsible for starting a beam python harness to execute user
+ * {@link BeamDataStreamStatelessPythonFunctionRunner} is responsible for starting a beam python harness to execute user
* defined python function.
*/
-public class BeamDataStreamPythonStatelessFunctionRunner extends BeamPythonStatelessFunctionRunner {
- private static final long serialVersionUID = 1L;
+@Internal
+public class BeamDataStreamStatelessPythonFunctionRunner extends BeamStatelessPythonFunctionRunner {
private final TypeInformation inputType;
private final TypeInformation outputTupe;
private final FlinkFnApi.UserDefinedDataStreamFunctions userDefinedDataStreamFunctions;
private final String coderUrn;
- public BeamDataStreamPythonStatelessFunctionRunner(
+ public BeamDataStreamStatelessPythonFunctionRunner(
String taskName,
PythonEnvironmentManager environmentManager,
TypeInformation inputType,
diff --git a/flink-python/src/main/java/org/apache/flink/streaming/api/runners/python/beam/BeamPythonFunctionRunner.java b/flink-python/src/main/java/org/apache/flink/streaming/api/runners/python/beam/BeamPythonFunctionRunner.java
index 2516147..27537d6 100644
--- a/flink-python/src/main/java/org/apache/flink/streaming/api/runners/python/beam/BeamPythonFunctionRunner.java
+++ b/flink-python/src/main/java/org/apache/flink/streaming/api/runners/python/beam/BeamPythonFunctionRunner.java
@@ -18,6 +18,7 @@
package org.apache.flink.streaming.api.runners.python.beam;
+import org.apache.flink.annotation.Internal;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.python.PythonConfig;
import org.apache.flink.python.PythonFunctionRunner;
@@ -57,6 +58,7 @@ import java.util.concurrent.LinkedBlockingQueue;
/**
* An base class for {@link PythonFunctionRunner} based on beam.
*/
+@Internal
public abstract class BeamPythonFunctionRunner implements PythonFunctionRunner {
protected static final Logger LOG = LoggerFactory.getLogger(BeamPythonFunctionRunner.class);
diff --git a/flink-python/src/main/java/org/apache/flink/streaming/api/runners/python/beam/BeamPythonStatelessFunctionRunner.java b/flink-python/src/main/java/org/apache/flink/streaming/api/runners/python/beam/BeamStatelessPythonFunctionRunner.java
similarity index 98%
rename from flink-python/src/main/java/org/apache/flink/streaming/api/runners/python/beam/BeamPythonStatelessFunctionRunner.java
rename to flink-python/src/main/java/org/apache/flink/streaming/api/runners/python/beam/BeamStatelessPythonFunctionRunner.java
index 91acbc8..a999d0e 100644
--- a/flink-python/src/main/java/org/apache/flink/streaming/api/runners/python/beam/BeamPythonStatelessFunctionRunner.java
+++ b/flink-python/src/main/java/org/apache/flink/streaming/api/runners/python/beam/BeamStatelessPythonFunctionRunner.java
@@ -52,7 +52,7 @@ import static org.apache.beam.runners.core.construction.BeamUrns.getUrn;
* A {@link BeamPythonFunctionRunner} used to execute Python stateless functions.
*/
@Internal
-public abstract class BeamPythonStatelessFunctionRunner extends BeamPythonFunctionRunner {
+public abstract class BeamStatelessPythonFunctionRunner extends BeamPythonFunctionRunner {
private static final String INPUT_ID = "input";
private static final String OUTPUT_ID = "output";
private static final String TRANSFORM_ID = "transform";
@@ -68,7 +68,7 @@ public abstract class BeamPythonStatelessFunctionRunner extends BeamPythonFuncti
private final String functionUrn;
- public BeamPythonStatelessFunctionRunner(
+ public BeamStatelessPythonFunctionRunner(
String taskName,
PythonEnvironmentManager environmentManager,
String functionUrn,
diff --git a/flink-python/src/main/java/org/apache/flink/datastream/typeinfo/python/PickledByteArrayTypeInfo.java b/flink-python/src/main/java/org/apache/flink/streaming/api/typeinfo/python/PickledByteArrayTypeInfo.java
similarity index 95%
rename from flink-python/src/main/java/org/apache/flink/datastream/typeinfo/python/PickledByteArrayTypeInfo.java
rename to flink-python/src/main/java/org/apache/flink/streaming/api/typeinfo/python/PickledByteArrayTypeInfo.java
index 4c9a893..55bcc73 100644
--- a/flink-python/src/main/java/org/apache/flink/datastream/typeinfo/python/PickledByteArrayTypeInfo.java
+++ b/flink-python/src/main/java/org/apache/flink/streaming/api/typeinfo/python/PickledByteArrayTypeInfo.java
@@ -15,8 +15,9 @@
* limitations under the License.
*/
-package org.apache.flink.datastream.typeinfo.python;
+package org.apache.flink.streaming.api.typeinfo.python;
+import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
@@ -26,6 +27,7 @@ import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerial
* A PickledByteArrayTypeInfo indicates that the data of this type is a generated primitive byte
* array by pickle.
*/
+@Internal
public class PickledByteArrayTypeInfo extends TypeInformation<byte[]> {
public static final PickledByteArrayTypeInfo PICKLED_BYTE_ARRAY_TYPE_INFO = new PickledByteArrayTypeInfo();
diff --git a/flink-python/src/main/java/org/apache/flink/datastream/runtime/typeutils/python/PythonTypeUtils.java b/flink-python/src/main/java/org/apache/flink/streaming/api/typeutils/PythonTypeUtils.java
similarity index 98%
rename from flink-python/src/main/java/org/apache/flink/datastream/runtime/typeutils/python/PythonTypeUtils.java
rename to flink-python/src/main/java/org/apache/flink/streaming/api/typeutils/PythonTypeUtils.java
index 8927f1b..c526ab1 100644
--- a/flink-python/src/main/java/org/apache/flink/datastream/runtime/typeutils/python/PythonTypeUtils.java
+++ b/flink-python/src/main/java/org/apache/flink/streaming/api/typeutils/PythonTypeUtils.java
@@ -16,8 +16,9 @@
* limitations under the License.
*/
-package org.apache.flink.datastream.runtime.typeutils.python;
+package org.apache.flink.streaming.api.typeutils;
+import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
@@ -44,8 +45,8 @@ import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.api.java.typeutils.runtime.RowSerializer;
import org.apache.flink.api.java.typeutils.runtime.TupleSerializer;
-import org.apache.flink.datastream.typeinfo.python.PickledByteArrayTypeInfo;
import org.apache.flink.fnexecution.v1.FlinkFnApi;
+import org.apache.flink.streaming.api.typeinfo.python.PickledByteArrayTypeInfo;
import org.apache.flink.table.runtime.typeutils.serializers.python.BigDecSerializer;
import org.apache.flink.table.runtime.typeutils.serializers.python.StringSerializer;
@@ -56,6 +57,7 @@ import java.util.Map;
/**
* A util class for converting the given TypeInformation to other objects.
*/
+@Internal
public class PythonTypeUtils {
/**
diff --git a/flink-python/src/main/java/org/apache/flink/table/runtime/functions/python/AbstractPythonStatelessFunctionFlatMap.java b/flink-python/src/main/java/org/apache/flink/table/runtime/functions/python/AbstractPythonStatelessFunctionFlatMap.java
index 12c1bcc..e891acb 100644
--- a/flink-python/src/main/java/org/apache/flink/table/runtime/functions/python/AbstractPythonStatelessFunctionFlatMap.java
+++ b/flink-python/src/main/java/org/apache/flink/table/runtime/functions/python/AbstractPythonStatelessFunctionFlatMap.java
@@ -40,7 +40,7 @@ import org.apache.flink.python.env.beam.ProcessPythonEnvironmentManager;
import org.apache.flink.python.metric.FlinkMetricContainer;
import org.apache.flink.table.functions.python.PythonEnv;
import org.apache.flink.table.functions.python.PythonFunctionInfo;
-import org.apache.flink.table.runtime.runners.python.beam.BeamTablePythonStatelessFunctionRunner;
+import org.apache.flink.table.runtime.runners.python.beam.BeamTableStatelessPythonFunctionRunner;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.utils.LegacyTypeInfoDataTypeConverter;
import org.apache.flink.table.types.utils.LogicalTypeDataTypeConverter;
@@ -329,7 +329,7 @@ public abstract class AbstractPythonStatelessFunctionFlatMap
}
private PythonFunctionRunner createPythonFunctionRunner() throws IOException {
- return new BeamTablePythonStatelessFunctionRunner(
+ return new BeamTableStatelessPythonFunctionRunner(
getRuntimeContext().getTaskName(),
createPythonEnvironmentManager(),
userDefinedFunctionInputType,
diff --git a/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/AbstractStatelessFunctionOperator.java b/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/AbstractStatelessFunctionOperator.java
index eb46961..35e543c 100644
--- a/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/AbstractStatelessFunctionOperator.java
+++ b/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/AbstractStatelessFunctionOperator.java
@@ -30,7 +30,7 @@ import org.apache.flink.streaming.api.operators.python.AbstractOneInputPythonFun
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.functions.python.PythonFunctionInfo;
-import org.apache.flink.table.runtime.runners.python.beam.BeamTablePythonStatelessFunctionRunner;
+import org.apache.flink.table.runtime.runners.python.beam.BeamTableStatelessPythonFunctionRunner;
import org.apache.flink.table.runtime.types.CRow;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.types.Row;
@@ -152,7 +152,7 @@ public abstract class AbstractStatelessFunctionOperator<IN, OUT, UDFIN>
@Override
public PythonFunctionRunner createPythonFunctionRunner() throws IOException {
- return new BeamTablePythonStatelessFunctionRunner(
+ return new BeamTableStatelessPythonFunctionRunner(
getRuntimeContext().getTaskName(),
createPythonEnvironmentManager(),
userDefinedFunctionInputType,
diff --git a/flink-python/src/main/java/org/apache/flink/table/runtime/runners/python/beam/BeamTablePythonStatelessFunctionRunner.java b/flink-python/src/main/java/org/apache/flink/table/runtime/runners/python/beam/BeamTableStatelessPythonFunctionRunner.java
similarity index 91%
rename from flink-python/src/main/java/org/apache/flink/table/runtime/runners/python/beam/BeamTablePythonStatelessFunctionRunner.java
rename to flink-python/src/main/java/org/apache/flink/table/runtime/runners/python/beam/BeamTableStatelessPythonFunctionRunner.java
index 91ff931..0714161 100644
--- a/flink-python/src/main/java/org/apache/flink/table/runtime/runners/python/beam/BeamTablePythonStatelessFunctionRunner.java
+++ b/flink-python/src/main/java/org/apache/flink/table/runtime/runners/python/beam/BeamTableStatelessPythonFunctionRunner.java
@@ -22,7 +22,7 @@ import org.apache.flink.annotation.Internal;
import org.apache.flink.fnexecution.v1.FlinkFnApi;
import org.apache.flink.python.env.PythonEnvironmentManager;
import org.apache.flink.python.metric.FlinkMetricContainer;
-import org.apache.flink.streaming.api.runners.python.beam.BeamPythonStatelessFunctionRunner;
+import org.apache.flink.streaming.api.runners.python.beam.BeamStatelessPythonFunctionRunner;
import org.apache.flink.table.runtime.typeutils.PythonTypeUtils;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
@@ -33,17 +33,17 @@ import org.apache.beam.model.pipeline.v1.RunnerApi;
import java.util.Map;
/**
- * A {@link BeamTablePythonStatelessFunctionRunner} used to execute Python stateless functions.
+ * A {@link BeamTableStatelessPythonFunctionRunner} used to execute Python stateless functions.
*/
@Internal
-public class BeamTablePythonStatelessFunctionRunner extends BeamPythonStatelessFunctionRunner {
+public class BeamTableStatelessPythonFunctionRunner extends BeamStatelessPythonFunctionRunner {
private final RowType inputType;
private final RowType outputType;
private final String coderUrn;
private final FlinkFnApi.UserDefinedFunctions userDefinedFunctions;
- public BeamTablePythonStatelessFunctionRunner(
+ public BeamTableStatelessPythonFunctionRunner(
String taskName,
PythonEnvironmentManager environmentManager,
RowType inputType,
diff --git a/flink-python/src/test/java/org/apache/flink/python/util/PythonTypeUtilsTest.java b/flink-python/src/test/java/org/apache/flink/streaming/api/typeutils/PythonTypeUtilsTest.java
similarity index 97%
rename from flink-python/src/test/java/org/apache/flink/python/util/PythonTypeUtilsTest.java
rename to flink-python/src/test/java/org/apache/flink/streaming/api/typeutils/PythonTypeUtilsTest.java
index b37549c..3b1a2a2 100644
--- a/flink-python/src/test/java/org/apache/flink/python/util/PythonTypeUtilsTest.java
+++ b/flink-python/src/test/java/org/apache/flink/streaming/api/typeutils/PythonTypeUtilsTest.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.flink.python.util;
+package org.apache.flink.streaming.api.typeutils;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
@@ -36,9 +36,8 @@ import org.apache.flink.api.common.typeutils.base.array.IntPrimitiveArraySeriali
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.api.java.typeutils.runtime.RowSerializer;
import org.apache.flink.api.java.typeutils.runtime.TupleSerializer;
-import org.apache.flink.datastream.runtime.typeutils.python.PythonTypeUtils;
-import org.apache.flink.datastream.typeinfo.python.PickledByteArrayTypeInfo;
import org.apache.flink.fnexecution.v1.FlinkFnApi;
+import org.apache.flink.streaming.api.typeinfo.python.PickledByteArrayTypeInfo;
import org.apache.flink.table.runtime.typeutils.serializers.python.BigDecSerializer;
import org.apache.flink.table.runtime.typeutils.serializers.python.StringSerializer;
diff --git a/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PassThroughPythonAggregateFunctionRunner.java b/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PassThroughPythonAggregateFunctionRunner.java
index 2f474b9..e4cffa5 100644
--- a/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PassThroughPythonAggregateFunctionRunner.java
+++ b/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PassThroughPythonAggregateFunctionRunner.java
@@ -25,7 +25,7 @@ import org.apache.flink.python.PythonConfig;
import org.apache.flink.python.env.PythonEnvironmentManager;
import org.apache.flink.python.metric.FlinkMetricContainer;
import org.apache.flink.table.runtime.arrow.serializers.RowDataArrowSerializer;
-import org.apache.flink.table.runtime.runners.python.beam.BeamTablePythonStatelessFunctionRunner;
+import org.apache.flink.table.runtime.runners.python.beam.BeamTableStatelessPythonFunctionRunner;
import org.apache.flink.table.types.logical.RowType;
import org.apache.beam.runners.fnexecution.control.JobBundleFactory;
@@ -39,7 +39,7 @@ import java.util.Map;
* A {@link PassThroughPythonAggregateFunctionRunner} runner that just return the first input element
* with the same key as the execution results.
*/
-public class PassThroughPythonAggregateFunctionRunner extends BeamTablePythonStatelessFunctionRunner {
+public class PassThroughPythonAggregateFunctionRunner extends BeamTableStatelessPythonFunctionRunner {
private final List<byte[]> buffer;
diff --git a/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PassThroughPythonScalarFunctionRunner.java b/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PassThroughPythonScalarFunctionRunner.java
index 57dc439..0964717 100644
--- a/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PassThroughPythonScalarFunctionRunner.java
+++ b/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PassThroughPythonScalarFunctionRunner.java
@@ -21,7 +21,7 @@ package org.apache.flink.table.runtime.utils;
import org.apache.flink.fnexecution.v1.FlinkFnApi;
import org.apache.flink.python.env.PythonEnvironmentManager;
import org.apache.flink.python.metric.FlinkMetricContainer;
-import org.apache.flink.table.runtime.runners.python.beam.BeamTablePythonStatelessFunctionRunner;
+import org.apache.flink.table.runtime.runners.python.beam.BeamTableStatelessPythonFunctionRunner;
import org.apache.flink.table.types.logical.RowType;
import org.apache.beam.runners.fnexecution.control.JobBundleFactory;
@@ -34,7 +34,7 @@ import java.util.Map;
/**
* A {@link PassThroughPythonScalarFunctionRunner} runner that just return the input elements as the execution results.
*/
-public class PassThroughPythonScalarFunctionRunner extends BeamTablePythonStatelessFunctionRunner {
+public class PassThroughPythonScalarFunctionRunner extends BeamTableStatelessPythonFunctionRunner {
private final List<byte[]> buffer;
diff --git a/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PassThroughPythonTableFunctionRunner.java b/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PassThroughPythonTableFunctionRunner.java
index 2fc4264..e3fecdd 100644
--- a/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PassThroughPythonTableFunctionRunner.java
+++ b/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PassThroughPythonTableFunctionRunner.java
@@ -21,7 +21,7 @@ package org.apache.flink.table.runtime.utils;
import org.apache.flink.fnexecution.v1.FlinkFnApi;
import org.apache.flink.python.env.PythonEnvironmentManager;
import org.apache.flink.python.metric.FlinkMetricContainer;
-import org.apache.flink.table.runtime.runners.python.beam.BeamTablePythonStatelessFunctionRunner;
+import org.apache.flink.table.runtime.runners.python.beam.BeamTableStatelessPythonFunctionRunner;
import org.apache.flink.table.types.logical.RowType;
import org.apache.beam.runners.fnexecution.control.JobBundleFactory;
@@ -32,10 +32,10 @@ import java.util.List;
import java.util.Map;
/**
- * A {@link BeamTablePythonStatelessFunctionRunner} that emit each input element in inner join and emit null in
+ * A {@link BeamTableStatelessPythonFunctionRunner} that emit each input element in inner join and emit null in
* left join when certain test conditions are met.
*/
-public class PassThroughPythonTableFunctionRunner extends BeamTablePythonStatelessFunctionRunner {
+public class PassThroughPythonTableFunctionRunner extends BeamTableStatelessPythonFunctionRunner {
private int num = 0;