Posted to commits@flink.apache.org by tw...@apache.org on 2021/05/28 13:21:54 UTC
[flink] 01/02: [FLINK-22619][python] Drop usages of BatchTableEnvironment and old planner in Python
This is an automated email from the ASF dual-hosted git repository.
twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 9fe0ec7caa5fdc7d22e6655e557df49b6263ae76
Author: Timo Walther <tw...@apache.org>
AuthorDate: Wed May 19 09:20:31 2021 +0200
[FLINK-22619][python] Drop usages of BatchTableEnvironment and old planner in Python
This is a major cleanup of the Python module that drops support for
BatchTableEnvironment and the old planner.
Removes usages of:
- DataSet
- BatchTableEnvironment
- Legacy planner
- ExecutionEnvironment
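For readers tracking the migration, the unified TableEnvironment replaces both
removed entry points. A minimal before/after sketch (mirroring the python_job.py
change below; all names appear elsewhere in this commit):

    # Before (removed): old-planner batch entry point
    # from pyflink.dataset import ExecutionEnvironment
    # from pyflink.table import BatchTableEnvironment, TableConfig
    # env = ExecutionEnvironment.get_execution_environment()
    # t_env = BatchTableEnvironment.create(env, TableConfig())

    # After: unified TableEnvironment in batch mode on the Blink planner
    from pyflink.table import EnvironmentSettings, TableEnvironment

    env_settings = EnvironmentSettings.new_instance().in_batch_mode().use_blink_planner().build()
    t_env = TableEnvironment.create(environment_settings=env_settings)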
---
.../docs/dev/python/table/table_environment.md | 2 +-
.../docs/dev/python/table/table_environment.md | 2 +-
.../flink-python-test/python/python_job.py | 16 +-
.../python/tests/FlinkBatchPythonUdfSqlJob.java | 56 --
.../python/tests/FlinkStreamPythonUdfSqlJob.java | 63 ---
.../test-scripts/test_pyflink.sh | 20 -
flink-python/dev/integration_test.sh | 3 -
flink-python/dev/pip_test_code.py | 12 +-
flink-python/docs/index.rst | 1 -
flink-python/docs/pyflink.dataset.rst | 28 -
flink-python/docs/pyflink.rst | 1 -
flink-python/pom.xml | 8 +-
.../pyflink/common/tests/test_execution_config.py | 8 +-
flink-python/pyflink/dataset/__init__.py | 27 -
.../pyflink/dataset/execution_environment.py | 197 -------
flink-python/pyflink/dataset/tests/__init__.py | 17 -
.../dataset/tests/test_execution_environment.py | 137 -----
.../test_execution_environment_completeness.py | 64 ---
...st_stream_execution_environment_completeness.py | 4 +-
flink-python/pyflink/shell.py | 35 --
flink-python/pyflink/table/__init__.py | 4 +-
.../pyflink/table/examples/batch/word_count.py | 8 +-
flink-python/pyflink/table/table.py | 2 +-
flink-python/pyflink/table/table_environment.py | 200 +------
flink-python/pyflink/table/tests/test_calc.py | 46 +-
.../pyflink/table/tests/test_dependency.py | 33 +-
.../pyflink/table/tests/test_descriptor.py | 56 +-
.../pyflink/table/tests/test_pandas_conversion.py | 7 +-
.../pyflink/table/tests/test_pandas_udf.py | 30 +-
.../pyflink/table/tests/test_set_operation.py | 4 +-
.../pyflink/table/tests/test_shell_example.py | 34 --
flink-python/pyflink/table/tests/test_sort.py | 4 +-
flink-python/pyflink/table/tests/test_sql.py | 12 +-
.../table/tests/test_table_environment_api.py | 629 +--------------------
flink-python/pyflink/table/tests/test_udf.py | 23 +-
flink-python/pyflink/table/tests/test_udtf.py | 29 +-
flink-python/pyflink/testing/test_case_utils.py | 90 +--
flink-python/setup.py | 1 -
.../flink/table/runtime/arrow/ArrowUtils.java | 82 +--
.../AbstractPythonScalarFunctionFlatMap.java | 119 ----
.../AbstractPythonStatelessFunctionFlatMap.java | 312 ----------
.../python/PythonScalarFunctionFlatMap.java | 94 ---
.../python/PythonTableFunctionFlatMap.java | 174 ------
.../arrow/ArrowPythonScalarFunctionFlatMap.java | 131 -----
.../AbstractRowPythonScalarFunctionOperator.java | 91 ---
.../scalar/PythonScalarFunctionOperator.java | 82 ---
.../arrow/ArrowPythonScalarFunctionOperator.java | 135 -----
.../python/table/PythonTableFunctionOperator.java | 142 -----
.../utils/StreamRecordCRowWrappingCollector.java | 53 --
.../client/python/PythonFunctionFactoryTest.java | 29 -
.../scalar/PythonScalarFunctionOperatorTest.java | 105 ----
.../ArrowPythonScalarFunctionOperatorTest.java | 103 ----
.../table/PythonTableFunctionOperatorTest.java | 92 ---
flink-python/tox.ini | 2 +-
54 files changed, 84 insertions(+), 3575 deletions(-)
diff --git a/docs/content.zh/docs/dev/python/table/table_environment.md b/docs/content.zh/docs/dev/python/table/table_environment.md
index 75c925f..0ba2738 100644
--- a/docs/content.zh/docs/dev/python/table/table_environment.md
+++ b/docs/content.zh/docs/dev/python/table/table_environment.md
@@ -48,7 +48,7 @@ table_env = TableEnvironment.create(env_settings)
```python
from pyflink.datastream import StreamExecutionEnvironment
-from pyflink.table import StreamTableEnvironment, BatchTableEnvironment, TableConfig
+from pyflink.table import StreamTableEnvironment
# create a blink streaming TableEnvironment from a StreamExecutionEnvironment
env = StreamExecutionEnvironment.get_execution_environment()
diff --git a/docs/content/docs/dev/python/table/table_environment.md b/docs/content/docs/dev/python/table/table_environment.md
index ea50ebd..d00e1d2 100644
--- a/docs/content/docs/dev/python/table/table_environment.md
+++ b/docs/content/docs/dev/python/table/table_environment.md
@@ -49,7 +49,7 @@ Alternatively, users can create a `StreamTableEnvironment` from an existing `Str
```python
from pyflink.datastream import StreamExecutionEnvironment
-from pyflink.table import StreamTableEnvironment, BatchTableEnvironment, TableConfig
+from pyflink.table import StreamTableEnvironment
# create a blink streaming TableEnvironment from a StreamExecutionEnvironment
env = StreamExecutionEnvironment.get_execution_environment()
diff --git a/flink-end-to-end-tests/flink-python-test/python/python_job.py b/flink-end-to-end-tests/flink-python-test/python/python_job.py
index a85e633..2afcf19 100644
--- a/flink-end-to-end-tests/flink-python-test/python/python_job.py
+++ b/flink-end-to-end-tests/flink-python-test/python/python_job.py
@@ -21,8 +21,7 @@ import shutil
import sys
import tempfile
-from pyflink.dataset import ExecutionEnvironment
-from pyflink.table import BatchTableEnvironment, TableConfig
+from pyflink.table import EnvironmentSettings, TableEnvironment
def word_count():
@@ -34,9 +33,8 @@ def word_count():
"License you may not use this file except in compliance " \
"with the License"
- t_config = TableConfig()
- env = ExecutionEnvironment.get_execution_environment()
- t_env = BatchTableEnvironment.create(env, t_config)
+ env_settings = EnvironmentSettings.new_instance().in_batch_mode().use_blink_planner().build()
+ t_env = TableEnvironment.create(environment_settings=env_settings)
# used to test pipeline.jars and pipeline.classpaths
config_key = sys.argv[1]
@@ -68,9 +66,9 @@ def word_count():
'connector.path' = '{}'
)
""".format(result_path)
- t_env.sql_update(sink_ddl)
+ t_env.execute_sql(sink_ddl)
- t_env.sql_update("create temporary system function add_one as 'add_one.add_one' language python")
+ t_env.execute_sql("create temporary system function add_one as 'add_one.add_one' language python")
t_env.register_java_function("add_one_java", "org.apache.flink.python.tests.util.AddOne")
elements = [(word, 0) for word in content.split(" ")]
@@ -78,9 +76,7 @@ def word_count():
.select("word, add_one(count) as count, add_one_java(count) as count_java") \
.group_by("word") \
.select("word, count(count) as count, count(count_java) as count_java") \
- .insert_into("Results")
-
- t_env.execute("word_count")
+ .execute_insert("Results")
if __name__ == '__main__':
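The same pattern recurs throughout this commit: sql_update(ddl) becomes
execute_sql(ddl), and the two-step insert_into("Results") plus
t_env.execute("word_count") collapses into a single execute_insert("Results"),
which submits the job and returns a TableResult. A short sketch of the blocking
variant (assuming t is any pyflink Table):

    # execute_insert submits the job and returns a TableResult;
    # wait() blocks until the job finishes (useful in scripts and tests).
    t.execute_insert("Results").wait()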
diff --git a/flink-end-to-end-tests/flink-python-test/src/main/java/org/apache/flink/python/tests/FlinkBatchPythonUdfSqlJob.java b/flink-end-to-end-tests/flink-python-test/src/main/java/org/apache/flink/python/tests/FlinkBatchPythonUdfSqlJob.java
deleted file mode 100644
index 3f6768c..0000000
--- a/flink-end-to-end-tests/flink-python-test/src/main/java/org/apache/flink/python/tests/FlinkBatchPythonUdfSqlJob.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.python.tests;
-
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.table.api.bridge.java.BatchTableEnvironment;
-import org.apache.flink.types.Row;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Iterator;
-import java.util.List;
-
-/** A simple job used to test submitting the Python UDF job in flink batch mode. */
-public class FlinkBatchPythonUdfSqlJob {
-
- public static void main(String[] args) {
- ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- env.setParallelism(1);
- BatchTableEnvironment tEnv = BatchTableEnvironment.create(env);
- tEnv.executeSql(
- "create temporary system function add_one as 'add_one.add_one' language python");
-
- tEnv.createTemporaryView("source", tEnv.fromDataSet(env.fromElements(1L, 2L, 3L)).as("a"));
-
- Iterator<Row> result = tEnv.executeSql("select add_one(a) as a from source").collect();
-
- List<Long> actual = new ArrayList<>();
- while (result.hasNext()) {
- Row r = result.next();
- actual.add((Long) r.getField(0));
- }
-
- List<Long> expected = Arrays.asList(2L, 3L, 4L);
- if (!actual.equals(expected)) {
- throw new AssertionError(
- String.format(
- "The output result: %s is not as expected: %s!", actual, expected));
- }
- }
-}
diff --git a/flink-end-to-end-tests/flink-python-test/src/main/java/org/apache/flink/python/tests/FlinkStreamPythonUdfSqlJob.java b/flink-end-to-end-tests/flink-python-test/src/main/java/org/apache/flink/python/tests/FlinkStreamPythonUdfSqlJob.java
deleted file mode 100644
index f90767e..0000000
--- a/flink-end-to-end-tests/flink-python-test/src/main/java/org/apache/flink/python/tests/FlinkStreamPythonUdfSqlJob.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.python.tests;
-
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.table.api.EnvironmentSettings;
-import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
-import org.apache.flink.types.Row;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Iterator;
-import java.util.List;
-
-/** A simple job used to test submitting the Python UDF job in flink stream mode. */
-public class FlinkStreamPythonUdfSqlJob {
-
- public static void main(String[] args) throws Exception {
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.setParallelism(1);
- StreamTableEnvironment tEnv =
- StreamTableEnvironment.create(
- env,
- EnvironmentSettings.newInstance()
- .useOldPlanner()
- .inStreamingMode()
- .build());
- tEnv.executeSql(
- "create temporary system function add_one as 'add_one.add_one' language python");
-
- tEnv.createTemporaryView("source", tEnv.fromValues(1L, 2L, 3L).as("a"));
-
- Iterator<Row> result = tEnv.executeSql("select add_one(a) as a from source").collect();
-
- List<Long> actual = new ArrayList<>();
- while (result.hasNext()) {
- Row r = result.next();
- actual.add((Long) r.getField(0));
- }
-
- List<Long> expected = Arrays.asList(2L, 3L, 4L);
- if (!actual.equals(expected)) {
- throw new AssertionError(
- String.format(
- "The output result: %s is not as expected: %s!", actual, expected));
- }
- }
-}
diff --git a/flink-end-to-end-tests/test-scripts/test_pyflink.sh b/flink-end-to-end-tests/test-scripts/test_pyflink.sh
index 24ffc59..27cfd0a 100755
--- a/flink-end-to-end-tests/test-scripts/test_pyflink.sh
+++ b/flink-end-to-end-tests/test-scripts/test_pyflink.sh
@@ -155,26 +155,6 @@ PYFLINK_CLIENT_EXECUTABLE=${PYTHON_EXEC} "${FLINK_DIR}/bin/flink" run \
-c org.apache.flink.python.tests.BlinkBatchPythonUdfSqlJob \
"${FLINK_PYTHON_TEST_DIR}/target/PythonUdfSqlJobExample.jar"
-echo "Test flink stream python udf sql job:\n"
-PYFLINK_CLIENT_EXECUTABLE=${PYTHON_EXEC} "${FLINK_DIR}/bin/flink" run \
- -p 2 \
- -pyfs "${FLINK_PYTHON_TEST_DIR}/python/add_one.py" \
- -pyreq "${REQUIREMENTS_PATH}" \
- -pyarch "${TEST_DATA_DIR}/venv.zip" \
- -pyexec "venv.zip/.conda/bin/python" \
- -c org.apache.flink.python.tests.FlinkStreamPythonUdfSqlJob \
- "${FLINK_PYTHON_TEST_DIR}/target/PythonUdfSqlJobExample.jar"
-
-echo "Test flink batch python udf sql job:\n"
-PYFLINK_CLIENT_EXECUTABLE=${PYTHON_EXEC} "${FLINK_DIR}/bin/flink" run \
- -p 2 \
- -pyfs "${FLINK_PYTHON_TEST_DIR}/python/add_one.py" \
- -pyreq "${REQUIREMENTS_PATH}" \
- -pyarch "${TEST_DATA_DIR}/venv.zip" \
- -pyexec "venv.zip/.conda/bin/python" \
- -c org.apache.flink.python.tests.FlinkBatchPythonUdfSqlJob \
- "${FLINK_PYTHON_TEST_DIR}/target/PythonUdfSqlJobExample.jar"
-
echo "Test using python udf in sql client:\n"
SQL_CONF=$TEST_DATA_DIR/sql-client-session.conf
diff --git a/flink-python/dev/integration_test.sh b/flink-python/dev/integration_test.sh
index d2f10e5..c598d09 100755
--- a/flink-python/dev/integration_test.sh
+++ b/flink-python/dev/integration_test.sh
@@ -36,9 +36,6 @@ FLINK_PYTHON_DIR=$(dirname "$CURRENT_DIR")
# test common module
test_module "common"
-# test dataset module
-test_module "dataset"
-
# test datastream module
test_module "datastream"
diff --git a/flink-python/dev/pip_test_code.py b/flink-python/dev/pip_test_code.py
index 0904b53..3bd1062 100755
--- a/flink-python/dev/pip_test_code.py
+++ b/flink-python/dev/pip_test_code.py
@@ -16,7 +16,7 @@
# limitations under the License.
################################################################################
# test pyflink shell environment
-from pyflink.shell import b_env, bt_env, FileSystem, OldCsv, DataTypes, Schema
+from pyflink.shell import s_env, st_env, FileSystem, OldCsv, DataTypes, Schema
import tempfile
import os
@@ -28,9 +28,9 @@ if os.path.exists(sink_path):
os.remove(sink_path)
else:
shutil.rmtree(sink_path)
-b_env.set_parallelism(1)
-t = bt_env.from_elements([(1, 'hi', 'hello'), (2, 'hi', 'hello')], ['a', 'b', 'c'])
-bt_env.connect(FileSystem().path(sink_path)) \
+s_env.set_parallelism(1)
+t = st_env.from_elements([(1, 'hi', 'hello'), (2, 'hi', 'hello')], ['a', 'b', 'c'])
+st_env.connect(FileSystem().path(sink_path)) \
.with_format(OldCsv()
.field_delimiter(',')
.field("a", DataTypes.BIGINT())
@@ -40,9 +40,9 @@ bt_env.connect(FileSystem().path(sink_path)) \
.field("a", DataTypes.BIGINT())
.field("b", DataTypes.STRING())
.field("c", DataTypes.STRING())) \
- .create_temporary_table("batch_sink")
+ .create_temporary_table("csv_sink")
-t.select("a + 1, b, c").execute_insert("batch_sink").wait()
+t.select("a + 1, b, c").execute_insert("csv_sink").wait()
with open(sink_path, 'r') as f:
lines = f.read()
diff --git a/flink-python/docs/index.rst b/flink-python/docs/index.rst
index 3d6f538..7a174fd 100644
--- a/flink-python/docs/index.rst
+++ b/flink-python/docs/index.rst
@@ -26,7 +26,6 @@ Welcome to Flink Python API Docs!
pyflink
pyflink.common
pyflink.table
- pyflink.dataset
pyflink.datastream
pyflink.metrics
diff --git a/flink-python/docs/pyflink.dataset.rst b/flink-python/docs/pyflink.dataset.rst
deleted file mode 100644
index dc42fcd..0000000
--- a/flink-python/docs/pyflink.dataset.rst
+++ /dev/null
@@ -1,28 +0,0 @@
-.. ################################################################################
- Licensed to the Apache Software Foundation (ASF) under one
- or more contributor license agreements. See the NOTICE file
- distributed with this work for additional information
- regarding copyright ownership. The ASF licenses this file
- to you under the Apache License, Version 2.0 (the
- "License"); you may not use this file except in compliance
- with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- ################################################################################
-
-pyflink.dataset package
-=======================
-
-Module contents
----------------
-
-.. automodule:: pyflink.dataset
- :members:
- :undoc-members:
- :show-inheritance:
diff --git a/flink-python/docs/pyflink.rst b/flink-python/docs/pyflink.rst
index ff17b2a..81ba9f7 100644
--- a/flink-python/docs/pyflink.rst
+++ b/flink-python/docs/pyflink.rst
@@ -27,7 +27,6 @@ Subpackages
pyflink.common
pyflink.table
- pyflink.dataset
pyflink.datastream
.. automodule:: pyflink
diff --git a/flink-python/pom.xml b/flink-python/pom.xml
index 9bb15ec..7199b2a 100644
--- a/flink-python/pom.xml
+++ b/flink-python/pom.xml
@@ -70,7 +70,7 @@ under the License.
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
- <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
+ <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
@@ -80,12 +80,6 @@ under the License.
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
- <version>${project.version}</version>
- <scope>provided</scope>
- </dependency>
<!-- Beam dependencies -->
diff --git a/flink-python/pyflink/common/tests/test_execution_config.py b/flink-python/pyflink/common/tests/test_execution_config.py
index 8f3ba67..33433df 100644
--- a/flink-python/pyflink/common/tests/test_execution_config.py
+++ b/flink-python/pyflink/common/tests/test_execution_config.py
@@ -15,7 +15,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
-from pyflink.dataset import ExecutionEnvironment
+from pyflink.datastream import StreamExecutionEnvironment
from pyflink.common import (ExecutionConfig, RestartStrategies, ExecutionMode)
from pyflink.java_gateway import get_gateway
from pyflink.testing.test_case_utils import PyFlinkTestCase
@@ -24,7 +24,7 @@ from pyflink.testing.test_case_utils import PyFlinkTestCase
class ExecutionConfigTests(PyFlinkTestCase):
def setUp(self):
- self.env = ExecutionEnvironment.get_execution_environment()
+ self.env = StreamExecutionEnvironment.get_execution_environment()
self.execution_config = self.env.get_config()
def test_constant(self):
@@ -253,9 +253,9 @@ class ExecutionConfigTests(PyFlinkTestCase):
def test_equals_and_hash(self):
- config1 = ExecutionEnvironment.get_execution_environment().get_config()
+ config1 = StreamExecutionEnvironment.get_execution_environment().get_config()
- config2 = ExecutionEnvironment.get_execution_environment().get_config()
+ config2 = StreamExecutionEnvironment.get_execution_environment().get_config()
self.assertEqual(config1, config2)
diff --git a/flink-python/pyflink/dataset/__init__.py b/flink-python/pyflink/dataset/__init__.py
deleted file mode 100644
index 3bd64cd..0000000
--- a/flink-python/pyflink/dataset/__init__.py
+++ /dev/null
@@ -1,27 +0,0 @@
-################################################################################
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-################################################################################
-
-"""
-Important classes of Flink Batch API:
-
- - :class:`ExecutionEnvironment`:
- The ExecutionEnvironment is the context in which a batch program is executed.
-"""
-from pyflink.dataset.execution_environment import ExecutionEnvironment
-
-__all__ = ['ExecutionEnvironment']
diff --git a/flink-python/pyflink/dataset/execution_environment.py b/flink-python/pyflink/dataset/execution_environment.py
deleted file mode 100644
index 821189c..0000000
--- a/flink-python/pyflink/dataset/execution_environment.py
+++ /dev/null
@@ -1,197 +0,0 @@
-################################################################################
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-################################################################################
-from pyflink.common.execution_config import ExecutionConfig
-from pyflink.common.job_execution_result import JobExecutionResult
-from pyflink.common.restart_strategy import RestartStrategies, RestartStrategyConfiguration
-from pyflink.java_gateway import get_gateway
-from pyflink.util.java_utils import load_java_class
-
-
-class ExecutionEnvironment(object):
- """
- The ExecutionEnvironment is the context in which a program is executed.
-
- The environment provides methods to control the job execution (such as setting the parallelism)
- and to interact with the outside world (data access).
- """
-
- def __init__(self, j_execution_environment):
- self._j_execution_environment = j_execution_environment
-
- def get_parallelism(self) -> int:
- """
- Gets the parallelism with which operation are executed by default.
-
- :return: The parallelism.
- """
- return self._j_execution_environment.getParallelism()
-
- def set_parallelism(self, parallelism: int):
- """
- Sets the parallelism for operations executed through this environment.
- Setting a parallelism of x here will cause all operators to run with
- x parallel instances.
-
- :param parallelism: The parallelism.
- """
- self._j_execution_environment.setParallelism(parallelism)
-
- def get_default_local_parallelism(self) -> int:
- """
- Gets the default parallelism that will be used for the local execution environment.
-
- :return: The parallelism.
- """
- return self._j_execution_environment.getDefaultLocalParallelism()
-
- def set_default_local_parallelism(self, parallelism: int):
- """
- Sets the default parallelism that will be used for the local execution environment.
-
- :param parallelism: The parallelism.
- """
- self._j_execution_environment.setDefaultLocalParallelism(parallelism)
-
- def get_config(self) -> ExecutionConfig:
- """
- Gets the config object that defines execution parameters.
-
- :return: An :class:`ExecutionConfig` object, the environment's execution configuration.
- """
- return ExecutionConfig(self._j_execution_environment.getConfig())
-
- def set_restart_strategy(self, restart_strategy_configuration: RestartStrategyConfiguration):
- """
- Sets the restart strategy configuration. The configuration specifies which restart strategy
- will be used for the execution graph in case of a restart.
-
- Example:
- ::
-
- >>> env.set_restart_strategy(RestartStrategies.no_restart())
-
- :param restart_strategy_configuration: Restart strategy configuration to be set.
- """
- self._j_execution_environment.setRestartStrategy(
- restart_strategy_configuration._j_restart_strategy_configuration)
-
- def get_restart_strategy(self) -> RestartStrategyConfiguration:
- """
- Returns the specified restart strategy configuration.
-
- :return: The restart strategy configuration to be used.
- """
- return RestartStrategies._from_j_restart_strategy(
- self._j_execution_environment.getRestartStrategy())
-
- def add_default_kryo_serializer(self, type_class_name: str, serializer_class_name: str):
- """
- Adds a new Kryo default serializer to the Runtime.
-
- Example:
- ::
-
- >>> env.add_default_kryo_serializer("com.aaa.bbb.TypeClass", "com.aaa.bbb.Serializer")
-
- :param type_class_name: The full-qualified java class name of the types serialized with the
- given serializer.
- :param serializer_class_name: The full-qualified java class name of the serializer to use.
- """
- type_clz = load_java_class(type_class_name)
- j_serializer_clz = load_java_class(serializer_class_name)
- self._j_execution_environment.addDefaultKryoSerializer(type_clz, j_serializer_clz)
-
- def register_type_with_kryo_serializer(self, type_class_name: str, serializer_class_name: str):
- """
- Registers the given Serializer via its class as a serializer for the given type at the
- KryoSerializer.
-
- Example:
- ::
-
- >>> env.register_type_with_kryo_serializer("com.aaa.bbb.TypeClass",
- ... "com.aaa.bbb.Serializer")
-
- :param type_class_name: The full-qualified java class name of the types serialized with
- the given serializer.
- :param serializer_class_name: The full-qualified java class name of the serializer to use.
- """
- type_clz = load_java_class(type_class_name)
- j_serializer_clz = load_java_class(serializer_class_name)
- self._j_execution_environment.registerTypeWithKryoSerializer(type_clz, j_serializer_clz)
-
- def register_type(self, type_class_name: str):
- """
- Registers the given type with the serialization stack. If the type is eventually
- serialized as a POJO, then the type is registered with the POJO serializer. If the
- type ends up being serialized with Kryo, then it will be registered at Kryo to make
- sure that only tags are written.
-
- Example:
- ::
-
- >>> env.register_type("com.aaa.bbb.TypeClass")
-
- :param type_class_name: The full-qualified java class name of the type to register.
- """
- type_clz = load_java_class(type_class_name)
- self._j_execution_environment.registerType(type_clz)
-
- def execute(self, job_name: str = None) -> JobExecutionResult:
- """
- Triggers the program execution. The environment will execute all parts of the program that
- have resulted in a "sink" operation.
-
- The program execution will be logged and displayed with the given job name.
-
- :param job_name: Desired name of the job, optional.
- :return: The result of the job execution, containing elapsed time and accumulators.
- """
- if job_name is None:
- return JobExecutionResult(self._j_execution_environment.execute())
- else:
- return JobExecutionResult(self._j_execution_environment.execute(job_name))
-
- def get_execution_plan(self) -> str:
- """
- Creates the plan with which the system will execute the program, and returns it as
- a String using a JSON representation of the execution data flow graph.
- Note that this needs to be called, before the plan is executed.
-
- If the compiler could not be instantiated, or the master could not
- be contacted to retrieve information relevant to the execution planning,
- an exception will be thrown.
-
- :return: The execution plan of the program, as a JSON String.
- """
- return self._j_execution_environment.getExecutionPlan()
-
- @staticmethod
- def get_execution_environment() -> 'ExecutionEnvironment':
- """
- Creates an execution environment that represents the context in which the program is
- currently executed. If the program is invoked standalone, this method returns a local
- execution environment. If the program is invoked from within the command line client to be
- submitted to a cluster, this method returns the execution environment of this cluster.
-
- :return: The :class:`ExecutionEnvironment` of the context in which the program is executed.
- """
- gateway = get_gateway()
- j_execution_environment = gateway.jvm.org.apache.flink.api.java.ExecutionEnvironment\
- .getExecutionEnvironment()
- return ExecutionEnvironment(j_execution_environment)
diff --git a/flink-python/pyflink/dataset/tests/__init__.py b/flink-python/pyflink/dataset/tests/__init__.py
deleted file mode 100644
index 65b48d4..0000000
--- a/flink-python/pyflink/dataset/tests/__init__.py
+++ /dev/null
@@ -1,17 +0,0 @@
-################################################################################
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-################################################################################
diff --git a/flink-python/pyflink/dataset/tests/test_execution_environment.py b/flink-python/pyflink/dataset/tests/test_execution_environment.py
deleted file mode 100644
index 49ed8c9..0000000
--- a/flink-python/pyflink/dataset/tests/test_execution_environment.py
+++ /dev/null
@@ -1,137 +0,0 @@
-################################################################################
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-################################################################################
-import json
-import os
-import tempfile
-import time
-
-import unittest
-
-from pyflink.common import ExecutionConfig, RestartStrategies
-from pyflink.dataset import ExecutionEnvironment
-from pyflink.table import DataTypes, BatchTableEnvironment, CsvTableSource, CsvTableSink
-from pyflink.testing.test_case_utils import PyFlinkTestCase, exec_insert_table
-
-
-class ExecutionEnvironmentTests(PyFlinkTestCase):
-
- def setUp(self):
- self.env = ExecutionEnvironment.get_execution_environment()
-
- def test_get_set_parallelism(self):
-
- self.env.set_parallelism(10)
-
- parallelism = self.env.get_parallelism()
-
- self.assertEqual(parallelism, 10)
-
- def test_get_set_default_local_parallelism(self):
-
- self.env.set_default_local_parallelism(8)
-
- parallelism = self.env.get_default_local_parallelism()
-
- self.assertEqual(parallelism, 8)
-
- def test_get_config(self):
-
- execution_config = self.env.get_config()
-
- self.assertIsInstance(execution_config, ExecutionConfig)
-
- def test_set_get_restart_strategy(self):
-
- self.env.set_restart_strategy(RestartStrategies.no_restart())
-
- restart_strategy = self.env.get_restart_strategy()
-
- self.assertEqual(restart_strategy, RestartStrategies.no_restart())
-
- def test_add_default_kryo_serializer(self):
-
- self.env.add_default_kryo_serializer(
- "org.apache.flink.runtime.state.StateBackendTestBase$TestPojo",
- "org.apache.flink.runtime.state.StateBackendTestBase$CustomKryoTestSerializer")
-
- class_dict = self.env.get_config().get_default_kryo_serializer_classes()
-
- self.assertEqual(class_dict,
- {'org.apache.flink.runtime.state.StateBackendTestBase$TestPojo':
- 'org.apache.flink.runtime.state'
- '.StateBackendTestBase$CustomKryoTestSerializer'})
-
- def test_register_type_with_kryo_serializer(self):
-
- self.env.register_type_with_kryo_serializer(
- "org.apache.flink.runtime.state.StateBackendTestBase$TestPojo",
- "org.apache.flink.runtime.state.StateBackendTestBase$CustomKryoTestSerializer")
-
- class_dict = self.env.get_config().get_registered_types_with_kryo_serializer_classes()
-
- self.assertEqual(class_dict,
- {'org.apache.flink.runtime.state.StateBackendTestBase$TestPojo':
- 'org.apache.flink.runtime.state'
- '.StateBackendTestBase$CustomKryoTestSerializer'})
-
- def test_register_type(self):
-
- self.env.register_type("org.apache.flink.runtime.state.StateBackendTestBase$TestPojo")
-
- type_list = self.env.get_config().get_registered_pojo_types()
-
- self.assertEqual(type_list,
- ["org.apache.flink.runtime.state.StateBackendTestBase$TestPojo"])
-
- @unittest.skip("Python API does not support DataSet now. refactor this test later")
- def test_get_execution_plan(self):
- tmp_dir = tempfile.gettempdir()
- source_path = os.path.join(tmp_dir + '/streaming.csv')
- tmp_csv = os.path.join(tmp_dir + '/streaming2.csv')
- field_names = ["a", "b", "c"]
- field_types = [DataTypes.INT(), DataTypes.STRING(), DataTypes.STRING()]
-
- t_env = BatchTableEnvironment.create(self.env)
- csv_source = CsvTableSource(source_path, field_names, field_types)
- t_env.register_table_source("Orders", csv_source)
- t_env.register_table_sink(
- "Results",
- CsvTableSink(field_names, field_types, tmp_csv))
- t_env.from_path("Orders").execute_insert("Results").wait()
-
- plan = self.env.get_execution_plan()
-
- json.loads(plan)
-
- def test_execute(self):
- tmp_dir = tempfile.gettempdir()
- field_names = ['a', 'b', 'c']
- field_types = [DataTypes.BIGINT(), DataTypes.STRING(), DataTypes.STRING()]
- t_env = BatchTableEnvironment.create(self.env)
- t_env.register_table_sink(
- 'Results',
- CsvTableSink(field_names, field_types,
- os.path.join('{}/{}.csv'.format(tmp_dir, round(time.time())))))
- execution_result = exec_insert_table(
- t_env.from_elements([(1, 'Hi', 'Hello')], ['a', 'b', 'c']),
- 'Results')
- self.assertIsNotNone(execution_result.get_job_id())
- self.assertIsNotNone(execution_result.get_net_runtime())
- self.assertEqual(len(execution_result.get_all_accumulator_results()), 0)
- self.assertIsNone(execution_result.get_accumulator_result('accumulator'))
- self.assertIsNotNone(str(execution_result))
diff --git a/flink-python/pyflink/dataset/tests/test_execution_environment_completeness.py b/flink-python/pyflink/dataset/tests/test_execution_environment_completeness.py
deleted file mode 100644
index 2d49844..0000000
--- a/flink-python/pyflink/dataset/tests/test_execution_environment_completeness.py
+++ /dev/null
@@ -1,64 +0,0 @@
-################################################################################
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-################################################################################
-
-from pyflink.dataset import ExecutionEnvironment
-from pyflink.testing.test_case_utils import PythonAPICompletenessTestCase, PyFlinkTestCase
-
-
-class ExecutionEnvironmentCompletenessTests(PythonAPICompletenessTestCase,
- PyFlinkTestCase):
-
- @classmethod
- def python_class(cls):
- return ExecutionEnvironment
-
- @classmethod
- def java_class(cls):
- return "org.apache.flink.api.java.ExecutionEnvironment"
-
- @classmethod
- def excluded_methods(cls):
- # Exclude these methods for the time being, because current
- # ExecutionEnvironment/StreamExecutionEnvironment do not apply to the
- # DataSet/DataStream API, but to the Table API configuration.
- # Currently only the methods for configuration is added.
- # 'setSessionTimeout', 'getSessionTimeout', 'setNumberOfExecutionRetries',
- # 'getNumberOfExecutionRetries' is deprecated, exclude them.
- # 'access$000' is generated by java compiler, exclude it too.
- return {'resetContextEnvironment', 'getSessionTimeout', 'fromParallelCollection',
- 'getId', 'registerCachedFile', 'setNumberOfExecutionRetries', 'readTextFile',
- 'getNumberOfExecutionRetries', 'registerCachedFilesWithPlan',
- 'getLastJobExecutionResult', 'readCsvFile', 'initializeContextEnvironment',
- 'createLocalEnvironment', 'createLocalEnvironmentWithWebUI', 'createProgramPlan',
- 'getIdString', 'setSessionTimeout', 'fromElements', 'createRemoteEnvironment',
- 'startNewSession', 'fromCollection', 'readTextFileWithValue', 'registerDataSink',
- 'createCollectionsEnvironment', 'readFile', 'readFileOfPrimitives',
- 'generateSequence', 'areExplicitEnvironmentsAllowed', 'createInput',
- 'getUserCodeClassLoader', 'getExecutorServiceLoader', 'getConfiguration',
- 'executeAsync', 'registerJobListener', 'clearJobListeners', 'configure'}
-
-
-if __name__ == '__main__':
- import unittest
-
- try:
- import xmlrunner
- testRunner = xmlrunner.XMLTestRunner(output='target/test-reports')
- except ImportError:
- testRunner = None
- unittest.main(testRunner=testRunner, verbosity=2)
diff --git a/flink-python/pyflink/datastream/tests/test_stream_execution_environment_completeness.py b/flink-python/pyflink/datastream/tests/test_stream_execution_environment_completeness.py
index 890df9c..27d7c37 100644
--- a/flink-python/pyflink/datastream/tests/test_stream_execution_environment_completeness.py
+++ b/flink-python/pyflink/datastream/tests/test_stream_execution_environment_completeness.py
@@ -34,8 +34,8 @@ class StreamExecutionEnvironmentCompletenessTests(PythonAPICompletenessTestCase,
@classmethod
def excluded_methods(cls):
# Exclude these methods for the time being, because current
- # ExecutionEnvironment/StreamExecutionEnvironment do not apply to the
- # DataSet/DataStream API, but to the Table API configuration.
+ # StreamExecutionEnvironment does not apply to the
+ # DataStream API, but to the Table API configuration.
# Currently only the methods for configuration are added.
# 'isForceCheckpointing', 'getNumberOfExecutionRetries', 'setNumberOfExecutionRetries'
# is deprecated, exclude them.
diff --git a/flink-python/pyflink/shell.py b/flink-python/pyflink/shell.py
index b5c9dc0..841991e 100644
--- a/flink-python/pyflink/shell.py
+++ b/flink-python/pyflink/shell.py
@@ -20,7 +20,6 @@ import platform
import sys
from pyflink.common import *
-from pyflink.dataset import *
from pyflink.datastream import *
from pyflink.table import *
from pyflink.table.catalog import *
@@ -73,36 +72,6 @@ welcome_msg = u'''
NOTE: Use the prebound Table Environment to implement batch or streaming Table programs.
- Batch - Use 'b_env' and 'bt_env' variables
-
- *
- * import tempfile
- * import os
- * import shutil
- * sink_path = tempfile.gettempdir() + '/batch.csv'
- * if os.path.exists(sink_path):
- * if os.path.isfile(sink_path):
- * os.remove(sink_path)
- * else:
- * shutil.rmtree(sink_path)
- * b_env.set_parallelism(1)
- * t = bt_env.from_elements([(1, 'hi', 'hello'), (2, 'hi', 'hello')], ['a', 'b', 'c'])
- * bt_env.connect(FileSystem().path(sink_path)) \\
- * .with_format(OldCsv()
- * .field_delimiter(',')
- * .field("a", DataTypes.BIGINT())
- * .field("b", DataTypes.STRING())
- * .field("c", DataTypes.STRING())) \\
- * .with_schema(Schema()
- * .field("a", DataTypes.BIGINT())
- * .field("b", DataTypes.STRING())
- * .field("c", DataTypes.STRING())) \\
- * .create_temporary_table("batch_sink")
- *
- * t.select("a + 1, b, c").insert_into("batch_sink")
- *
- * bt_env.execute("batch_job")
-
Streaming - Use 's_env' and 'st_env' variables
*
@@ -135,10 +104,6 @@ NOTE: Use the prebound Table Environment to implement batch or streaming Table p
'''
utf8_out.write(welcome_msg)
-b_env = ExecutionEnvironment.get_execution_environment()
-
-bt_env = BatchTableEnvironment.create(b_env)
-
s_env = StreamExecutionEnvironment.get_execution_environment()
st_env = StreamTableEnvironment.create(s_env)
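With b_env and bt_env gone, the shell prebinds only the streaming pair. A batch
session can still be created on demand; an illustrative sketch using only APIs
shown in this commit:

    # Illustrative replacement for the removed bt_env binding
    from pyflink.table import EnvironmentSettings, TableEnvironment

    batch_settings = EnvironmentSettings.new_instance().in_batch_mode().use_blink_planner().build()
    bt_env = TableEnvironment.create(environment_settings=batch_settings)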
diff --git a/flink-python/pyflink/table/__init__.py b/flink-python/pyflink/table/__init__.py
index 75436c9..1cf4313 100644
--- a/flink-python/pyflink/table/__init__.py
+++ b/flink-python/pyflink/table/__init__.py
@@ -81,8 +81,7 @@ from pyflink.table.statement_set import StatementSet
from pyflink.table.table import GroupWindowedTable, GroupedTable, OverWindowedTable, Table, \
WindowGroupedTable
from pyflink.table.table_config import TableConfig
-from pyflink.table.table_environment import (TableEnvironment, StreamTableEnvironment,
- BatchTableEnvironment)
+from pyflink.table.table_environment import (TableEnvironment, StreamTableEnvironment)
from pyflink.table.table_result import TableResult
from pyflink.table.table_schema import TableSchema
from pyflink.table.types import DataTypes, UserDefinedType, Row, RowKind
@@ -91,7 +90,6 @@ from pyflink.table.udf import FunctionContext, ScalarFunction, TableFunction, Ag
__all__ = [
'AggregateFunction',
- 'BatchTableEnvironment',
'CsvTableSink',
'CsvTableSource',
'DataTypes',
diff --git a/flink-python/pyflink/table/examples/batch/word_count.py b/flink-python/pyflink/table/examples/batch/word_count.py
index ec49fb4..2c0d930 100644
--- a/flink-python/pyflink/table/examples/batch/word_count.py
+++ b/flink-python/pyflink/table/examples/batch/word_count.py
@@ -21,7 +21,7 @@ import shutil
import sys
import tempfile
-from pyflink.table import BatchTableEnvironment, EnvironmentSettings
+from pyflink.table import EnvironmentSettings, TableEnvironment
from pyflink.table import expressions as expr
@@ -35,7 +35,7 @@ def word_count():
"with the License"
env_settings = EnvironmentSettings.new_instance().in_batch_mode().use_blink_planner().build()
- t_env = BatchTableEnvironment.create(environment_settings=env_settings)
+ t_env = TableEnvironment.create(environment_settings=env_settings)
# register Results table in table environment
tmp_dir = tempfile.gettempdir()
@@ -67,9 +67,7 @@ def word_count():
table = t_env.from_elements(elements, ["word", "count"])
table.group_by(table.word) \
.select(table.word, expr.lit(1).count.alias('count')) \
- .insert_into("Results")
-
- t_env.execute("word_count")
+ .execute_insert("Results")
if __name__ == '__main__':
diff --git a/flink-python/pyflink/table/table.py b/flink-python/pyflink/table/table.py
index ccb3aa4..9e08278 100644
--- a/flink-python/pyflink/table/table.py
+++ b/flink-python/pyflink/table/table.py
@@ -44,7 +44,7 @@ class Table(object):
"""
A :class:`~pyflink.table.Table` is the core component of the Table API.
- Similar to how the batch and streaming APIs have DataSet and DataStream,
+ Similar to how the DataStream API has DataStream,
the Table API is built around :class:`~pyflink.table.Table`.
Use the methods of :class:`~pyflink.table.Table` to transform data.
diff --git a/flink-python/pyflink/table/table_environment.py b/flink-python/pyflink/table/table_environment.py
index 2114dad..82d4557 100644
--- a/flink-python/pyflink/table/table_environment.py
+++ b/flink-python/pyflink/table/table_environment.py
@@ -30,13 +30,12 @@ from pyflink.common.typeinfo import TypeInformation
from pyflink.datastream.data_stream import DataStream
from pyflink.common import JobExecutionResult
-from pyflink.dataset import ExecutionEnvironment
from pyflink.java_gateway import get_gateway
from pyflink.serializers import BatchedSerializer, PickleSerializer
from pyflink.table import Table, EnvironmentSettings, Expression, ExplainDetail, \
Module, ModuleEntry, TableSink
from pyflink.table.catalog import Catalog
-from pyflink.table.descriptors import StreamTableDescriptor, BatchTableDescriptor, \
+from pyflink.table.descriptors import StreamTableDescriptor, \
ConnectorDescriptor, ConnectTableDescriptor
from pyflink.table.serializers import ArrowSerializer
from pyflink.table.statement_set import StatementSet
@@ -46,14 +45,13 @@ from pyflink.table.types import _to_java_type, _create_type_verifier, RowType, D
_infer_schema_from_data, _create_converter, from_arrow_type, RowField, create_arrow_schema, \
_to_java_data_type
from pyflink.table.udf import UserDefinedFunctionWrapper, AggregateFunction, udaf, \
- UserDefinedAggregateFunctionWrapper, udtaf, TableAggregateFunction
+ udtaf, TableAggregateFunction
from pyflink.table.utils import to_expression_jarray
from pyflink.util import java_utils
from pyflink.util.java_utils import get_j_env_configuration, is_local_deployment, load_java_class, \
to_j_explain_detail_arr, to_jarray
__all__ = [
- 'BatchTableEnvironment',
'StreamTableEnvironment',
'TableEnvironment'
]
@@ -93,7 +91,6 @@ class TableEnvironment(object):
def __init__(self, j_tenv, serializer=PickleSerializer()):
self._j_tenv = j_tenv
- self._is_blink_planner = TableEnvironment._judge_blink_planner(j_tenv)
self._serializer = serializer
# When running in MiniCluster, launch the Python UDF worker using the Python executable
# specified by sys.executable if users have not specified it explicitly via configuration
@@ -115,16 +112,6 @@ class TableEnvironment(object):
environment_settings._j_environment_settings)
return TableEnvironment(j_tenv)
- @staticmethod
- def _judge_blink_planner(j_tenv):
- if "getPlanner" not in dir(j_tenv):
- return False
- else:
- j_planner_class = j_tenv.getPlanner().getClass()
- j_blink_planner_class = get_java_class(
- get_gateway().jvm.org.apache.flink.table.planner.delegation.PlannerBase)
- return j_blink_planner_class.isAssignableFrom(j_planner_class)
-
def from_table_source(self, table_source: 'TableSource') -> 'Table':
"""
Creates a table from a table source.
@@ -1082,8 +1069,7 @@ class TableEnvironment(object):
.loadClass(function_class_name).newInstance()
# this is a temporary solution and will be unified later when we use the new type
# system(DataType) to replace the old type system(TypeInformation).
- if (self._is_blink_planner and not isinstance(self, StreamTableEnvironment)) or \
- self.__class__ == TableEnvironment:
+ if not isinstance(self, StreamTableEnvironment) or self.__class__ == TableEnvironment:
if self._is_table_function(java_function):
self._register_table_function(name, java_function)
elif self._is_aggregate_function(java_function):
@@ -1128,8 +1114,7 @@ class TableEnvironment(object):
java_function = function._java_user_defined_function()
# this is a temporary solution and will be unified later when we use the new type
# system(DataType) to replace the old type system(TypeInformation).
- if (self._is_blink_planner and isinstance(self, BatchTableEnvironment)) or \
- self.__class__ == TableEnvironment:
+ if self.__class__ == TableEnvironment:
if self._is_table_function(java_function):
self._register_table_function(name, java_function)
elif self._is_aggregate_function(java_function):
@@ -1442,14 +1427,10 @@ class TableEnvironment(object):
execution_config = self._get_j_env().getConfig()
gateway = get_gateway()
j_objs = gateway.jvm.PythonBridgeUtils.readPythonObjects(temp_file.name, True)
- if self._is_blink_planner:
- PythonTableUtils = gateway.jvm \
- .org.apache.flink.table.planner.utils.python.PythonTableUtils
- PythonInputFormatTableSource = gateway.jvm \
- .org.apache.flink.table.planner.utils.python.PythonInputFormatTableSource
- else:
- PythonTableUtils = gateway.jvm.PythonTableUtils
- PythonInputFormatTableSource = gateway.jvm.PythonInputFormatTableSource
+ PythonTableUtils = gateway.jvm \
+ .org.apache.flink.table.planner.utils.python.PythonTableUtils
+ PythonInputFormatTableSource = gateway.jvm \
+ .org.apache.flink.table.planner.utils.python.PythonInputFormatTableSource
j_input_format = PythonTableUtils.getInputFormat(
j_objs, row_type_info, execution_config)
j_table_source = PythonInputFormatTableSource(
@@ -1489,10 +1470,6 @@ class TableEnvironment(object):
.. versionadded:: 1.11.0
"""
- if not self._is_blink_planner and isinstance(self, BatchTableEnvironment):
- raise TypeError("It doesn't support to convert from Pandas DataFrame in the batch "
- "mode of old planner")
-
import pandas as pd
if not isinstance(pdf, pd.DataFrame):
raise TypeError("Unsupported type, expected pandas.DataFrame, got %s" % type(pdf))
@@ -1535,9 +1512,8 @@ class TableEnvironment(object):
data_type = jvm.org.apache.flink.table.types.utils.TypeConversions\
.fromLegacyInfoToDataType(_to_java_type(result_type)).notNull()
- if self._is_blink_planner:
- data_type = data_type.bridgedTo(
- load_java_class('org.apache.flink.table.data.RowData'))
+ data_type = data_type.bridgedTo(
+ load_java_class('org.apache.flink.table.data.RowData'))
j_arrow_table_source = \
jvm.org.apache.flink.table.runtime.arrow.ArrowUtils.createArrowTableSource(
@@ -1566,13 +1542,7 @@ class TableEnvironment(object):
j_configuration.setString(config_key, ";".join(jar_urls_set))
def _get_j_env(self):
- if self._is_blink_planner:
- return self._j_tenv.getPlanner().getExecEnv()
- else:
- try:
- return self._j_tenv.execEnv()
- except:
- return self._j_tenv.getPlanner().getExecutionEnvironment()
+ return self._j_tenv.getPlanner().getExecEnv()
@staticmethod
def _is_table_function(java_function):
@@ -1618,10 +1588,6 @@ class TableEnvironment(object):
self._add_jars_to_j_env_config(classpaths_key)
def _wrap_aggregate_function_if_needed(self, function) -> UserDefinedFunctionWrapper:
- if isinstance(function, (AggregateFunction, TableAggregateFunction,
- UserDefinedAggregateFunctionWrapper)):
- if not self._is_blink_planner:
- raise Exception("Python UDAF and UDTAF are only supported in blink planner")
if isinstance(function, AggregateFunction):
function = udaf(function,
result_type=function.get_result_type(),
@@ -1794,147 +1760,3 @@ class StreamTableEnvironment(TableEnvironment):
"""
j_data_stream = self._j_tenv.toRetractStream(table._j_table, type_info.get_java_type_info())
return DataStream(j_data_stream=j_data_stream)
-
-
-class BatchTableEnvironment(TableEnvironment):
- """
- .. note:: BatchTableEnvironment will be dropped in Flink 1.14 because it only supports the old
- planner. Use the unified :class:`~pyflink.table.TableEnvironment` instead, which
- supports both batch and streaming. More advanced operations previously covered by
- the DataSet API can now use the DataStream API in BATCH execution mode.
- """
-
- def __init__(self, j_tenv):
- super(BatchTableEnvironment, self).__init__(j_tenv)
- self._j_tenv = j_tenv
-
- def connect(self, connector_descriptor: ConnectorDescriptor) -> \
- Union[BatchTableDescriptor, StreamTableDescriptor]:
- """
- Creates a temporary table from a descriptor.
-
- Descriptors allow for declaring the communication to external systems in an
- implementation-agnostic way. The classpath is scanned for suitable table factories that
- match the desired configuration.
-
- The following example shows how to read from a connector using a JSON format and
- registering a temporary table as "MyTable":
- ::
-
- >>> table_env \\
- ... .connect(ExternalSystemXYZ()
- ... .version("0.11")) \\
- ... .with_format(Json()
- ... .json_schema("{...}")
- ... .fail_on_missing_field(False)) \\
- ... .with_schema(Schema()
- ... .field("user-name", "VARCHAR")
- ... .from_origin_field("u_name")
- ... .field("count", "DECIMAL")) \\
- ... .create_temporary_table("MyTable")
-
- :param connector_descriptor: Connector descriptor describing the external system.
- :return: A :class:`~pyflink.table.descriptors.BatchTableDescriptor` or a
- :class:`~pyflink.table.descriptors.StreamTableDescriptor` (for blink planner) used
- to build the temporary table.
-
- .. note:: Deprecated in 1.11. Use :func:`execute_sql` to register a table instead.
- """
- warnings.warn("Deprecated in 1.11. Use execute_sql instead.", DeprecationWarning)
- gateway = get_gateway()
- blink_t_env_class = get_java_class(
- gateway.jvm.org.apache.flink.table.api.internal.TableEnvironmentImpl)
- if blink_t_env_class == self._j_tenv.getClass():
- return StreamTableDescriptor(
- self._j_tenv.connect(connector_descriptor._j_connector_descriptor))
- else:
- return BatchTableDescriptor(
- self._j_tenv.connect(connector_descriptor._j_connector_descriptor))
-
- @staticmethod
- def create(execution_environment: ExecutionEnvironment = None, # type: ignore
- table_config: TableConfig = None,
- environment_settings: EnvironmentSettings = None) -> 'BatchTableEnvironment':
- """
- Creates a :class:`~pyflink.table.BatchTableEnvironment`.
-
- Example:
- ::
-
- # create with ExecutionEnvironment.
- >>> env = ExecutionEnvironment.get_execution_environment()
- >>> table_env = BatchTableEnvironment.create(env)
- # create with ExecutionEnvironment and TableConfig.
- >>> table_config = TableConfig()
- >>> table_config.set_null_check(False)
- >>> table_env = BatchTableEnvironment.create(env, table_config)
- # create with EnvironmentSettings.
- >>> environment_settings = EnvironmentSettings.new_instance().in_batch_mode() \\
- ... .use_blink_planner().build()
- >>> table_env = BatchTableEnvironment.create(environment_settings=environment_settings)
-
- :param execution_environment: The batch :class:`~pyflink.dataset.ExecutionEnvironment` of
- the TableEnvironment.
- :param table_config: The configuration of the TableEnvironment, optional.
- :param environment_settings: The environment settings used to instantiate the
- TableEnvironment. It provides the interfaces about planner
- selection(flink or blink), optional.
- :return: The BatchTableEnvironment created from given ExecutionEnvironment and
- configuration.
-
- .. note:: This part of the API will be dropped in Flink 1.14 because it only supports the
- old planner. Use the unified :class:`~pyflink.table.TableEnvironment` instead, it
- supports both batch and streaming. For more advanced operations, the new batch
- mode of the DataStream API might be useful.
- """
- warnings.warn(
- "Deprecated in 1.14. Use the unified TableEnvironment instead.",
- DeprecationWarning)
- if execution_environment is None and \
- table_config is None and \
- environment_settings is None:
- raise ValueError("No argument found, the param 'execution_environment' "
- "or 'environment_settings' is required.")
- elif execution_environment is None and \
- table_config is not None and \
- environment_settings is None:
- raise ValueError("Only the param 'table_config' is found, "
- "the param 'execution_environment' is also required.")
- elif execution_environment is not None and \
- environment_settings is not None:
- raise ValueError("The param 'execution_environment' and "
- "'environment_settings' cannot be used at the same time")
- elif table_config is not None and \
- environment_settings is not None:
- raise ValueError("The param 'table_config' and "
- "'environment_settings' cannot be used at the same time")
-
- gateway = get_gateway()
- if environment_settings is not None:
- if environment_settings.is_streaming_mode():
- raise ValueError("The environment settings for BatchTableEnvironment must be "
- "set to batch mode.")
- JEnvironmentSettings = get_gateway().jvm.org.apache.flink.table.api.EnvironmentSettings
-
- old_planner_class_name = EnvironmentSettings.new_instance().in_batch_mode() \
- .use_old_planner().build()._j_environment_settings \
- .toPlannerProperties()[JEnvironmentSettings.CLASS_NAME]
- planner_properties = environment_settings._j_environment_settings.toPlannerProperties()
- if JEnvironmentSettings.CLASS_NAME in planner_properties and \
- planner_properties[JEnvironmentSettings.CLASS_NAME] == old_planner_class_name:
- # The Java EnvironmentSettings API does not support creating a table environment
- # with the old planner. Create it via another API instead.
- j_tenv = gateway.jvm.BatchTableEnvironment.create(
- ExecutionEnvironment.get_execution_environment()._j_execution_environment)
- else:
- j_tenv = gateway.jvm.TableEnvironment.create(
- environment_settings._j_environment_settings)
- else:
- if table_config is None:
- j_tenv = gateway.jvm.BatchTableEnvironment.create(
- execution_environment._j_execution_environment)
- else:
- j_tenv = gateway.jvm.BatchTableEnvironment.create(
- execution_environment._j_execution_environment,
- table_config._j_table_config)
- return BatchTableEnvironment(j_tenv)
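For readers migrating off the removed API, a minimal sketch of the unified replacement (assuming PyFlink 1.13+, where the unified TableEnvironment covers both batch and streaming; the data and column names are illustrative only):

    from pyflink.table import EnvironmentSettings, TableEnvironment

    # batch mode on the unified TableEnvironment replaces BatchTableEnvironment
    env_settings = EnvironmentSettings.new_instance().in_batch_mode().build()
    table_env = TableEnvironment.create(env_settings)

    table = table_env.from_elements([(1, 'hi'), (2, 'hello')], ['a', 'b'])
    with table.execute().collect() as results:  # bounded result, safe to iterate fully
        for row in results:
            print(row)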
diff --git a/flink-python/pyflink/table/tests/test_calc.py b/flink-python/pyflink/table/tests/test_calc.py
index c51974f..f09b1fa 100644
--- a/flink-python/pyflink/table/tests/test_calc.py
+++ b/flink-python/pyflink/table/tests/test_calc.py
@@ -21,7 +21,7 @@ import datetime
from decimal import Decimal
from pyflink.common import Row
-from pyflink.table import DataTypes, BatchTableEnvironment, EnvironmentSettings
+from pyflink.table import DataTypes
from pyflink.table.expressions import row
from pyflink.table.tests.test_types import PythonOnlyPoint, PythonOnlyUDT
from pyflink.testing import source_sink_utils
@@ -122,50 +122,6 @@ class StreamTableCalcTests(PyFlinkBlinkStreamTableTestCase):
expected = ['+I[1, abc, 2.0]', '+I[2, def, 3.0]']
self.assert_equals(actual, expected)
- def test_blink_from_element(self):
- t_env = BatchTableEnvironment.create(environment_settings=EnvironmentSettings
- .new_instance().use_blink_planner()
- .in_batch_mode().build())
- field_names = ["a", "b", "c", "d", "e", "f", "g", "h",
- "i", "j", "k", "l", "m", "n", "o", "p", "q"]
- field_types = [DataTypes.BIGINT(), DataTypes.DOUBLE(), DataTypes.STRING(),
- DataTypes.STRING(), DataTypes.DATE(),
- DataTypes.TIME(),
- DataTypes.TIMESTAMP(3),
- DataTypes.INTERVAL(DataTypes.SECOND(3)),
- DataTypes.ARRAY(DataTypes.DOUBLE()),
- DataTypes.ARRAY(DataTypes.DOUBLE(False)),
- DataTypes.ARRAY(DataTypes.STRING()),
- DataTypes.ARRAY(DataTypes.DATE()),
- DataTypes.DECIMAL(38, 18),
- DataTypes.ROW([DataTypes.FIELD("a", DataTypes.BIGINT()),
- DataTypes.FIELD("b", DataTypes.DOUBLE())]),
- DataTypes.MAP(DataTypes.STRING(), DataTypes.DOUBLE()),
- DataTypes.BYTES(),
- PythonOnlyUDT()]
- schema = DataTypes.ROW(
- list(map(lambda field_name, field_type: DataTypes.FIELD(field_name, field_type),
- field_names,
- field_types)))
- table_sink = source_sink_utils.TestAppendSink(field_names, field_types)
- t_env.register_table_sink("Results", table_sink)
- t = t_env.from_elements(
- [(1, 1.0, "hi", "hello", datetime.date(1970, 1, 2), datetime.time(1, 0, 0),
- datetime.datetime(1970, 1, 2, 0, 0),
- datetime.timedelta(days=1, microseconds=10),
- [1.0, None], array.array("d", [1.0, 2.0]),
- ["abc"], [datetime.date(1970, 1, 2)], Decimal(1), Row("a", "b")(1, 2.0),
- {"key": 1.0}, bytearray(b'ABCD'),
- PythonOnlyPoint(3.0, 4.0))],
- schema)
- t.execute_insert("Results").wait()
- actual = source_sink_utils.results()
-
- expected = ['+I[1, 1.0, hi, hello, 1970-01-02, 01:00:00, 1970-01-02 00:00:00.0, '
- '86400000, [1.0, null], [1.0, 2.0], [abc], [1970-01-02], '
- '1.000000000000000000, +I[1, 2.0], {key=1.0}, [65, 66, 67, 68], [3.0, 4.0]]']
- self.assert_equals(actual, expected)
-
if __name__ == '__main__':
import unittest
diff --git a/flink-python/pyflink/table/tests/test_dependency.py b/flink-python/pyflink/table/tests/test_dependency.py
index b3a0bca..7567b93 100644
--- a/flink-python/pyflink/table/tests/test_dependency.py
+++ b/flink-python/pyflink/table/tests/test_dependency.py
@@ -27,9 +27,7 @@ from pyflink.table import expressions as expr
from pyflink.table.udf import udf
from pyflink.testing import source_sink_utils
from pyflink.testing.test_case_utils import (PyFlinkBlinkStreamTableTestCase,
- PyFlinkBlinkBatchTableTestCase,
- PyFlinkOldStreamTableTestCase,
- PyFlinkOldBatchTableTestCase)
+ PyFlinkBlinkBatchTableTestCase)
class DependencyTests(object):
@@ -67,35 +65,6 @@ class DependencyTests(object):
self.assert_equals(actual, ["+I[3, 1]", "+I[4, 2]", "+I[5, 3]"])
-class FlinkStreamDependencyTests(DependencyTests, PyFlinkOldStreamTableTestCase):
-
- pass
-
-
-class FlinkBatchDependencyTests(PyFlinkOldBatchTableTestCase):
-
- def test_add_python_file(self):
- python_file_dir = os.path.join(self.tempdir, "python_file_dir_" + str(uuid.uuid4()))
- os.mkdir(python_file_dir)
- python_file_path = os.path.join(python_file_dir, "test_dependency_manage_lib.py")
- with open(python_file_path, 'w') as f:
- f.write("def add_two(a):\n return a + 2")
- self.t_env.add_python_file(python_file_path)
-
- def plus_two(i):
- from test_dependency_manage_lib import add_two
- return add_two(i)
-
- self.t_env.create_temporary_system_function(
- "add_two", udf(plus_two, DataTypes.BIGINT(), DataTypes.BIGINT()))
-
- t = self.t_env.from_elements([(1, 2), (2, 5), (3, 1)], ['a', 'b'])
- t = t.select(expr.call('add_two', t.a), t.a)
-
- result = self.collect(t)
- self.assertEqual(result, ["+I[3, 1]", "+I[4, 2]", "+I[5, 3]"])
-
-
class BlinkBatchDependencyTests(DependencyTests, PyFlinkBlinkBatchTableTestCase):
pass
diff --git a/flink-python/pyflink/table/tests/test_descriptor.py b/flink-python/pyflink/table/tests/test_descriptor.py
index 92bada3..e15bf7f 100644
--- a/flink-python/pyflink/table/tests/test_descriptor.py
+++ b/flink-python/pyflink/table/tests/test_descriptor.py
@@ -25,9 +25,7 @@ from pyflink.table.descriptors import (FileSystem, OldCsv, Rowtime, Schema, Kafk
CustomFormatDescriptor)
from pyflink.table.table_schema import TableSchema
from pyflink.table.types import DataTypes
-from pyflink.testing.test_case_utils import (PyFlinkTestCase, PyFlinkOldStreamTableTestCase,
- PyFlinkOldBatchTableTestCase,
- _load_specific_flink_module_jars)
+from pyflink.testing.test_case_utils import (PyFlinkTestCase, _load_specific_flink_module_jars)
class FileSystemDescriptorTests(PyFlinkTestCase):
@@ -1080,58 +1078,6 @@ class AbstractTableDescriptorTests(object):
assert lines == '2,Hi,Hello\n' + "3,Hello,Hello\n"
-class StreamTableDescriptorTests(PyFlinkOldStreamTableTestCase, AbstractTableDescriptorTests):
-
- def test_in_append_mode(self):
- descriptor = self.t_env.connect(FileSystem())
-
- descriptor = descriptor\
- .with_format(OldCsv())\
- .in_append_mode()
-
- properties = descriptor.to_properties()
- expected = {'update-mode': 'append',
- 'format.type': 'csv',
- 'format.property-version': '1',
- 'connector.property-version': '1',
- 'connector.type': 'filesystem'}
- assert properties == expected
-
- def test_in_retract_mode(self):
- descriptor = self.t_env.connect(FileSystem())
-
- descriptor = descriptor \
- .with_format(OldCsv()) \
- .in_retract_mode()
-
- properties = descriptor.to_properties()
- expected = {'update-mode': 'retract',
- 'format.type': 'csv',
- 'format.property-version': '1',
- 'connector.property-version': '1',
- 'connector.type': 'filesystem'}
- assert properties == expected
-
- def test_in_upsert_mode(self):
- descriptor = self.t_env.connect(FileSystem())
-
- descriptor = descriptor \
- .with_format(OldCsv()) \
- .in_upsert_mode()
-
- properties = descriptor.to_properties()
- expected = {'update-mode': 'upsert',
- 'format.type': 'csv',
- 'format.property-version': '1',
- 'connector.property-version': '1',
- 'connector.type': 'filesystem'}
- assert properties == expected
-
-
-class BatchTableDescriptorTests(PyFlinkOldBatchTableTestCase, AbstractTableDescriptorTests):
- pass
-
-
if __name__ == '__main__':
import unittest
diff --git a/flink-python/pyflink/table/tests/test_pandas_conversion.py b/flink-python/pyflink/table/tests/test_pandas_conversion.py
index 4f20e24..c9c0986 100644
--- a/flink-python/pyflink/table/tests/test_pandas_conversion.py
+++ b/flink-python/pyflink/table/tests/test_pandas_conversion.py
@@ -24,7 +24,7 @@ from pyflink.common import Row
from pyflink.table.types import DataTypes
from pyflink.testing import source_sink_utils
from pyflink.testing.test_case_utils import PyFlinkBlinkBatchTableTestCase, \
- PyFlinkBlinkStreamTableTestCase, PyFlinkOldStreamTableTestCase
+ PyFlinkBlinkStreamTableTestCase
class PandasConversionTestBase(object):
@@ -172,11 +172,6 @@ class PandasConversionITTests(PandasConversionTestBase):
self.assertTrue(expected_field == result_field)
-class StreamPandasConversionTests(PandasConversionITTests,
- PyFlinkOldStreamTableTestCase):
- pass
-
-
class BlinkBatchPandasConversionTests(PandasConversionTests,
PandasConversionITTests,
PyFlinkBlinkBatchTableTestCase):
diff --git a/flink-python/pyflink/table/tests/test_pandas_udf.py b/flink-python/pyflink/table/tests/test_pandas_udf.py
index 8d3334d..77061ec 100644
--- a/flink-python/pyflink/table/tests/test_pandas_udf.py
+++ b/flink-python/pyflink/table/tests/test_pandas_udf.py
@@ -24,9 +24,8 @@ from pyflink.table import DataTypes
from pyflink.table.tests.test_udf import SubtractOne
from pyflink.table.udf import udf
from pyflink.testing import source_sink_utils
-from pyflink.testing.test_case_utils import PyFlinkOldStreamTableTestCase, \
- PyFlinkBlinkBatchTableTestCase, PyFlinkBlinkStreamTableTestCase, PyFlinkOldBatchTableTestCase, \
- PyFlinkTestCase
+from pyflink.testing.test_case_utils import PyFlinkBlinkBatchTableTestCase, \
+ PyFlinkBlinkStreamTableTestCase, PyFlinkTestCase
class PandasUDFTests(PyFlinkTestCase):
@@ -307,9 +306,6 @@ class PandasUDFITTests(object):
with self.assertRaisesRegex(Py4JJavaError, expected_regex=msg):
t.select(result_type_not_series(t.a)).to_pandas()
-
-class BlinkPandasUDFITTests(object):
-
def test_data_types_only_supported_in_blink_planner(self):
import pandas as pd
@@ -342,34 +338,12 @@ class BlinkPandasUDFITTests(object):
self.assert_equals(actual, ["+I[1970-01-02T00:00:00.123Z]"])
-class StreamPandasUDFITTests(PandasUDFITTests,
- PyFlinkOldStreamTableTestCase):
- pass
-
-
-class BatchPandasUDFITTests(PyFlinkOldBatchTableTestCase):
-
- def test_basic_functionality(self):
- add_one = udf(lambda i: i + 1, result_type=DataTypes.BIGINT(), func_type="pandas")
-
- # general Python UDF
- subtract_one = udf(SubtractOne(), result_type=DataTypes.BIGINT())
-
- t = self.t_env.from_elements([(1, 2, 3), (2, 5, 6), (3, 1, 9)], ['a', 'b', 'c'])
- t = t.where(add_one(t.b) <= 3) \
- .select(t.a, t.b + 1, add(t.a + 1, subtract_one(t.c)) + 2, add(add_one(t.a), 1))
- result = self.collect(t)
- self.assert_equals(result, ["+I[1, 3, 6, 3]", "+I[3, 2, 14, 5]"])
-
-
class BlinkBatchPandasUDFITTests(PandasUDFITTests,
- BlinkPandasUDFITTests,
PyFlinkBlinkBatchTableTestCase):
pass
class BlinkStreamPandasUDFITTests(PandasUDFITTests,
- BlinkPandasUDFITTests,
PyFlinkBlinkStreamTableTestCase):
pass
diff --git a/flink-python/pyflink/table/tests/test_set_operation.py b/flink-python/pyflink/table/tests/test_set_operation.py
index 51da302..c25ec59 100644
--- a/flink-python/pyflink/table/tests/test_set_operation.py
+++ b/flink-python/pyflink/table/tests/test_set_operation.py
@@ -16,10 +16,10 @@
# # limitations under the License.
################################################################################
-from pyflink.testing.test_case_utils import PyFlinkOldBatchTableTestCase
+from pyflink.testing.test_case_utils import PyFlinkBlinkBatchTableTestCase
-class StreamTableSetOperationTests(PyFlinkOldBatchTableTestCase):
+class StreamTableSetOperationTests(PyFlinkBlinkBatchTableTestCase):
data1 = [(1, "Hi", "Hello")]
data2 = [(3, "Hello", "Hello")]
diff --git a/flink-python/pyflink/table/tests/test_shell_example.py b/flink-python/pyflink/table/tests/test_shell_example.py
index 03dd197..60dae68 100644
--- a/flink-python/pyflink/table/tests/test_shell_example.py
+++ b/flink-python/pyflink/table/tests/test_shell_example.py
@@ -23,40 +23,6 @@ class ShellExampleTests(PyFlinkTestCase):
If these tests fail, please fix the example code and copy it to shell.py
"""
- def test_batch_case(self):
- from pyflink.shell import b_env, bt_env, FileSystem, OldCsv, DataTypes, Schema
- # example begin
-
- import tempfile
- import os
- import shutil
- sink_path = tempfile.gettempdir() + '/batch.csv'
- if os.path.exists(sink_path):
- if os.path.isfile(sink_path):
- os.remove(sink_path)
- else:
- shutil.rmtree(sink_path)
- b_env.set_parallelism(1)
- t = bt_env.from_elements([(1, 'hi', 'hello'), (2, 'hi', 'hello')], ['a', 'b', 'c'])
- bt_env.connect(FileSystem().path(sink_path))\
- .with_format(OldCsv()
- .field_delimiter(',')
- .field("a", DataTypes.BIGINT())
- .field("b", DataTypes.STRING())
- .field("c", DataTypes.STRING()))\
- .with_schema(Schema()
- .field("a", DataTypes.BIGINT())
- .field("b", DataTypes.STRING())
- .field("c", DataTypes.STRING()))\
- .create_temporary_table("batch_sink")
-
- t.select("a + 1, b, c").execute_insert("batch_sink").wait()
-
- # verify code, do not copy these code to shell.py
- with open(sink_path, 'r') as f:
- lines = f.read()
- self.assertEqual(lines, '2,hi,hello\n' + '3,hi,hello\n')
-
def test_stream_case(self):
from pyflink.shell import s_env, st_env, FileSystem, OldCsv, DataTypes, Schema
# example begin
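The removed batch shell example above relied on the b_env/bt_env pair and the deprecated connect() descriptor chain. A hedged sketch of the same flow on the unified environment, using execute_sql DDL as the connect() replacement (the sink path and table name are placeholders, not part of this commit):

    from pyflink.table import EnvironmentSettings, TableEnvironment

    table_env = TableEnvironment.create(
        EnvironmentSettings.new_instance().in_batch_mode().build())
    t = table_env.from_elements([(1, 'hi', 'hello'), (2, 'hi', 'hello')], ['a', 'b', 'c'])

    # DDL replaces the connect()/with_format()/with_schema() chain
    table_env.execute_sql("""
        CREATE TEMPORARY TABLE batch_sink (
            a BIGINT, b STRING, c STRING
        ) WITH (
            'connector' = 'filesystem',
            'path' = '/tmp/batch.csv',
            'format' = 'csv'
        )
    """)
    t.select("a + 1, b, c").execute_insert("batch_sink").wait()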
diff --git a/flink-python/pyflink/table/tests/test_sort.py b/flink-python/pyflink/table/tests/test_sort.py
index d0b787c..3e78689 100644
--- a/flink-python/pyflink/table/tests/test_sort.py
+++ b/flink-python/pyflink/table/tests/test_sort.py
@@ -16,10 +16,10 @@
# limitations under the License.
################################################################################
-from pyflink.testing.test_case_utils import PyFlinkOldBatchTableTestCase
+from pyflink.testing.test_case_utils import PyFlinkBlinkBatchTableTestCase
-class BatchTableSortTests(PyFlinkOldBatchTableTestCase):
+class BatchTableSortTests(PyFlinkBlinkBatchTableTestCase):
def test_order_by_offset_fetch(self):
t = self.t_env.from_elements([(1, "Hello")], ["a", "b"])
diff --git a/flink-python/pyflink/table/tests/test_sql.py b/flink-python/pyflink/table/tests/test_sql.py
index 5a102ac..27ebc87 100644
--- a/flink-python/pyflink/table/tests/test_sql.py
+++ b/flink-python/pyflink/table/tests/test_sql.py
@@ -24,7 +24,7 @@ from pyflink.java_gateway import get_gateway
from pyflink.table import DataTypes, ResultKind
from pyflink.testing import source_sink_utils
from pyflink.testing.test_case_utils import PyFlinkBlinkStreamTableTestCase, \
- PyFlinkOldBatchTableTestCase, PyFlinkTestCase
+ PyFlinkTestCase
class StreamSqlTests(PyFlinkBlinkStreamTableTestCase):
@@ -110,16 +110,6 @@ class StreamSqlTests(PyFlinkBlinkStreamTableTestCase):
self.assert_equals(actual, expected)
-class BatchSqlTests(PyFlinkOldBatchTableTestCase):
-
- def test_sql_ddl(self):
- self.t_env.execute_sql("create temporary function func1 as "
- "'pyflink.table.tests.test_udf.add' language python")
- table = self.t_env.from_elements([(1, 2)]).alias("a, b").select("func1(a, b)")
- plan = table.explain()
- self.assertTrue(plan.find("DataSetPythonCalc(select=[add(f0, f1) AS _c0])") >= 0)
-
-
class JavaSqlTests(PyFlinkTestCase):
"""
We need to start these Java tests from a Python process to make sure that the Python environment is
diff --git a/flink-python/pyflink/table/tests/test_table_environment_api.py b/flink-python/pyflink/table/tests/test_table_environment_api.py
index 3774178..27cb89a 100644
--- a/flink-python/pyflink/table/tests/test_table_environment_api.py
+++ b/flink-python/pyflink/table/tests/test_table_environment_api.py
@@ -17,33 +17,25 @@
################################################################################
import datetime
import decimal
-import glob
import os
-import pathlib
import sys
from py4j.protocol import Py4JJavaError
from pyflink.table.udf import udf
from pyflink.common import RowKind
from pyflink.common.typeinfo import Types
-from pyflink.dataset import ExecutionEnvironment
-from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.tests.test_util import DataStreamTestSinkFunction
-from pyflink.find_flink_home import _find_flink_source_root
from pyflink.java_gateway import get_gateway
from pyflink.table import DataTypes, CsvTableSink, StreamTableEnvironment, EnvironmentSettings, \
Module, ResultKind, ModuleEntry
from pyflink.table.descriptors import FileSystem, OldCsv, Schema
from pyflink.table.explain_detail import ExplainDetail
from pyflink.table.expressions import col
-from pyflink.table.table_config import TableConfig
-from pyflink.table.table_environment import BatchTableEnvironment
from pyflink.table.types import RowType, Row
from pyflink.testing import source_sink_utils
-from pyflink.testing.test_case_utils import PyFlinkOldStreamTableTestCase, \
- PyFlinkOldBatchTableTestCase, PyFlinkBlinkBatchTableTestCase, PyFlinkBlinkStreamTableTestCase, \
- PyFlinkLegacyBlinkBatchTableTestCase, PyFlinkLegacyFlinkStreamTableTestCase, \
- PyFlinkLegacyBlinkStreamTableTestCase, _load_specific_flink_module_jars
+from pyflink.testing.test_case_utils import \
+ PyFlinkBlinkBatchTableTestCase, PyFlinkBlinkStreamTableTestCase, \
+ _load_specific_flink_module_jars
from pyflink.util.java_utils import get_j_env_configuration
@@ -201,495 +193,6 @@ class TableEnvironmentTest(object):
self.assert_equals(actual, expected)
-class StreamTableEnvironmentTests(TableEnvironmentTest, PyFlinkOldStreamTableTestCase):
-
- def test_register_table_source_from_path(self):
- t_env = self.t_env
- field_names = ["a", "b", "c"]
- field_types = [DataTypes.BIGINT(), DataTypes.STRING(), DataTypes.STRING()]
- source_path = os.path.join(self.tempdir + '/streaming.csv')
- csv_source = self.prepare_csv_source(source_path, [], field_types, field_names)
- t_env.register_table_source("Source", csv_source)
-
- result = t_env.from_path("Source")
- self.assertEqual(
- 'CatalogTable: (identifier: [`default_catalog`.`default_database`.`Source`]'
- ', fields: [a, b, c])',
- result._j_table.getQueryOperation().asSummaryString())
-
- def test_register_table_sink(self):
- t_env = self.t_env
- field_names = ["a", "b", "c"]
- field_types = [DataTypes.BIGINT(), DataTypes.STRING(), DataTypes.STRING()]
- t_env.register_table_sink(
- "Sinks",
- source_sink_utils.TestAppendSink(field_names, field_types))
-
- t_env.from_elements([(1, "Hi", "Hello")], ["a", "b", "c"]).execute_insert("Sinks").wait()
-
- actual = source_sink_utils.results()
-
- expected = ['+I[1, Hi, Hello]']
- self.assert_equals(actual, expected)
-
- def test_from_table_source(self):
- field_names = ["a", "b", "c"]
- field_types = [DataTypes.BIGINT(), DataTypes.STRING(), DataTypes.STRING()]
- source_path = os.path.join(self.tempdir + '/streaming.csv')
- csv_source = self.prepare_csv_source(source_path, [], field_types, field_names)
-
- result = self.t_env.from_table_source(csv_source)
- self.assertEqual(
- 'TableSource: (fields: [a, b, c])',
- result._j_table.getQueryOperation().asSummaryString())
-
- def test_list_tables(self):
- source_path = os.path.join(self.tempdir + '/streaming.csv')
- field_names = ["a", "b", "c"]
- field_types = [DataTypes.INT(), DataTypes.STRING(), DataTypes.STRING()]
- data = []
- csv_source = self.prepare_csv_source(source_path, data, field_types, field_names)
- t_env = self.t_env
- t_env.register_table_source("Orders", csv_source)
- t_env.register_table_sink(
- "Sinks",
- source_sink_utils.TestAppendSink(field_names, field_types))
- t_env.register_table_sink(
- "Results",
- source_sink_utils.TestAppendSink(field_names, field_types))
-
- actual = t_env.list_tables()
-
- expected = ['Orders', 'Results', 'Sinks']
- self.assert_equals(actual, expected)
-
- def test_temporary_views(self):
- t_env = self.t_env
- t_env.create_temporary_view(
- "temporary_view_1",
- t_env.from_elements([(1, 'Hi', 'Hello')], ['a', 'b', 'c']))
- t_env.create_temporary_view(
- "temporary_view_2",
- t_env.from_elements([(1, 'Hi')], ['a', 'b']))
-
- actual = t_env.list_temporary_views()
- expected = ['temporary_view_1', 'temporary_view_2']
- self.assert_equals(actual, expected)
-
- t_env.drop_temporary_view("temporary_view_1")
- actual = t_env.list_temporary_views()
- expected = ['temporary_view_2']
- self.assert_equals(actual, expected)
-
- def test_from_path(self):
- t_env = self.t_env
- t_env.create_temporary_view(
- "temporary_view_1",
- t_env.from_elements([(1, 'Hi', 'Hello')], ['a', 'b', 'c']))
- result = t_env.from_path("temporary_view_1")
- self.assertEqual(
- 'CatalogTable: (identifier: [`default_catalog`.`default_database`.`temporary_view_1`]'
- ', fields: [a, b, c])',
- result._j_table.getQueryOperation().asSummaryString())
-
- def test_insert_into(self):
- t_env = self.t_env
- field_names = ["a", "b", "c"]
- field_types = [DataTypes.BIGINT(), DataTypes.STRING(), DataTypes.STRING()]
- t_env.register_table_sink(
- "Sinks",
- source_sink_utils.TestAppendSink(field_names, field_types))
-
- t_env.from_elements([(1, "Hi", "Hello")], ["a", "b", "c"]).execute_insert("Sinks").wait()
-
- actual = source_sink_utils.results()
- expected = ['+I[1, Hi, Hello]']
- self.assert_equals(actual, expected)
-
- def test_statement_set(self):
- t_env = self.t_env
- source = t_env.from_elements([(1, "Hi", "Hello"), (2, "Hello", "Hello")], ["a", "b", "c"])
- field_names = ["a", "b", "c"]
- field_types = [DataTypes.BIGINT(), DataTypes.STRING(), DataTypes.STRING()]
- t_env.register_table_sink(
- "sink1",
- source_sink_utils.TestAppendSink(field_names, field_types))
- t_env.register_table_sink(
- "sink2",
- source_sink_utils.TestAppendSink(field_names, field_types))
-
- stmt_set = t_env.create_statement_set()
-
- stmt_set.add_insert_sql("insert into sink1 select * from %s where a > 100" % source) \
- .add_insert("sink2", source.filter("a < 100"), False)
-
- actual = stmt_set.explain(ExplainDetail.CHANGELOG_MODE)
- assert isinstance(actual, str)
-
- def test_explain_with_multi_sinks(self):
- t_env = self.t_env
- source = t_env.from_elements([(1, "Hi", "Hello"), (2, "Hello", "Hello")], ["a", "b", "c"])
- field_names = ["a", "b", "c"]
- field_types = [DataTypes.BIGINT(), DataTypes.STRING(), DataTypes.STRING()]
- t_env.register_table_sink(
- "sink1",
- source_sink_utils.TestAppendSink(field_names, field_types))
- t_env.register_table_sink(
- "sink2",
- source_sink_utils.TestAppendSink(field_names, field_types))
-
- stmt_set = t_env.create_statement_set()
- stmt_set.add_insert_sql("insert into sink1 select * from %s where a > 100" % source)
- stmt_set.add_insert_sql("insert into sink2 select * from %s where a < 100" % source)
-
- actual = stmt_set.explain(ExplainDetail.ESTIMATED_COST, ExplainDetail.CHANGELOG_MODE)
- assert isinstance(actual, str)
-
- def test_explain_sql_without_explain_detail(self):
- t_env = self.t_env
- source = t_env.from_elements([(1, "Hi", "Hello"), (2, "Hello", "Hello")], ["a", "b", "c"])
- field_names = ["a", "b", "c"]
- field_types = [DataTypes.BIGINT(), DataTypes.STRING(), DataTypes.STRING()]
- t_env.register_table_sink(
- "sinks",
- source_sink_utils.TestAppendSink(field_names, field_types))
-
- result = t_env.explain_sql("select a + 1, b, c from %s" % source)
-
- assert isinstance(result, str)
-
- def test_explain_sql_with_explain_detail(self):
- t_env = self.t_env
- source = t_env.from_elements([(1, "Hi", "Hello"), (2, "Hello", "Hello")], ["a", "b", "c"])
- field_names = ["a", "b", "c"]
- field_types = [DataTypes.BIGINT(), DataTypes.STRING(), DataTypes.STRING()]
- t_env.register_table_sink(
- "sinks",
- source_sink_utils.TestAppendSink(field_names, field_types))
-
- result = t_env.explain_sql(
- "select a + 1, b, c from %s" % source, ExplainDetail.CHANGELOG_MODE)
-
- assert isinstance(result, str)
-
- def test_create_table_environment(self):
- table_config = TableConfig()
- table_config.set_max_generated_code_length(32000)
- table_config.set_null_check(False)
- table_config.set_local_timezone("Asia/Shanghai")
-
- env = StreamExecutionEnvironment.get_execution_environment()
- t_env = StreamTableEnvironment.create(env, table_config)
-
- readed_table_config = t_env.get_config()
-
- self.assertFalse(readed_table_config.get_null_check())
- self.assertEqual(readed_table_config.get_max_generated_code_length(), 32000)
- self.assertEqual(readed_table_config.get_local_timezone(), "Asia/Shanghai")
-
- def test_create_table_environment_with_blink_planner(self):
- t_env = StreamTableEnvironment.create(
- StreamExecutionEnvironment.get_execution_environment(),
- environment_settings=EnvironmentSettings.new_instance().use_blink_planner().build())
-
- planner = t_env._j_tenv.getPlanner()
-
- self.assertEqual(
- planner.getClass().getName(),
- "org.apache.flink.table.planner.delegation.StreamPlanner")
-
- t_env = StreamTableEnvironment.create(
- environment_settings=EnvironmentSettings.new_instance().build())
-
- planner = t_env._j_tenv.getPlanner()
-
- self.assertEqual(
- planner.getClass().getName(),
- "org.apache.flink.table.planner.delegation.StreamPlanner")
-
- t_env = StreamTableEnvironment.create(
- environment_settings=EnvironmentSettings.new_instance().use_old_planner().build())
-
- planner = t_env._j_tenv.getPlanner()
-
- self.assertEqual(
- planner.getClass().getName(),
- "org.apache.flink.table.planner.StreamPlanner")
-
- def test_table_environment_with_blink_planner(self):
- env = StreamExecutionEnvironment.get_execution_environment()
- env.set_parallelism(1)
- t_env = StreamTableEnvironment.create(
- env,
- environment_settings=EnvironmentSettings.new_instance().use_blink_planner().build())
-
- source_path = os.path.join(self.tempdir + '/streaming.csv')
- sink_path = os.path.join(self.tempdir + '/result.csv')
- field_names = ["a", "b", "c"]
- field_types = [DataTypes.INT(), DataTypes.STRING(), DataTypes.STRING()]
- data = [(1, 'hi', 'hello'), (2, 'hello', 'hello')]
- csv_source = self.prepare_csv_source(source_path, data, field_types, field_names)
-
- t_env.register_table_source("source", csv_source)
-
- t_env.register_table_sink(
- "sink",
- CsvTableSink(field_names, field_types, sink_path))
- source = t_env.from_path("source")
-
- result = source.alias("a, b, c").select("1 + a, b, c")
-
- result.execute_insert("sink").wait()
-
- results = []
- with open(sink_path, 'r') as f:
- results.append(f.readline())
- results.append(f.readline())
-
- self.assert_equals(results, ['2,hi,hello\n', '3,hello,hello\n'])
-
- def test_collect_null_value_result(self):
- element_data = [(1, None, 'a'),
- (3, 4, 'b'),
- (5, None, 'a'),
- (7, 8, 'b')]
- source = self.t_env.from_elements(element_data,
- DataTypes.ROW([DataTypes.FIELD('a', DataTypes.INT()),
- DataTypes.FIELD('b', DataTypes.INT()),
- DataTypes.FIELD('c', DataTypes.STRING())]))
- table_result = source.execute()
- expected_result = [Row(1, None, 'a'), Row(3, 4, 'b'), Row(5, None, 'a'),
- Row(7, 8, 'b')]
- with table_result.collect() as results:
- collected_result = []
- for result in results:
- collected_result.append(result)
- self.assertEqual(collected_result, expected_result)
-
- def test_set_jars(self):
- self.verify_set_java_dependencies("pipeline.jars", self.execute_with_t_env)
-
- def test_set_jars_with_execute_sql(self):
- self.verify_set_java_dependencies("pipeline.jars", self.execute_with_execute_sql)
-
- def test_set_jars_with_statement_set(self):
- self.verify_set_java_dependencies("pipeline.jars", self.execute_with_statement_set)
-
- def test_set_jars_with_table(self):
- self.verify_set_java_dependencies("pipeline.jars", self.execute_with_table)
-
- def test_set_jars_with_table_execute_insert(self):
- self.verify_set_java_dependencies("pipeline.jars", self.execute_with_table_execute_insert)
-
- def test_set_jars_with_table_to_pandas(self):
- self.verify_set_java_dependencies("pipeline.jars", self.execute_with_table_to_pandas)
-
- def test_set_classpaths(self):
- self.verify_set_java_dependencies("pipeline.classpaths", self.execute_with_t_env)
-
- def test_set_classpaths_with_execute_sql(self):
- self.verify_set_java_dependencies("pipeline.classpaths", self.execute_with_execute_sql)
-
- def test_set_classpaths_with_statement_set(self):
- self.verify_set_java_dependencies("pipeline.classpaths", self.execute_with_statement_set)
-
- def test_set_classpaths_with_table(self):
- self.verify_set_java_dependencies("pipeline.classpaths", self.execute_with_table)
-
- def test_set_classpaths_with_table_execute_insert(self):
- self.verify_set_java_dependencies(
- "pipeline.classpaths", self.execute_with_table_execute_insert)
-
- def test_set_classpaths_with_table_to_pandas(self):
- self.verify_set_java_dependencies("pipeline.classpaths", self.execute_with_table_to_pandas)
-
- def execute_with_t_env(self, t_env):
- source = t_env.from_elements([(1, "Hi"), (2, "Hello")], ["a", "b"])
- source.select("func1(a, b), func2(a, b)").execute_insert("sink").wait()
- actual = source_sink_utils.results()
- expected = ['+I[1 and Hi, 1 or Hi]', '+I[2 and Hello, 2 or Hello]']
- self.assert_equals(actual, expected)
-
- @staticmethod
- def execute_with_execute_sql(t_env):
- source = t_env.from_elements([(1, "Hi"), (2, "Hello")], ["a", "b"])
- t_env.create_temporary_view("source", source)
- t_env.execute_sql("select func1(a, b), func2(a, b) from source") \
- .get_job_client() \
- .get_job_execution_result() \
- .result()
-
- def execute_with_statement_set(self, t_env):
- source = t_env.from_elements([(1, "Hi"), (2, "Hello")], ["a", "b"])
- result = source.select("func1(a, b), func2(a, b)")
- t_env.create_statement_set().add_insert("sink", result).execute() \
- .get_job_client() \
- .get_job_execution_result() \
- .result()
- actual = source_sink_utils.results()
- expected = ['+I[1 and Hi, 1 or Hi]', '+I[2 and Hello, 2 or Hello]']
- self.assert_equals(actual, expected)
-
- @staticmethod
- def execute_with_table(t_env):
- source = t_env.from_elements([(1, "Hi"), (2, "Hello")], ["a", "b"])
- result = source.select("func1(a, b), func2(a, b)")
- result.execute() \
- .get_job_client() \
- .get_job_execution_result() \
- .result()
-
- def execute_with_table_execute_insert(self, t_env):
- source = t_env.from_elements([(1, "Hi"), (2, "Hello")], ["a", "b"])
- result = source.select("func1(a, b), func2(a, b)")
- result.execute_insert("sink").wait()
- actual = source_sink_utils.results()
- expected = ['+I[1 and Hi, 1 or Hi]', '+I[2 and Hello, 2 or Hello]']
- self.assert_equals(actual, expected)
-
- @staticmethod
- def execute_with_table_to_pandas(t_env):
- source = t_env.from_elements([(1, "Hi"), (2, "Hello")], ["a", "b"])
- result = source.select("func1(a, b), func2(a, b)")
- result.to_pandas()
-
- def verify_set_java_dependencies(self, config_key, executor):
- original_class_loader = \
- get_gateway().jvm.Thread.currentThread().getContextClassLoader()
- try:
- jar_urls = []
- func1_class_name = "org.apache.flink.python.util.TestScalarFunction1"
- func2_class_name = "org.apache.flink.python.util.TestScalarFunction2"
- func1_jar_pattern = "flink-python/target/artifacts/testUdf1.jar"
- func2_jar_pattern = "flink-python/target/artifacts/testUdf2.jar"
- self.ensure_jar_not_loaded(func1_class_name, func1_jar_pattern)
- self.ensure_jar_not_loaded(func2_class_name, func2_jar_pattern)
- jar_urls.extend(self.get_jar_url(func1_jar_pattern))
- jar_urls.extend(self.get_jar_url(func2_jar_pattern))
-
- # test set the "pipeline.jars" multiple times
- self.t_env.get_config().get_configuration().set_string(config_key, ";".join(jar_urls))
- first_class_loader = get_gateway().jvm.Thread.currentThread().getContextClassLoader()
-
- self.t_env.get_config().get_configuration().set_string(config_key, jar_urls[0])
- self.t_env.get_config().get_configuration().set_string(config_key, ";".join(jar_urls))
- second_class_loader = get_gateway().jvm.Thread.currentThread().getContextClassLoader()
-
- self.assertEqual(first_class_loader, second_class_loader)
-
- self.t_env.register_java_function("func1", func1_class_name)
- self.t_env.register_java_function("func2", func2_class_name)
- table_sink = source_sink_utils.TestAppendSink(
- ["a", "b"], [DataTypes.STRING(), DataTypes.STRING()])
- self.t_env.register_table_sink("sink", table_sink)
-
- executor(self.t_env)
- finally:
- get_gateway().jvm.Thread.currentThread().setContextClassLoader(original_class_loader)
-
- def ensure_jar_not_loaded(self, func_class_name, jar_filename_pattern):
- test_jars = glob.glob(os.path.join(_find_flink_source_root(), jar_filename_pattern))
- if not test_jars:
- self.fail("'%s' is not available. Please compile the test jars first."
- % jar_filename_pattern)
- try:
- self.t_env.register_java_function("func", func_class_name)
- except Py4JJavaError:
- pass
- else:
- self.fail("The scalar function '%s' should not be able to be loaded. Please remove "
- "the '%s' from the classpath of the PythonGatewayServer process." %
- (func_class_name, jar_filename_pattern))
-
- @staticmethod
- def get_jar_url(jar_filename_pattern):
- test_jars = glob.glob(os.path.join(_find_flink_source_root(), jar_filename_pattern))
- return [pathlib.Path(jar_path).as_uri() for jar_path in test_jars]
-
- def test_collect_for_all_data_types(self):
- expected_result = [Row(1, None, 1, True, 32767, -2147483648, 1.23,
- 1.98932, bytearray(b'pyflink'), 'pyflink',
- datetime.date(2014, 9, 13), datetime.time(12, 0),
- datetime.datetime(2018, 3, 11, 3, 0, 0, 123000),
- [Row(['[pyflink]']), Row(['[pyflink]']),
- Row(['[pyflink]'])], {1: Row(['[flink]']), 2: Row(['[pyflink]'])},
- decimal.Decimal('1000000000000000000.05'),
- decimal.Decimal(
- '1000000000000000000.05999999999999999899999999999'))]
- source = self.t_env.from_elements([(1, None, 1, True, 32767, -2147483648, 1.23, 1.98932,
- bytearray(b'pyflink'), 'pyflink',
- datetime.date(2014, 9, 13),
- datetime.time(hour=12, minute=0, second=0,
- microsecond=123000),
- datetime.datetime(2018, 3, 11, 3, 0, 0, 123000),
- [Row(['pyflink']), Row(['pyflink']), Row(['pyflink'])],
- {1: Row(['flink']), 2: Row(['pyflink'])},
- decimal.Decimal('1000000000000000000.05'),
- decimal.Decimal(
- '1000000000000000000.0599999999999999989'
- '9999999999'))],
- DataTypes.ROW([DataTypes.FIELD("a", DataTypes.BIGINT()),
- DataTypes.FIELD("b", DataTypes.BIGINT()),
- DataTypes.FIELD("c", DataTypes.TINYINT()),
- DataTypes.FIELD("d", DataTypes.BOOLEAN()),
- DataTypes.FIELD("e", DataTypes.SMALLINT()),
- DataTypes.FIELD("f", DataTypes.INT()),
- DataTypes.FIELD("g", DataTypes.FLOAT()),
- DataTypes.FIELD("h", DataTypes.DOUBLE()),
- DataTypes.FIELD("i", DataTypes.BYTES()),
- DataTypes.FIELD("j", DataTypes.STRING()),
- DataTypes.FIELD("k", DataTypes.DATE()),
- DataTypes.FIELD("l", DataTypes.TIME()),
- DataTypes.FIELD("m",
- DataTypes.TIMESTAMP(3)),
- DataTypes.FIELD("n", DataTypes.ARRAY(
- DataTypes.ROW([DataTypes.FIELD('ss2',
- DataTypes.STRING())]))),
- DataTypes.FIELD("o", DataTypes.MAP(
- DataTypes.BIGINT(), DataTypes.ROW(
- [DataTypes.FIELD('ss',
- DataTypes.STRING())]))),
- DataTypes.FIELD("p",
- DataTypes.DECIMAL(38, 18)),
- DataTypes.FIELD("q",
- DataTypes.DECIMAL(38,
- 18))]))
- table_result = source.execute()
- with table_result.collect() as result:
- collected_result = []
- for i in result:
- collected_result.append(i)
- self.assertEqual(expected_result, collected_result)
-
- def test_collect_with_retract(self):
-
- expected_row_kinds = [RowKind.INSERT, RowKind.DELETE, RowKind.INSERT, RowKind.INSERT,
- RowKind.DELETE, RowKind.INSERT]
- element_data = [(1, 2, 'a'),
- (3, 4, 'b'),
- (5, 6, 'a'),
- (7, 8, 'b')]
- field_names = ['a', 'b', 'c']
- source = self.t_env.from_elements(element_data, field_names)
- table_result = self.t_env.execute_sql(
- "SELECT SUM(a), c FROM %s group by c" % source)
- with table_result.collect() as result:
- collected_result = []
- for i in result:
- collected_result.append(i)
-
- collected_result = [str(result) + ',' + str(result.get_row_kind())
- for result in collected_result]
- expected_result = [Row(1, 'a'), Row(1, 'a'), Row(6, 'a'), Row(3, 'b'),
- Row(3, 'b'), Row(10, 'b')]
- for i in range(len(expected_result)):
- expected_result[i] = str(expected_result[i]) + ',' + str(expected_row_kinds[i])
- expected_result.sort()
- collected_result.sort()
- self.assertEqual(expected_result, collected_result)
-
-
class DataStreamConversionTestCases(object):
def test_from_data_stream(self):
@@ -755,21 +258,6 @@ class DataStreamConversionTestCases(object):
self.assertEqual(result, expected)
-class LegacyBlinkBatchTableEnvironmentTests(TableEnvironmentTest,
- PyFlinkLegacyBlinkBatchTableTestCase):
- pass
-
-
-class LegacyBlinkStreamTableEnvironmentTests(TableEnvironmentTest, DataStreamConversionTestCases,
- PyFlinkLegacyBlinkStreamTableTestCase):
- pass
-
-
-class LegacyFlinkStreamTableEnvironmentTests(TableEnvironmentTest, DataStreamConversionTestCases,
- PyFlinkLegacyFlinkStreamTableTestCase):
- pass
-
-
class BlinkStreamTableEnvironmentTests(TableEnvironmentTest, PyFlinkBlinkStreamTableTestCase):
def test_collect_with_retract(self):
@@ -841,117 +329,6 @@ class BlinkStreamTableEnvironmentTests(TableEnvironmentTest, PyFlinkBlinkStreamT
self.assertEqual(expected_result, collected_result)
-class BatchTableEnvironmentTests(TableEnvironmentTest, PyFlinkOldBatchTableTestCase):
-
- def test_explain_with_multi_sinks(self):
- t_env = self.t_env
- source = t_env.from_elements([(1, "Hi", "Hello"), (2, "Hello", "Hello")], ["a", "b", "c"])
- field_names = ["a", "b", "c"]
- field_types = [DataTypes.BIGINT(), DataTypes.STRING(), DataTypes.STRING()]
- t_env.register_table_sink(
- "sink1",
- CsvTableSink(field_names, field_types, "path1"))
- t_env.register_table_sink(
- "sink2",
- CsvTableSink(field_names, field_types, "path2"))
-
- stmt_set = t_env.create_statement_set()
- stmt_set.add_insert_sql("insert into sink1 select * from %s where a > 100" % source)
- stmt_set.add_insert_sql("insert into sink2 select * from %s where a < 100" % source)
-
- actual = stmt_set.explain(ExplainDetail.ESTIMATED_COST, ExplainDetail.CHANGELOG_MODE)
-
- assert isinstance(actual, str)
-
- def test_statement_set(self):
- t_env = self.t_env
- source = t_env.from_elements([(1, "Hi", "Hello"), (2, "Hello", "Hello")], ["a", "b", "c"])
- field_names = ["a", "b", "c"]
- field_types = [DataTypes.BIGINT(), DataTypes.STRING(), DataTypes.STRING()]
- t_env.register_table_sink(
- "sink1",
- CsvTableSink(field_names, field_types, "path1"))
- t_env.register_table_sink(
- "sink2",
- CsvTableSink(field_names, field_types, "path2"))
-
- stmt_set = t_env.create_statement_set()
-
- stmt_set.add_insert_sql("insert into sink1 select * from %s where a > 100" % source) \
- .add_insert("sink2", source.filter("a < 100"))
-
- actual = stmt_set.explain()
- assert isinstance(actual, str)
-
- def test_create_table_environment(self):
- table_config = TableConfig()
- table_config.set_max_generated_code_length(32000)
- table_config.set_null_check(False)
- table_config.set_local_timezone("Asia/Shanghai")
-
- env = ExecutionEnvironment.get_execution_environment()
- t_env = BatchTableEnvironment.create(env, table_config)
-
- readed_table_config = t_env.get_config()
-
- self.assertFalse(readed_table_config.get_null_check())
- self.assertEqual(readed_table_config.get_max_generated_code_length(), 32000)
- self.assertEqual(readed_table_config.get_local_timezone(), "Asia/Shanghai")
-
- def test_create_table_environment_with_old_planner(self):
- t_env = BatchTableEnvironment.create(
- environment_settings=EnvironmentSettings.new_instance().in_batch_mode()
- .use_old_planner().build())
- self.assertEqual(
- t_env._j_tenv.getClass().getName(),
- "org.apache.flink.table.api.bridge.java.internal.BatchTableEnvironmentImpl")
-
- def test_create_table_environment_with_blink_planner(self):
- t_env = BatchTableEnvironment.create(
- environment_settings=EnvironmentSettings.new_instance().in_batch_mode()
- .use_blink_planner().build())
-
- planner = t_env._j_tenv.getPlanner()
-
- self.assertEqual(
- planner.getClass().getName(),
- "org.apache.flink.table.planner.delegation.BatchPlanner")
-
- def test_table_environment_with_blink_planner(self):
- t_env = BatchTableEnvironment.create(
- environment_settings=EnvironmentSettings.new_instance().in_batch_mode()
- .use_blink_planner().build())
-
- source_path = os.path.join(self.tempdir + '/streaming.csv')
- sink_path = os.path.join(self.tempdir + '/results')
- field_names = ["a", "b", "c"]
- field_types = [DataTypes.INT(), DataTypes.STRING(), DataTypes.STRING()]
- data = [(1, 'hi', 'hello'), (2, 'hello', 'hello')]
- csv_source = self.prepare_csv_source(source_path, data, field_types, field_names)
-
- t_env.register_table_source("source", csv_source)
-
- t_env.register_table_sink(
- "sink",
- CsvTableSink(field_names, field_types, sink_path))
- source = t_env.from_path("source")
-
- result = source.alias("a, b, c").select("1 + a, b, c")
-
- result.execute_insert("sink").wait()
-
- results = []
- for root, dirs, files in os.walk(sink_path):
- for sub_file in files:
- with open(os.path.join(root, sub_file), 'r') as f:
- line = f.readline()
- while line is not None and line != '':
- results.append(line)
- line = f.readline()
-
- self.assert_equals(results, ['2,hi,hello\n', '3,hello,hello\n'])
-
-
class BlinkBatchTableEnvironmentTests(PyFlinkBlinkBatchTableTestCase):
def test_explain_with_multi_sinks(self):
diff --git a/flink-python/pyflink/table/tests/test_udf.py b/flink-python/pyflink/table/tests/test_udf.py
index 19cdc5f..c357554 100644
--- a/flink-python/pyflink/table/tests/test_udf.py
+++ b/flink-python/pyflink/table/tests/test_udf.py
@@ -24,9 +24,8 @@ import pytz
from pyflink.table import DataTypes, expressions as expr
from pyflink.table.udf import ScalarFunction, udf
from pyflink.testing import source_sink_utils
-from pyflink.testing.test_case_utils import PyFlinkOldStreamTableTestCase, \
- PyFlinkBlinkStreamTableTestCase, PyFlinkBlinkBatchTableTestCase, \
- PyFlinkOldBatchTableTestCase
+from pyflink.testing.test_case_utils import PyFlinkBlinkStreamTableTestCase, \
+ PyFlinkBlinkBatchTableTestCase
class UserDefinedFunctionTests(object):
@@ -639,24 +638,6 @@ def float_equal(a, b, rel_tol=1e-09, abs_tol=0.0):
return abs(a - b) <= max(rel_tol * max(abs(a), abs(b)), abs_tol)
-class PyFlinkStreamUserDefinedFunctionTests(UserDefinedFunctionTests,
- PyFlinkOldStreamTableTestCase):
- pass
-
-
-class PyFlinkBatchUserDefinedFunctionTests(PyFlinkOldBatchTableTestCase):
-
- def test_chaining_scalar_function(self):
- add_one = udf(lambda i: i + 1, result_type=DataTypes.BIGINT())
- subtract_one = udf(SubtractOne(), result_type=DataTypes.BIGINT())
-
- t = self.t_env.from_elements([(1, 2, 1), (2, 5, 2), (3, 1, 3)], ['a', 'b', 'c'])
- t = t.select(add(add_one(t.a), subtract_one(t.b)), t.c, expr.lit(1))
-
- result = self.collect(t)
- self.assertEqual(result, ["+I[3, 1, 1]", "+I[7, 2, 1]", "+I[4, 3, 1]"])
-
-
class PyFlinkBlinkStreamUserDefinedFunctionTests(UserDefinedFunctionTests,
PyFlinkBlinkStreamTableTestCase):
def test_deterministic(self):
diff --git a/flink-python/pyflink/table/tests/test_udtf.py b/flink-python/pyflink/table/tests/test_udtf.py
index 976b2f6..79d66ad 100644
--- a/flink-python/pyflink/table/tests/test_udtf.py
+++ b/flink-python/pyflink/table/tests/test_udtf.py
@@ -20,8 +20,8 @@ import unittest
from pyflink.table import DataTypes
from pyflink.table.udf import TableFunction, udtf, ScalarFunction, udf
from pyflink.testing import source_sink_utils
-from pyflink.testing.test_case_utils import PyFlinkOldStreamTableTestCase, \
- PyFlinkBlinkStreamTableTestCase, PyFlinkOldBatchTableTestCase, PyFlinkBlinkBatchTableTestCase
+from pyflink.testing.test_case_utils import PyFlinkBlinkStreamTableTestCase, \
+ PyFlinkBlinkBatchTableTestCase
class UserDefinedTableFunctionTests(object):
@@ -71,11 +71,6 @@ class UserDefinedTableFunctionTests(object):
return source_sink_utils.results()
-class PyFlinkStreamUserDefinedTableFunctionTests(UserDefinedTableFunctionTests,
- PyFlinkOldStreamTableTestCase):
- pass
-
-
class PyFlinkBlinkStreamUserDefinedFunctionTests(UserDefinedTableFunctionTests,
PyFlinkBlinkStreamTableTestCase):
def test_execute_from_json_plan(self):
@@ -134,26 +129,6 @@ class PyFlinkBlinkBatchUserDefinedFunctionTests(UserDefinedTableFunctionTests,
pass
-class PyFlinkBatchUserDefinedTableFunctionTests(UserDefinedTableFunctionTests,
- PyFlinkOldBatchTableTestCase):
- def _register_table_sink(self, field_names: list, field_types: list):
- pass
-
- def _get_output(self, t):
- return self.collect(t)
-
- def test_row_type_as_input_types_and_result_types(self):
- # test input_types and result_types are DataTypes.ROW
- a = udtf(lambda i: i,
- input_types=DataTypes.ROW([DataTypes.FIELD("a", DataTypes.BIGINT())]),
- result_types=DataTypes.ROW([DataTypes.FIELD("a", DataTypes.BIGINT())]))
-
- self.assertEqual(a._input_types,
- [DataTypes.ROW([DataTypes.FIELD("a", DataTypes.BIGINT())])])
- self.assertEqual(a._result_types,
- [DataTypes.ROW([DataTypes.FIELD("a", DataTypes.BIGINT())])])
-
-
class MultiEmit(TableFunction, unittest.TestCase):
def open(self, function_context):
diff --git a/flink-python/pyflink/testing/test_case_utils.py b/flink-python/pyflink/testing/test_case_utils.py
index e086ca4..dd1d56b 100644
--- a/flink-python/pyflink/testing/test_case_utils.py
+++ b/flink-python/pyflink/testing/test_case_utils.py
@@ -30,13 +30,10 @@ from py4j.protocol import Py4JJavaError
from pyflink.common import JobExecutionResult
from pyflink.datastream.execution_mode import RuntimeExecutionMode
-from pyflink.table import TableConfig
from pyflink.table.sources import CsvTableSource
-from pyflink.dataset.execution_environment import ExecutionEnvironment
from pyflink.datastream.stream_execution_environment import StreamExecutionEnvironment
from pyflink.find_flink_home import _find_flink_home, _find_flink_source_root
-from pyflink.table.table_environment import BatchTableEnvironment, StreamTableEnvironment, \
- TableEnvironment
+from pyflink.table.table_environment import TableEnvironment
from pyflink.table.environment_settings import EnvironmentSettings
from pyflink.java_gateway import get_gateway
from pyflink.util.java_utils import add_jars_to_context_class_loader, to_jarray
@@ -146,91 +143,6 @@ class PyFlinkTestCase(unittest.TestCase):
return CsvTableSource(path, fields, data_types)
-class PyFlinkLegacyBlinkBatchTableTestCase(PyFlinkTestCase):
- """
- Base class for pure Blink Batch TableEnvironment tests.
- """
-
- def setUp(self):
- super(PyFlinkLegacyBlinkBatchTableTestCase, self).setUp()
- self.t_env = BatchTableEnvironment.create(
- environment_settings=EnvironmentSettings.new_instance()
- .in_batch_mode().use_blink_planner().build())
- self.t_env._j_tenv.getPlanner().getExecEnv().setParallelism(2)
- self.t_env.get_config().get_configuration().set_string(
- "python.fn-execution.bundle.size", "1")
-
-
-class PyFlinkLegacyBlinkStreamTableTestCase(PyFlinkTestCase):
- """
- Base class for pure Blink Stream TableEnvironment tests.
- """
-
- def setUp(self):
- super(PyFlinkLegacyBlinkStreamTableTestCase, self).setUp()
- self.env = StreamExecutionEnvironment.get_execution_environment()
- self.env.set_parallelism(2)
- self.t_env = StreamTableEnvironment.create(
- self.env,
- environment_settings=EnvironmentSettings.new_instance()
- .in_streaming_mode().use_blink_planner().build())
- self.t_env.get_config().get_configuration().set_string(
- "python.fn-execution.bundle.size", "1")
-
-
-class PyFlinkLegacyFlinkStreamTableTestCase(PyFlinkTestCase):
- """
- Base class for pure Flink Stream TableEnvironment tests.
- """
-
- def setUp(self):
- super(PyFlinkLegacyFlinkStreamTableTestCase, self).setUp()
- self.env = StreamExecutionEnvironment.get_execution_environment()
- self.env.set_parallelism(2)
- self.t_env = StreamTableEnvironment.create(
- self.env,
- environment_settings=EnvironmentSettings.new_instance()
- .in_streaming_mode().use_old_planner().build())
- self.t_env.get_config().get_configuration().set_string(
- "python.fn-execution.bundle.size", "1")
-
-
-class PyFlinkOldStreamTableTestCase(PyFlinkTestCase):
- """
- Base class for old planner stream tests.
- """
-
- def setUp(self):
- super(PyFlinkOldStreamTableTestCase, self).setUp()
- self.t_env = TableEnvironment.create(
- EnvironmentSettings.new_instance().in_streaming_mode().use_old_planner().build())
- self.t_env.get_config().get_configuration().set_string("parallelism.default", "2")
- self.t_env.get_config().get_configuration().set_string(
- "python.fn-execution.bundle.size", "1")
-
-
-class PyFlinkOldBatchTableTestCase(PyFlinkTestCase):
- """
- Base class for old planner batch tests.
- """
-
- def setUp(self):
- super(PyFlinkOldBatchTableTestCase, self).setUp()
- self.env = ExecutionEnvironment.get_execution_environment()
- self.env.set_parallelism(2)
- self.t_env = BatchTableEnvironment.create(self.env, TableConfig())
- self.t_env.get_config().get_configuration().set_string(
- "python.fn-execution.bundle.size", "1")
-
- def collect(self, table):
- j_table = table._j_table
- gateway = get_gateway()
- row_result = self.t_env._j_tenv\
- .toDataSet(j_table, gateway.jvm.Class.forName("org.apache.flink.types.Row")).collect()
- string_result = [java_row.toString() for java_row in row_result]
- return string_result
-
-
class PyFlinkBlinkStreamTableTestCase(PyFlinkTestCase):
"""
Base class for stream tests of blink planner.
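Tests that extended the removed old-planner and legacy bases migrate onto the two remaining blink bases. A minimal sketch of such a migrated test (the class name and assertion are illustrative, not part of this commit):

    from pyflink.testing.test_case_utils import PyFlinkBlinkBatchTableTestCase

    class MigratedCalcTests(PyFlinkBlinkBatchTableTestCase):
        # self.t_env is created by the base class setUp()
        def test_select(self):
            t = self.t_env.from_elements([(1, 'Hi'), (2, 'Hello')], ['a', 'b'])
            self.assertEqual(len(t.to_pandas()), 2)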
diff --git a/flink-python/setup.py b/flink-python/setup.py
index efa6ffa..94b2019 100644
--- a/flink-python/setup.py
+++ b/flink-python/setup.py
@@ -269,7 +269,6 @@ try:
'pyflink.table',
'pyflink.util',
'pyflink.datastream',
- 'pyflink.dataset',
'pyflink.common',
'pyflink.fn_execution',
'pyflink.fn_execution.beam',
diff --git a/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/ArrowUtils.java b/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/ArrowUtils.java
index f381bf3..1264247 100644
--- a/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/ArrowUtils.java
+++ b/flink-python/src/main/java/org/apache/flink/table/runtime/arrow/ArrowUtils.java
@@ -19,22 +19,19 @@
package org.apache.flink.table.runtime.arrow;
import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.configuration.ExecutionOptions;
import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
-import org.apache.flink.table.api.bridge.java.BatchTableEnvironment;
-import org.apache.flink.table.api.internal.BatchTableEnvImpl;
-import org.apache.flink.table.api.internal.TableEnvImpl;
import org.apache.flink.table.api.internal.TableEnvironmentImpl;
import org.apache.flink.table.api.internal.TableImpl;
import org.apache.flink.table.data.ArrayData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.util.DataFormatConverters;
import org.apache.flink.table.data.vector.ColumnVector;
-import org.apache.flink.table.delegation.Planner;
import org.apache.flink.table.operations.OutputConversionModifyOperation;
-import org.apache.flink.table.planner.delegation.PlannerBase;
import org.apache.flink.table.runtime.arrow.readers.ArrayFieldReader;
import org.apache.flink.table.runtime.arrow.readers.ArrowFieldReader;
import org.apache.flink.table.runtime.arrow.readers.BigIntFieldReader;
@@ -664,7 +661,6 @@ public final class ArrowUtils {
ArrowStreamWriter arrowStreamWriter = new ArrowStreamWriter(root, null, baos);
arrowStreamWriter.start();
- ArrowWriter arrowWriter;
Iterator<Row> results = table.execute().collect();
Iterator<Row> appendOnlyResults;
if (isAppendOnlyTable(table)) {
@@ -673,28 +669,21 @@ public final class ArrowUtils {
appendOnlyResults = filterOutRetractRows(results);
}
- Iterator convertedResults;
- if (isBlinkPlanner(table)) {
- arrowWriter = createRowDataArrowWriter(root, rowType);
- convertedResults =
- new Iterator<RowData>() {
- @Override
- public boolean hasNext() {
- return appendOnlyResults.hasNext();
- }
-
- @Override
- public RowData next() {
- DataFormatConverters.DataFormatConverter converter =
- DataFormatConverters.getConverterForDataType(
- defaultRowDataType);
- return (RowData) converter.toInternal(appendOnlyResults.next());
- }
- };
- } else {
- arrowWriter = createRowArrowWriter(root, rowType);
- convertedResults = appendOnlyResults;
- }
+ ArrowWriter arrowWriter = createRowDataArrowWriter(root, rowType);
+ Iterator convertedResults =
+ new Iterator<RowData>() {
+ @Override
+ public boolean hasNext() {
+ return appendOnlyResults.hasNext();
+ }
+
+ @Override
+ public RowData next() {
+ DataFormatConverters.DataFormatConverter converter =
+ DataFormatConverters.getConverterForDataType(defaultRowDataType);
+ return (RowData) converter.toInternal(appendOnlyResults.next());
+ }
+ };
return new CustomIterator<byte[]>() {
@Override
@@ -750,39 +739,22 @@ public final class ArrowUtils {
return result.iterator();
}
- private static boolean isBlinkPlanner(Table table) {
+ private static boolean isStreamingMode(Table table) {
TableEnvironment tableEnv = ((TableImpl) table).getTableEnvironment();
- if (tableEnv instanceof TableEnvImpl) {
- return false;
- } else if (tableEnv instanceof TableEnvironmentImpl) {
- Planner planner = ((TableEnvironmentImpl) tableEnv).getPlanner();
- return planner instanceof PlannerBase;
+ if (tableEnv instanceof TableEnvironmentImpl) {
+ final RuntimeExecutionMode mode =
+ tableEnv.getConfig().getConfiguration().get(ExecutionOptions.RUNTIME_MODE);
+ if (mode == RuntimeExecutionMode.AUTOMATIC) {
+ throw new RuntimeException(
+ String.format("Runtime execution mode '%s' is not supported yet.", mode));
+ }
+ return mode == RuntimeExecutionMode.STREAMING;
} else {
- throw new RuntimeException(
- String.format(
- "Could not determine the planner type for table environment class %s.",
- tableEnv.getClass()));
- }
- }
-
- private static boolean isStreamingMode(Table table) throws Exception {
- TableEnvironment tableEnv = ((TableImpl) table).getTableEnvironment();
- if (tableEnv instanceof BatchTableEnvironment || tableEnv instanceof BatchTableEnvImpl) {
return false;
- } else if (tableEnv instanceof TableEnvironmentImpl) {
- java.lang.reflect.Field isStreamingModeMethod =
- TableEnvironmentImpl.class.getDeclaredField("isStreamingMode");
- isStreamingModeMethod.setAccessible(true);
- return (boolean) isStreamingModeMethod.get(tableEnv);
- } else {
- throw new RuntimeException(
- String.format(
- "Could not determine the streaming mode for table environment class %s",
- tableEnv.getClass()));
}
}
- private static boolean isAppendOnlyTable(Table table) throws Exception {
+ private static boolean isAppendOnlyTable(Table table) {
if (isStreamingMode(table)) {
TableEnvironmentImpl tableEnv =
(TableEnvironmentImpl) ((TableImpl) table).getTableEnvironment();
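With the planner check gone, ArrowUtils now derives the Arrow write path from ExecutionOptions.RUNTIME_MODE and rejects AUTOMATIC. From Python this maps to the 'execution.runtime-mode' configuration key, so callers of to_pandas() need a pinned mode if EnvironmentSettings did not already fix one; a sketch (table_env as created elsewhere):

    # 'execution.runtime-mode' backs ExecutionOptions.RUNTIME_MODE;
    # AUTOMATIC now raises in the Arrow conversion path
    table_env.get_config().get_configuration().set_string(
        "execution.runtime-mode", "batch")  # or "streaming"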
diff --git a/flink-python/src/main/java/org/apache/flink/table/runtime/functions/python/AbstractPythonScalarFunctionFlatMap.java b/flink-python/src/main/java/org/apache/flink/table/runtime/functions/python/AbstractPythonScalarFunctionFlatMap.java
deleted file mode 100644
index a83034d..0000000
--- a/flink-python/src/main/java/org/apache/flink/table/runtime/functions/python/AbstractPythonScalarFunctionFlatMap.java
+++ /dev/null
@@ -1,119 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.runtime.functions.python;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.common.functions.RichFlatMapFunction;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.typeutils.RowTypeInfo;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.fnexecution.v1.FlinkFnApi;
-import org.apache.flink.streaming.api.utils.PythonOperatorUtils;
-import org.apache.flink.table.functions.ScalarFunction;
-import org.apache.flink.table.functions.python.PythonEnv;
-import org.apache.flink.table.functions.python.PythonFunctionInfo;
-import org.apache.flink.table.types.logical.RowType;
-import org.apache.flink.table.types.utils.TypeConversions;
-import org.apache.flink.types.Row;
-import org.apache.flink.util.Preconditions;
-
-import java.util.Arrays;
-
-/**
- * The abstract base {@link RichFlatMapFunction} used to invoke Python {@link ScalarFunction}
- * functions for the old planner.
- */
-@Internal
-public abstract class AbstractPythonScalarFunctionFlatMap
- extends AbstractPythonStatelessFunctionFlatMap {
-
- private static final long serialVersionUID = 1L;
-
- private static final String SCALAR_FUNCTION_URN = "flink:transform:scalar_function:v1";
-
- /** The Python {@link ScalarFunction}s to be executed. */
- public final PythonFunctionInfo[] scalarFunctions;
-
- /** The offset of the fields which should be forwarded. */
- private final int[] forwardedFields;
-
- public AbstractPythonScalarFunctionFlatMap(
- Configuration config,
- PythonFunctionInfo[] scalarFunctions,
- RowType inputType,
- RowType outputType,
- int[] udfInputOffsets,
- int[] forwardedFields) {
- super(config, inputType, outputType, udfInputOffsets);
- this.scalarFunctions = Preconditions.checkNotNull(scalarFunctions);
- this.forwardedFields = Preconditions.checkNotNull(forwardedFields);
- }
-
- @Override
- public void open(Configuration parameters) throws Exception {
- super.open(parameters);
-
- RowTypeInfo forwardedInputTypeInfo =
- new RowTypeInfo(
- Arrays.stream(forwardedFields)
- .mapToObj(i -> inputType.getFields().get(i))
- .map(RowType.RowField::getType)
- .map(TypeConversions::fromLogicalToDataType)
- .map(TypeConversions::fromDataTypeToLegacyInfo)
- .toArray(TypeInformation[]::new));
- forwardedInputSerializer =
- forwardedInputTypeInfo.createSerializer(getRuntimeContext().getExecutionConfig());
- }
-
- @Override
- public PythonEnv getPythonEnv() {
- return scalarFunctions[0].getPythonFunction().getPythonEnv();
- }
-
- @Override
- public void bufferInput(Row input) {
- Row forwardedFieldsRow = Row.project(input, forwardedFields);
- if (getRuntimeContext().getExecutionConfig().isObjectReuseEnabled()) {
- forwardedFieldsRow = forwardedInputSerializer.copy(forwardedFieldsRow);
- }
- forwardedInputQueue.add(forwardedFieldsRow);
- }
-
- @Override
- public int getForwardedFieldsCount() {
- return forwardedFields.length;
- }
-
- @Override
- public FlinkFnApi.UserDefinedFunctions getUserDefinedFunctionsProto() {
- FlinkFnApi.UserDefinedFunctions.Builder builder =
- FlinkFnApi.UserDefinedFunctions.newBuilder();
- // add udf proto
- for (PythonFunctionInfo pythonFunctionInfo : scalarFunctions) {
- builder.addUdfs(PythonOperatorUtils.getUserDefinedFunctionProto(pythonFunctionInfo));
- }
- builder.setMetricEnabled(getPythonConfig().isMetricEnabled());
- return builder.build();
- }
-
- @Override
- public String getFunctionUrn() {
- return SCALAR_FUNCTION_URN;
- }
-}
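
The deleted flat map forwarded a configurable subset of input fields around the
Python worker and joined them back onto each UDF result. A tiny sketch of that
projection/join using Flink's Row utilities (the data is made up):

    import org.apache.flink.types.Row;

    public final class ForwardedFieldsDemo {
        public static void main(String[] args) {
            // Illustrative input row and offsets; not taken from the commit.
            Row input = Row.of("key", 42, 3.14);
            int[] forwardedFields = {0, 1}; // fields that bypass the Python worker
            Row forwarded = Row.project(input, forwardedFields);
            Row udfResult = Row.of("computed"); // stands in for the Python UDF output
            // The operator emitted the forwarded fields followed by the UDF results.
            System.out.println(Row.join(forwarded, udfResult)); // prints: +I[key, 42, computed]
        }
    }
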
diff --git a/flink-python/src/main/java/org/apache/flink/table/runtime/functions/python/AbstractPythonStatelessFunctionFlatMap.java b/flink-python/src/main/java/org/apache/flink/table/runtime/functions/python/AbstractPythonStatelessFunctionFlatMap.java
deleted file mode 100644
index 62ca509..0000000
--- a/flink-python/src/main/java/org/apache/flink/table/runtime/functions/python/AbstractPythonStatelessFunctionFlatMap.java
+++ /dev/null
@@ -1,312 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.runtime.functions.python;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.common.functions.RichFlatMapFunction;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.ConfigurationUtils;
-import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
-import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
-import org.apache.flink.core.memory.DataInputViewStreamWrapper;
-import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
-import org.apache.flink.fnexecution.v1.FlinkFnApi;
-import org.apache.flink.python.PythonConfig;
-import org.apache.flink.python.PythonFunctionRunner;
-import org.apache.flink.python.PythonOptions;
-import org.apache.flink.python.env.PythonDependencyInfo;
-import org.apache.flink.python.env.PythonEnvironmentManager;
-import org.apache.flink.python.env.beam.ProcessPythonEnvironmentManager;
-import org.apache.flink.python.metric.FlinkMetricContainer;
-import org.apache.flink.table.functions.python.PythonEnv;
-import org.apache.flink.table.runtime.runners.python.beam.BeamTableStatelessPythonFunctionRunner;
-import org.apache.flink.table.types.logical.RowType;
-import org.apache.flink.table.types.utils.LegacyTypeInfoDataTypeConverter;
-import org.apache.flink.table.types.utils.LogicalTypeDataTypeConverter;
-import org.apache.flink.types.Row;
-import org.apache.flink.util.Collector;
-import org.apache.flink.util.Preconditions;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.stream.Collectors;
-
-/**
- * Base Python stateless {@link RichFlatMapFunction} used to invoke Python stateless functions for
- * the old planner.
- */
-@Internal
-public abstract class AbstractPythonStatelessFunctionFlatMap extends RichFlatMapFunction<Row, Row>
- implements ResultTypeQueryable<Row> {
-
- private static final long serialVersionUID = 1L;
-
- private static final Logger LOG =
- LoggerFactory.getLogger(AbstractPythonStatelessFunctionFlatMap.class);
-
- /** The Python config. */
- private final PythonConfig config;
-
- /** The offsets of user-defined function inputs. */
- private final int[] userDefinedFunctionInputOffsets;
-
- /** The input logical type. */
- protected final RowType inputType;
-
- /** The output logical type. */
- protected final RowType outputType;
-
- /** The options used to configure the Python worker process. */
- protected final Map<String, String> jobOptions;
-
- /** The user-defined function input logical type. */
- protected transient RowType userDefinedFunctionInputType;
-
- /** The user-defined function output logical type. */
- protected transient RowType userDefinedFunctionOutputType;
-
- /**
- * The queue holding the input elements for which the execution results have not been received.
- */
- protected transient LinkedBlockingQueue<Row> forwardedInputQueue;
-
- /** Max number of elements to include in a bundle. */
- private transient int maxBundleSize;
-
- /** Number of processed elements in the current bundle. */
- private transient int elementCount;
-
- /** OutputStream Wrapper. */
- transient DataOutputViewStreamWrapper baosWrapper;
-
- /** The collector used to collect records. */
- protected transient Collector<Row> resultCollector;
-
- /**
- * The {@link PythonFunctionRunner} which is responsible for Python user-defined function
- * execution.
- */
- protected transient PythonFunctionRunner pythonFunctionRunner;
-
- /** Reusable InputStream used to hold the execution results to be deserialized. */
- protected transient ByteArrayInputStreamWithPos bais;
-
- /** InputStream Wrapper. */
- protected transient DataInputViewStreamWrapper baisWrapper;
-
- /** The type serializer for the forwarded fields. */
- protected transient TypeSerializer<Row> forwardedInputSerializer;
-
- /** Reusable OutputStream used to hold the serialized input elements. */
- protected transient ByteArrayOutputStreamWithPos baos;
-
- public AbstractPythonStatelessFunctionFlatMap(
- Configuration config,
- RowType inputType,
- RowType outputType,
- int[] userDefinedFunctionInputOffsets) {
- this.inputType = Preconditions.checkNotNull(inputType);
- this.outputType = Preconditions.checkNotNull(outputType);
- this.userDefinedFunctionInputOffsets =
- Preconditions.checkNotNull(userDefinedFunctionInputOffsets);
- this.config = new PythonConfig(Preconditions.checkNotNull(config));
- this.jobOptions = buildJobOptions(config);
- }
-
- protected PythonConfig getPythonConfig() {
- return config;
- }
-
- @Override
- public void open(Configuration parameters) throws Exception {
- super.open(parameters);
-
- this.elementCount = 0;
- this.maxBundleSize = config.getMaxBundleSize();
- if (this.maxBundleSize <= 0) {
- this.maxBundleSize = PythonOptions.MAX_BUNDLE_SIZE.defaultValue();
- LOG.error(
- "Invalid value for the maximum bundle size. Using default value of "
- + this.maxBundleSize
- + '.');
- } else {
- LOG.info("The maximum bundle size is configured to {}.", this.maxBundleSize);
- }
-
- if (config.getMaxBundleTimeMills() != PythonOptions.MAX_BUNDLE_TIME_MILLS.defaultValue()) {
- LOG.info(
- "Maximum bundle time takes no effect in old planner under batch mode. "
- + "Config maximum bundle size instead! "
- + "Under batch mode, bundle size should be enough to control both throughput and latency.");
- }
- forwardedInputQueue = new LinkedBlockingQueue<>();
- userDefinedFunctionInputType =
- new RowType(
- Arrays.stream(userDefinedFunctionInputOffsets)
- .mapToObj(i -> inputType.getFields().get(i))
- .collect(Collectors.toList()));
-
- bais = new ByteArrayInputStreamWithPos();
- baisWrapper = new DataInputViewStreamWrapper(bais);
-
- baos = new ByteArrayOutputStreamWithPos();
- baosWrapper = new DataOutputViewStreamWrapper(baos);
-
- userDefinedFunctionOutputType =
- new RowType(
- outputType
- .getFields()
- .subList(getForwardedFieldsCount(), outputType.getFieldCount()));
-
- this.pythonFunctionRunner = createPythonFunctionRunner();
- this.pythonFunctionRunner.open(config);
- }
-
- @Override
- public void flatMap(Row value, Collector<Row> out) throws Exception {
- this.resultCollector = out;
- bufferInput(value);
- processElementInternal(value);
- checkInvokeFinishBundleByCount();
- emitResults();
- }
-
- @Override
- @SuppressWarnings("unchecked")
- public TypeInformation<Row> getProducedType() {
- return (TypeInformation<Row>)
- LegacyTypeInfoDataTypeConverter.toLegacyTypeInfo(
- LogicalTypeDataTypeConverter.toDataType(outputType));
- }
-
- @Override
- public void close() throws Exception {
- try {
- invokeFinishBundle();
-
- if (pythonFunctionRunner != null) {
- pythonFunctionRunner.close();
- pythonFunctionRunner = null;
- }
- } finally {
- super.close();
- }
- }
-
- /** Returns the {@link PythonEnv} used to create the PythonEnvironmentManager. */
- public abstract PythonEnv getPythonEnv();
-
- public abstract void bufferInput(Row input);
-
- public abstract void emitResult(Tuple2<byte[], Integer> resultTuple) throws Exception;
-
- public abstract int getForwardedFieldsCount();
-
- /** Gets the proto representation of the Python user-defined functions to be executed. */
- public abstract FlinkFnApi.UserDefinedFunctions getUserDefinedFunctionsProto();
-
- public abstract String getInputOutputCoderUrn();
-
- public abstract String getFunctionUrn();
-
- public abstract void processElementInternal(Row value) throws Exception;
-
- /** Checks whether to invoke finishBundle by element count. Called in flatMap. */
- protected void checkInvokeFinishBundleByCount() throws Exception {
- elementCount++;
- if (elementCount >= maxBundleSize) {
- invokeFinishBundle();
- }
- }
-
- protected PythonEnvironmentManager createPythonEnvironmentManager() throws IOException {
- PythonDependencyInfo dependencyInfo =
- PythonDependencyInfo.create(config, getRuntimeContext().getDistributedCache());
- PythonEnv pythonEnv = getPythonEnv();
- if (pythonEnv.getExecType() == PythonEnv.ExecType.PROCESS) {
- return new ProcessPythonEnvironmentManager(
- dependencyInfo,
- ConfigurationUtils.splitPaths(System.getProperty("java.io.tmpdir")),
- System.getenv());
- } else {
- throw new UnsupportedOperationException(
- String.format(
- "Execution type '%s' is not supported.", pythonEnv.getExecType()));
- }
- }
-
- protected FlinkMetricContainer getFlinkMetricContainer() {
- return this.config.isMetricEnabled()
- ? new FlinkMetricContainer(getRuntimeContext().getMetricGroup())
- : null;
- }
-
- protected Row getFunctionInput(Row element) {
- return Row.project(element, userDefinedFunctionInputOffsets);
- }
-
- private void emitResults() throws Exception {
- Tuple2<byte[], Integer> resultTuple;
- while ((resultTuple = pythonFunctionRunner.pollResult()) != null) {
- emitResult(resultTuple);
- }
- }
-
- protected void invokeFinishBundle() throws Exception {
- if (elementCount > 0) {
- pythonFunctionRunner.flush();
- elementCount = 0;
- emitResults();
- }
- }
-
- private PythonFunctionRunner createPythonFunctionRunner() throws IOException {
- return new BeamTableStatelessPythonFunctionRunner(
- getRuntimeContext().getTaskName(),
- createPythonEnvironmentManager(),
- userDefinedFunctionInputType,
- userDefinedFunctionOutputType,
- getFunctionUrn(),
- getUserDefinedFunctionsProto(),
- getInputOutputCoderUrn(),
- jobOptions,
- getFlinkMetricContainer(),
- null,
- 0.0,
- FlinkFnApi.CoderParam.OutputMode.SINGLE);
- }
-
- private Map<String, String> buildJobOptions(Configuration config) {
- Map<String, String> jobOptions = new HashMap<>();
- if (config.containsKey("table.exec.timezone")) {
- jobOptions.put("table.exec.timezone", config.getString("table.exec.timezone", null));
- }
- return jobOptions;
- }
-}
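
The base class above flushed a bundle to the Python worker once a configured
number of elements had been buffered. A toy model of that policy (the Runner
interface is a stand-in, not Flink's real PythonFunctionRunner API):

    public final class BundleCountDemo {

        interface Runner { void flush(); } // stand-in for the Python function runner

        private final int maxBundleSize;
        private int elementCount;
        private final Runner runner;

        BundleCountDemo(int maxBundleSize, Runner runner) {
            // Like the deleted open(), fall back to a default for invalid sizes.
            this.maxBundleSize = maxBundleSize > 0 ? maxBundleSize : 1000;
            this.runner = runner;
        }

        void processElement(Object ignored) {
            elementCount++;
            if (elementCount >= maxBundleSize) { // checkInvokeFinishBundleByCount()
                runner.flush();
                elementCount = 0;
            }
        }

        public static void main(String[] args) {
            BundleCountDemo demo = new BundleCountDemo(3, () -> System.out.println("flush"));
            for (int i = 0; i < 7; i++) demo.processElement(i); // prints "flush" twice
        }
    }
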
diff --git a/flink-python/src/main/java/org/apache/flink/table/runtime/functions/python/PythonScalarFunctionFlatMap.java b/flink-python/src/main/java/org/apache/flink/table/runtime/functions/python/PythonScalarFunctionFlatMap.java
deleted file mode 100644
index e1fc082..0000000
--- a/flink-python/src/main/java/org/apache/flink/table/runtime/functions/python/PythonScalarFunctionFlatMap.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.runtime.functions.python;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.common.functions.RichFlatMapFunction;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.table.functions.ScalarFunction;
-import org.apache.flink.table.functions.python.PythonFunctionInfo;
-import org.apache.flink.table.runtime.typeutils.PythonTypeUtils;
-import org.apache.flink.table.types.logical.RowType;
-import org.apache.flink.types.Row;
-
-import java.io.IOException;
-
-/**
- * The {@link RichFlatMapFunction} used to invoke Python {@link ScalarFunction} functions for the
- * old planner.
- */
-@Internal
-public class PythonScalarFunctionFlatMap extends AbstractPythonScalarFunctionFlatMap {
-
- private static final long serialVersionUID = 1L;
-
- private static final String SCALAR_FUNCTION_SCHEMA_CODER_URN =
- "flink:coder:schema:scalar_function:v1";
-
- /** The TypeSerializer for udf input elements. */
- private transient TypeSerializer<Row> userDefinedFunctionInputTypeSerializer;
-
- /** The TypeSerializer for user-defined function execution results. */
- private transient TypeSerializer<Row> userDefinedFunctionOutputTypeSerializer;
-
- public PythonScalarFunctionFlatMap(
- Configuration config,
- PythonFunctionInfo[] scalarFunctions,
- RowType inputType,
- RowType outputType,
- int[] udfInputOffsets,
- int[] forwardedFields) {
- super(config, scalarFunctions, inputType, outputType, udfInputOffsets, forwardedFields);
- }
-
- @Override
- @SuppressWarnings("unchecked")
- public void open(Configuration parameters) throws Exception {
- super.open(parameters);
- userDefinedFunctionInputTypeSerializer =
- PythonTypeUtils.toFlinkTypeSerializer(userDefinedFunctionInputType);
- userDefinedFunctionOutputTypeSerializer =
- PythonTypeUtils.toFlinkTypeSerializer(userDefinedFunctionOutputType);
- }
-
- @Override
- public String getInputOutputCoderUrn() {
- return SCALAR_FUNCTION_SCHEMA_CODER_URN;
- }
-
- @Override
- public void processElementInternal(Row value) throws Exception {
- userDefinedFunctionInputTypeSerializer.serialize(getFunctionInput(value), baosWrapper);
- pythonFunctionRunner.process(baos.toByteArray());
- baos.reset();
- }
-
- @Override
- @SuppressWarnings("ConstantConditions")
- public void emitResult(Tuple2<byte[], Integer> resultTuple) throws IOException {
- byte[] rawUdfResult = resultTuple.f0;
- int length = resultTuple.f1;
- Row input = forwardedInputQueue.poll();
- bais.setBuffer(rawUdfResult, 0, length);
- Row udfResult = userDefinedFunctionOutputTypeSerializer.deserialize(baisWrapper);
- this.resultCollector.collect(Row.join(input, udfResult));
- }
-}
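
Because the Python worker returns results in send order, the deleted class
paired them with buffered inputs through a plain FIFO queue. An illustration
with made-up data:

    import java.util.ArrayDeque;
    import java.util.Queue;

    import org.apache.flink.types.Row;

    public final class FifoPairingDemo {
        public static void main(String[] args) {
            Queue<Row> forwardedInputQueue = new ArrayDeque<>();
            forwardedInputQueue.add(Row.of("a"));
            forwardedInputQueue.add(Row.of("b"));
            // Results arrive in the same order the inputs were sent.
            for (String udfResult : new String[] {"A", "B"}) {
                Row input = forwardedInputQueue.poll();
                System.out.println(Row.join(input, Row.of(udfResult))); // +I[a, A] then +I[b, B]
            }
        }
    }
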
diff --git a/flink-python/src/main/java/org/apache/flink/table/runtime/functions/python/PythonTableFunctionFlatMap.java b/flink-python/src/main/java/org/apache/flink/table/runtime/functions/python/PythonTableFunctionFlatMap.java
deleted file mode 100644
index fd68938..0000000
--- a/flink-python/src/main/java/org/apache/flink/table/runtime/functions/python/PythonTableFunctionFlatMap.java
+++ /dev/null
@@ -1,174 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.runtime.functions.python;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.common.functions.RichFlatMapFunction;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.typeutils.RowTypeInfo;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.fnexecution.v1.FlinkFnApi;
-import org.apache.flink.streaming.api.utils.PythonOperatorUtils;
-import org.apache.flink.table.functions.TableFunction;
-import org.apache.flink.table.functions.python.PythonEnv;
-import org.apache.flink.table.functions.python.PythonFunctionInfo;
-import org.apache.flink.table.runtime.typeutils.PythonTypeUtils;
-import org.apache.flink.table.types.logical.RowType;
-import org.apache.flink.table.types.utils.TypeConversions;
-import org.apache.flink.types.Row;
-import org.apache.flink.util.Preconditions;
-
-import org.apache.calcite.rel.core.JoinRelType;
-
-/**
- * The {@link RichFlatMapFunction} used to invoke Python {@link TableFunction} functions for the old
- * planner.
- */
-@Internal
-public final class PythonTableFunctionFlatMap extends AbstractPythonStatelessFunctionFlatMap {
-
- private static final long serialVersionUID = 1L;
-
- private static final String TABLE_FUNCTION_SCHEMA_CODER_URN =
- "flink:coder:schema:table_function:v1";
-
- private static final String TABLE_FUNCTION_URN = "flink:transform:table_function:v1";
-
- /** The Python {@link TableFunction} to be executed. */
- private final PythonFunctionInfo tableFunction;
-
- /** The correlate join type. */
- private final JoinRelType joinType;
-
- /** The TypeSerializer for udf input elements. */
- private transient TypeSerializer<Row> userDefinedFunctionInputTypeSerializer;
-
- /** The TypeSerializer for user-defined function execution results. */
- private transient TypeSerializer<Row> userDefinedFunctionOutputTypeSerializer;
-
- public PythonTableFunctionFlatMap(
- Configuration config,
- PythonFunctionInfo tableFunction,
- RowType inputType,
- RowType outputType,
- int[] udtfInputOffsets,
- JoinRelType joinType) {
- super(config, inputType, outputType, udtfInputOffsets);
- this.tableFunction = Preconditions.checkNotNull(tableFunction);
- Preconditions.checkArgument(
- joinType == JoinRelType.INNER || joinType == JoinRelType.LEFT,
- "The join type should be inner join or left join");
- this.joinType = joinType;
- }
-
- @Override
- @SuppressWarnings("unchecked")
- public void open(Configuration parameters) throws Exception {
- super.open(parameters);
-
- RowTypeInfo forwardedInputTypeInfo =
- (RowTypeInfo)
- TypeConversions.fromDataTypeToLegacyInfo(
- TypeConversions.fromLogicalToDataType(inputType));
- forwardedInputSerializer =
- forwardedInputTypeInfo.createSerializer(getRuntimeContext().getExecutionConfig());
- userDefinedFunctionInputTypeSerializer =
- PythonTypeUtils.toFlinkTypeSerializer(userDefinedFunctionInputType);
- userDefinedFunctionOutputTypeSerializer =
- PythonTypeUtils.toFlinkTypeSerializer(userDefinedFunctionOutputType);
- }
-
- @Override
- public PythonEnv getPythonEnv() {
- return tableFunction.getPythonFunction().getPythonEnv();
- }
-
- @Override
- public void bufferInput(Row input) {
- // If the input node is a DataSetCalc node, the RichFlatMapFunction generated by codegen
- // will reuse the output Row, so we always copy the input Row here.
- input = forwardedInputSerializer.copy(input);
- forwardedInputQueue.add(input);
- }
-
- @Override
- @SuppressWarnings("ConstantConditions")
- public void emitResult(Tuple2<byte[], Integer> resultTuple) throws Exception {
- Row input = forwardedInputQueue.poll();
- byte[] rawUdtfResult;
- int length;
- boolean isFinishResult;
- boolean hasJoined = false;
- Row udtfResult;
- do {
- rawUdtfResult = resultTuple.f0;
- length = resultTuple.f1;
- isFinishResult = isFinishResult(rawUdtfResult, length);
- if (!isFinishResult) {
- bais.setBuffer(rawUdtfResult, 0, length);
- udtfResult = userDefinedFunctionOutputTypeSerializer.deserialize(baisWrapper);
- this.resultCollector.collect(Row.join(input, udtfResult));
- resultTuple = pythonFunctionRunner.pollResult();
- hasJoined = true;
- } else if (joinType == JoinRelType.LEFT && !hasJoined) {
- udtfResult = new Row(userDefinedFunctionOutputType.getFieldCount());
- for (int i = 0; i < udtfResult.getArity(); i++) {
- udtfResult.setField(i, null);
- }
- this.resultCollector.collect(Row.join(input, udtfResult));
- }
- } while (!isFinishResult);
- }
-
- @Override
- public int getForwardedFieldsCount() {
- return inputType.getFieldCount();
- }
-
- @Override
- public FlinkFnApi.UserDefinedFunctions getUserDefinedFunctionsProto() {
- FlinkFnApi.UserDefinedFunctions.Builder builder =
- FlinkFnApi.UserDefinedFunctions.newBuilder();
- builder.addUdfs(PythonOperatorUtils.getUserDefinedFunctionProto(tableFunction));
- builder.setMetricEnabled(getPythonConfig().isMetricEnabled());
- return builder.build();
- }
-
- @Override
- public String getInputOutputCoderUrn() {
- return TABLE_FUNCTION_SCHEMA_CODER_URN;
- }
-
- @Override
- public String getFunctionUrn() {
- return TABLE_FUNCTION_URN;
- }
-
- @Override
- public void processElementInternal(Row value) throws Exception {
- userDefinedFunctionInputTypeSerializer.serialize(getFunctionInput(value), baosWrapper);
- pythonFunctionRunner.process(baos.toByteArray());
- baos.reset();
- }
-
- private boolean isFinishResult(byte[] rawUdtfResult, int length) {
- return length == 1 && rawUdtfResult[0] == 0x00;
- }
-}
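
Two details of the deleted UDTF protocol are worth noting: a single 0x00 byte
marked the end of one input row's results, and a LEFT join emitted a
null-padded row when nothing joined. A self-contained sketch (class and method
names are illustrative):

    import org.apache.flink.types.Row;

    public final class UdtfProtocolDemo {

        // Same sentinel check as the deleted isFinishResult().
        static boolean isFinishResult(byte[] raw, int length) {
            return length == 1 && raw[0] == 0x00;
        }

        // LEFT join padding: all fields of a fresh Row default to null.
        static Row leftJoinPadding(Row input, int udtfFieldCount) {
            return Row.join(input, new Row(udtfFieldCount));
        }

        public static void main(String[] args) {
            System.out.println(isFinishResult(new byte[] {0x00}, 1)); // prints: true
            System.out.println(leftJoinPadding(Row.of("x"), 2));      // prints: +I[x, null, null]
        }
    }
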
diff --git a/flink-python/src/main/java/org/apache/flink/table/runtime/functions/python/arrow/ArrowPythonScalarFunctionFlatMap.java b/flink-python/src/main/java/org/apache/flink/table/runtime/functions/python/arrow/ArrowPythonScalarFunctionFlatMap.java
deleted file mode 100644
index d7253c5..0000000
--- a/flink-python/src/main/java/org/apache/flink/table/runtime/functions/python/arrow/ArrowPythonScalarFunctionFlatMap.java
+++ /dev/null
@@ -1,131 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.runtime.functions.python.arrow;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.common.functions.RichFlatMapFunction;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.table.functions.ScalarFunction;
-import org.apache.flink.table.functions.python.PythonFunctionInfo;
-import org.apache.flink.table.runtime.arrow.serializers.ArrowSerializer;
-import org.apache.flink.table.runtime.arrow.serializers.RowArrowSerializer;
-import org.apache.flink.table.runtime.functions.python.AbstractPythonScalarFunctionFlatMap;
-import org.apache.flink.table.types.logical.RowType;
-import org.apache.flink.types.Row;
-
-import java.io.IOException;
-
-/**
- * The {@link RichFlatMapFunction} used to invoke Arrow Python {@link ScalarFunction} functions for
- * the old planner.
- */
-@Internal
-public final class ArrowPythonScalarFunctionFlatMap extends AbstractPythonScalarFunctionFlatMap {
-
- private static final long serialVersionUID = 1L;
-
- private static final String SCHEMA_ARROW_CODER_URN = "flink:coder:schema:arrow:v1";
-
- /** The current number of elements to be included in an Arrow batch. */
- private transient int currentBatchCount;
-
- /** Max number of elements to include in an Arrow batch. */
- private final int maxArrowBatchSize;
-
- private transient ArrowSerializer<Row> arrowSerializer;
-
- public ArrowPythonScalarFunctionFlatMap(
- Configuration config,
- PythonFunctionInfo[] scalarFunctions,
- RowType inputType,
- RowType outputType,
- int[] udfInputOffsets,
- int[] forwardedFields) {
- super(config, scalarFunctions, inputType, outputType, udfInputOffsets, forwardedFields);
- maxArrowBatchSize = getPythonConfig().getMaxArrowBatchSize();
- }
-
- @Override
- public void open(Configuration parameters) throws Exception {
- super.open(parameters);
-
- arrowSerializer =
- new RowArrowSerializer(userDefinedFunctionInputType, userDefinedFunctionOutputType);
- arrowSerializer.open(bais, baos);
- currentBatchCount = 0;
- }
-
- @Override
- public void close() throws Exception {
- invokeCurrentBatch();
- try {
- super.close();
- } finally {
- if (arrowSerializer != null) {
- arrowSerializer.close();
- arrowSerializer = null;
- }
- }
- }
-
- @Override
- @SuppressWarnings("ConstantConditions")
- public void emitResult(Tuple2<byte[], Integer> resultTuple) throws IOException {
- byte[] udfResult = resultTuple.f0;
- int length = resultTuple.f1;
- bais.setBuffer(udfResult, 0, length);
- int rowCount = arrowSerializer.load();
- for (int i = 0; i < rowCount; i++) {
- resultCollector.collect(Row.join(forwardedInputQueue.poll(), arrowSerializer.read(i)));
- }
- arrowSerializer.resetReader();
- }
-
- @Override
- public String getInputOutputCoderUrn() {
- return SCHEMA_ARROW_CODER_URN;
- }
-
- @Override
- public void processElementInternal(Row value) throws Exception {
- arrowSerializer.write(getFunctionInput(value));
- currentBatchCount++;
- if (currentBatchCount >= maxArrowBatchSize) {
- invokeCurrentBatch();
- }
- }
-
- @Override
- protected void invokeFinishBundle() throws Exception {
- invokeCurrentBatch();
- super.invokeFinishBundle();
- }
-
- private void invokeCurrentBatch() throws Exception {
- if (currentBatchCount > 0) {
- arrowSerializer.finishCurrentBatch();
- currentBatchCount = 0;
- pythonFunctionRunner.process(baos.toByteArray());
- checkInvokeFinishBundleByCount();
- baos.reset();
- arrowSerializer.resetWriter();
- }
- }
-}
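
The Arrow variant added one more buffering level: rows were accumulated into an
Arrow batch, which was sent once it reached the configured size and also
flushed from close()/finishBundle() so no tail rows were lost. A toy model of
that guard (counters only; the real code serialized rows via
RowArrowSerializer):

    public final class ArrowBatchFlushDemo {

        private int currentBatchCount;
        private final int maxArrowBatchSize = 3; // illustrative size

        void processElement(Object row) {
            currentBatchCount++; // arrowSerializer.write(row) in the real code
            if (currentBatchCount >= maxArrowBatchSize) {
                invokeCurrentBatch();
            }
        }

        void invokeCurrentBatch() {
            // Only send a non-empty batch; buffers were also reset in the real code.
            if (currentBatchCount > 0) {
                System.out.println("send Arrow batch of " + currentBatchCount + " rows");
                currentBatchCount = 0;
            }
        }

        public static void main(String[] args) {
            ArrowBatchFlushDemo demo = new ArrowBatchFlushDemo();
            for (int i = 0; i < 7; i++) demo.processElement(i);
            demo.invokeCurrentBatch(); // the tail batch, as on close()/finishBundle()
        }
    }
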
diff --git a/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/scalar/AbstractRowPythonScalarFunctionOperator.java b/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/scalar/AbstractRowPythonScalarFunctionOperator.java
deleted file mode 100644
index 507f6f9..0000000
--- a/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/scalar/AbstractRowPythonScalarFunctionOperator.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.runtime.operators.python.scalar;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.typeutils.RowTypeInfo;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.table.functions.ScalarFunction;
-import org.apache.flink.table.functions.python.PythonFunctionInfo;
-import org.apache.flink.table.runtime.operators.python.utils.StreamRecordCRowWrappingCollector;
-import org.apache.flink.table.runtime.types.CRow;
-import org.apache.flink.table.runtime.types.CRowTypeInfo;
-import org.apache.flink.table.types.logical.RowType;
-import org.apache.flink.table.types.utils.TypeConversions;
-import org.apache.flink.types.Row;
-
-import java.util.Arrays;
-
-/** Base Python {@link ScalarFunction} operator for the legacy planner. */
-@Internal
-public abstract class AbstractRowPythonScalarFunctionOperator
- extends AbstractPythonScalarFunctionOperator<CRow, CRow, Row> {
-
- private static final long serialVersionUID = 1L;
-
- /** The collector used to collect records. */
- protected transient StreamRecordCRowWrappingCollector cRowWrapper;
-
- /** The type serializer for the forwarded fields. */
- private transient TypeSerializer<CRow> forwardedInputSerializer;
-
- public AbstractRowPythonScalarFunctionOperator(
- Configuration config,
- PythonFunctionInfo[] scalarFunctions,
- RowType inputType,
- RowType outputType,
- int[] udfInputOffsets,
- int[] forwardedFields) {
- super(config, scalarFunctions, inputType, outputType, udfInputOffsets, forwardedFields);
- }
-
- @Override
- public void open() throws Exception {
- super.open();
- this.cRowWrapper = new StreamRecordCRowWrappingCollector(output);
-
- CRowTypeInfo forwardedInputTypeInfo =
- new CRowTypeInfo(
- new RowTypeInfo(
- Arrays.stream(forwardedFields)
- .mapToObj(i -> inputType.getFields().get(i))
- .map(RowType.RowField::getType)
- .map(TypeConversions::fromLogicalToDataType)
- .map(TypeConversions::fromDataTypeToLegacyInfo)
- .toArray(TypeInformation[]::new)));
- forwardedInputSerializer = forwardedInputTypeInfo.createSerializer(getExecutionConfig());
- }
-
- @Override
- public void bufferInput(CRow input) {
- CRow forwardedFieldsRow =
- new CRow(Row.project(input.row(), forwardedFields), input.change());
- if (getExecutionConfig().isObjectReuseEnabled()) {
- forwardedFieldsRow = forwardedInputSerializer.copy(forwardedFieldsRow);
- }
- forwardedInputQueue.add(forwardedFieldsRow);
- }
-
- @Override
- public Row getFunctionInput(CRow element) {
- return Row.project(element.row(), userDefinedFunctionInputOffsets);
- }
-}
diff --git a/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/scalar/PythonScalarFunctionOperator.java b/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/scalar/PythonScalarFunctionOperator.java
deleted file mode 100644
index b458850..0000000
--- a/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/scalar/PythonScalarFunctionOperator.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.runtime.operators.python.scalar;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.table.functions.ScalarFunction;
-import org.apache.flink.table.functions.python.PythonFunctionInfo;
-import org.apache.flink.table.runtime.types.CRow;
-import org.apache.flink.table.runtime.typeutils.PythonTypeUtils;
-import org.apache.flink.table.types.logical.RowType;
-import org.apache.flink.types.Row;
-
-/** The Python {@link ScalarFunction} operator for the legacy planner. */
-@Internal
-public class PythonScalarFunctionOperator extends AbstractRowPythonScalarFunctionOperator {
-
- private static final long serialVersionUID = 1L;
-
- /** The TypeSerializer for udf execution results. */
- private transient TypeSerializer<Row> udfOutputTypeSerializer;
-
- /** The TypeSerializer for udf input elements. */
- private transient TypeSerializer<Row> udfInputTypeSerializer;
-
- public PythonScalarFunctionOperator(
- Configuration config,
- PythonFunctionInfo[] scalarFunctions,
- RowType inputType,
- RowType outputType,
- int[] udfInputOffsets,
- int[] forwardedFields) {
- super(config, scalarFunctions, inputType, outputType, udfInputOffsets, forwardedFields);
- }
-
- @Override
- @SuppressWarnings("unchecked")
- public void open() throws Exception {
- super.open();
- udfInputTypeSerializer =
- PythonTypeUtils.toFlinkTypeSerializer(userDefinedFunctionInputType);
- udfOutputTypeSerializer =
- PythonTypeUtils.toFlinkTypeSerializer(userDefinedFunctionOutputType);
- }
-
- @Override
- public void processElementInternal(CRow value) throws Exception {
- udfInputTypeSerializer.serialize(getFunctionInput(value), baosWrapper);
- pythonFunctionRunner.process(baos.toByteArray());
- baos.reset();
- }
-
- @Override
- @SuppressWarnings("ConstantConditions")
- public void emitResult(Tuple2<byte[], Integer> resultTuple) throws Exception {
- byte[] rawUdfResult = resultTuple.f0;
- int length = resultTuple.f1;
- CRow input = forwardedInputQueue.poll();
- cRowWrapper.setChange(input.change());
- bais.setBuffer(rawUdfResult, 0, length);
- Row udfResult = udfOutputTypeSerializer.deserialize(baisWrapper);
- cRowWrapper.collect(Row.join(input.row(), udfResult));
- }
-}
diff --git a/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/scalar/arrow/ArrowPythonScalarFunctionOperator.java b/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/scalar/arrow/ArrowPythonScalarFunctionOperator.java
deleted file mode 100644
index d3ce04a..0000000
--- a/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/scalar/arrow/ArrowPythonScalarFunctionOperator.java
+++ /dev/null
@@ -1,135 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.runtime.operators.python.scalar.arrow;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.table.functions.ScalarFunction;
-import org.apache.flink.table.functions.python.PythonFunctionInfo;
-import org.apache.flink.table.runtime.arrow.serializers.ArrowSerializer;
-import org.apache.flink.table.runtime.arrow.serializers.RowArrowSerializer;
-import org.apache.flink.table.runtime.operators.python.scalar.AbstractRowPythonScalarFunctionOperator;
-import org.apache.flink.table.runtime.types.CRow;
-import org.apache.flink.table.types.logical.RowType;
-import org.apache.flink.types.Row;
-
-/** Arrow Python {@link ScalarFunction} operator for the old planner. */
-@Internal
-public class ArrowPythonScalarFunctionOperator extends AbstractRowPythonScalarFunctionOperator {
-
- private static final long serialVersionUID = 1L;
-
- private static final String SCHEMA_ARROW_CODER_URN = "flink:coder:schema:arrow:v1";
-
- /** The current number of elements to be included in an Arrow batch. */
- private transient int currentBatchCount;
-
- /** Max number of elements to include in an Arrow batch. */
- private transient int maxArrowBatchSize;
-
- private transient ArrowSerializer<Row> arrowSerializer;
-
- public ArrowPythonScalarFunctionOperator(
- Configuration config,
- PythonFunctionInfo[] scalarFunctions,
- RowType inputType,
- RowType outputType,
- int[] udfInputOffsets,
- int[] forwardedFields) {
- super(config, scalarFunctions, inputType, outputType, udfInputOffsets, forwardedFields);
- }
-
- @Override
- public void open() throws Exception {
- super.open();
- maxArrowBatchSize = Math.min(getPythonConfig().getMaxArrowBatchSize(), maxBundleSize);
- arrowSerializer =
- new RowArrowSerializer(userDefinedFunctionInputType, userDefinedFunctionOutputType);
- arrowSerializer.open(bais, baos);
- currentBatchCount = 0;
- }
-
- @Override
- protected void invokeFinishBundle() throws Exception {
- invokeCurrentBatch();
- super.invokeFinishBundle();
- }
-
- @Override
- public void dispose() throws Exception {
- super.dispose();
- if (arrowSerializer != null) {
- arrowSerializer.close();
- arrowSerializer = null;
- }
- }
-
- @Override
- public void close() throws Exception {
- invokeCurrentBatch();
- super.close();
- }
-
- @Override
- public void endInput() throws Exception {
- invokeCurrentBatch();
- super.endInput();
- }
-
- @Override
- @SuppressWarnings("ConstantConditions")
- public void emitResult(Tuple2<byte[], Integer> resultTuple) throws Exception {
- byte[] udfResult = resultTuple.f0;
- int length = resultTuple.f1;
- bais.setBuffer(udfResult, 0, length);
- int rowCount = arrowSerializer.load();
- for (int i = 0; i < rowCount; i++) {
- CRow input = forwardedInputQueue.poll();
- cRowWrapper.setChange(input.change());
- cRowWrapper.collect(Row.join(input.row(), arrowSerializer.read(i)));
- }
- arrowSerializer.resetReader();
- }
-
- @Override
- public String getInputOutputCoderUrn() {
- return SCHEMA_ARROW_CODER_URN;
- }
-
- @Override
- public void processElementInternal(CRow value) throws Exception {
- arrowSerializer.write(getFunctionInput(value));
- currentBatchCount++;
- if (currentBatchCount >= maxArrowBatchSize) {
- invokeCurrentBatch();
- }
- }
-
- private void invokeCurrentBatch() throws Exception {
- if (currentBatchCount > 0) {
- arrowSerializer.finishCurrentBatch();
- currentBatchCount = 0;
- pythonFunctionRunner.process(baos.toByteArray());
- checkInvokeFinishBundleByCount();
- baos.reset();
- arrowSerializer.resetWriter();
- }
- }
-}
diff --git a/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/table/PythonTableFunctionOperator.java b/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/table/PythonTableFunctionOperator.java
deleted file mode 100644
index d6282dc..0000000
--- a/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/table/PythonTableFunctionOperator.java
+++ /dev/null
@@ -1,142 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.runtime.operators.python.table;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.typeutils.RowTypeInfo;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.table.functions.TableFunction;
-import org.apache.flink.table.functions.python.PythonFunctionInfo;
-import org.apache.flink.table.planner.plan.utils.JoinTypeUtil;
-import org.apache.flink.table.runtime.operators.join.FlinkJoinType;
-import org.apache.flink.table.runtime.operators.python.utils.StreamRecordCRowWrappingCollector;
-import org.apache.flink.table.runtime.types.CRow;
-import org.apache.flink.table.runtime.types.CRowTypeInfo;
-import org.apache.flink.table.runtime.typeutils.PythonTypeUtils;
-import org.apache.flink.table.types.logical.RowType;
-import org.apache.flink.table.types.utils.TypeConversions;
-import org.apache.flink.types.Row;
-
-import org.apache.calcite.rel.core.JoinRelType;
-
-/** The Python {@link TableFunction} operator for the legacy planner. */
-@Internal
-public class PythonTableFunctionOperator
- extends AbstractPythonTableFunctionOperator<CRow, CRow, Row> {
-
- private static final long serialVersionUID = 1L;
-
- /** The collector used to collect records. */
- private transient StreamRecordCRowWrappingCollector cRowWrapper;
-
- /** The type serializer for the forwarded fields. */
- private transient TypeSerializer<CRow> forwardedInputSerializer;
-
- /** The TypeSerializer for udtf execution results. */
- private transient TypeSerializer<Row> udtfOutputTypeSerializer;
-
- /** The TypeSerializer for udtf input elements. */
- private transient TypeSerializer<Row> udtfInputTypeSerializer;
-
- public PythonTableFunctionOperator(
- Configuration config,
- PythonFunctionInfo tableFunction,
- RowType inputType,
- RowType outputType,
- int[] udtfInputOffsets,
- JoinRelType joinType) {
- super(
- config,
- tableFunction,
- inputType,
- outputType,
- udtfInputOffsets,
- JoinTypeUtil.getFlinkJoinType(joinType));
- }
-
- @Override
- @SuppressWarnings("unchecked")
- public void open() throws Exception {
- super.open();
- this.cRowWrapper = new StreamRecordCRowWrappingCollector(output);
- CRowTypeInfo forwardedInputTypeInfo =
- new CRowTypeInfo(
- (RowTypeInfo)
- TypeConversions.fromDataTypeToLegacyInfo(
- TypeConversions.fromLogicalToDataType(inputType)));
- forwardedInputSerializer = forwardedInputTypeInfo.createSerializer(getExecutionConfig());
- udtfOutputTypeSerializer =
- PythonTypeUtils.toFlinkTypeSerializer(userDefinedFunctionOutputType);
- udtfInputTypeSerializer =
- PythonTypeUtils.toFlinkTypeSerializer(userDefinedFunctionInputType);
- }
-
- @Override
- @SuppressWarnings("ConstantConditions")
- public void emitResult(Tuple2<byte[], Integer> resultTuple) throws Exception {
- CRow input = forwardedInputQueue.poll();
- byte[] rawUdtfResult;
- int length;
- boolean isFinishResult;
- boolean hasJoined = false;
- Row udtfResult;
- do {
- rawUdtfResult = resultTuple.f0;
- length = resultTuple.f1;
- isFinishResult = isFinishResult(rawUdtfResult, length);
- if (!isFinishResult) {
- bais.setBuffer(rawUdtfResult, 0, length);
- udtfResult = udtfOutputTypeSerializer.deserialize(baisWrapper);
- cRowWrapper.setChange(input.change());
- cRowWrapper.collect(Row.join(input.row(), udtfResult));
- resultTuple = pythonFunctionRunner.pollResult();
- hasJoined = true;
- } else if (joinType == FlinkJoinType.LEFT && !hasJoined) {
- udtfResult = new Row(userDefinedFunctionOutputType.getFieldCount());
- for (int i = 0; i < udtfResult.getArity(); i++) {
- udtfResult.setField(i, null);
- }
- cRowWrapper.setChange(input.change());
- cRowWrapper.collect(Row.join(input.row(), udtfResult));
- }
- } while (!isFinishResult);
- }
-
- @Override
- public void bufferInput(CRow input) {
- if (getExecutionConfig().isObjectReuseEnabled()) {
- input = forwardedInputSerializer.copy(input);
- }
- forwardedInputQueue.add(input);
- }
-
- @Override
- public Row getFunctionInput(CRow element) {
- return Row.project(element.row(), userDefinedFunctionInputOffsets);
- }
-
- @Override
- public void processElementInternal(CRow value) throws Exception {
- udtfInputTypeSerializer.serialize(getFunctionInput(value), baosWrapper);
- pythonFunctionRunner.process(baos.toByteArray());
- baos.reset();
- }
-}
diff --git a/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/utils/StreamRecordCRowWrappingCollector.java b/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/utils/StreamRecordCRowWrappingCollector.java
deleted file mode 100644
index 502de8e..0000000
--- a/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/utils/StreamRecordCRowWrappingCollector.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.runtime.operators.python.utils;
-
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.table.runtime.types.CRow;
-import org.apache.flink.types.Row;
-import org.apache.flink.util.Collector;
-
-/** The collector is used to convert a {@link Row} to a {@link CRow}. */
-public class StreamRecordCRowWrappingCollector implements Collector<Row> {
-
- private final Collector<StreamRecord<CRow>> out;
- private final CRow reuseCRow = new CRow();
-
- /** For Table API & SQL jobs, the timestamp field is not used. */
- private final StreamRecord<CRow> reuseStreamRecord = new StreamRecord<>(reuseCRow);
-
- public StreamRecordCRowWrappingCollector(Collector<StreamRecord<CRow>> out) {
- this.out = out;
- }
-
- public void setChange(boolean change) {
- this.reuseCRow.change_$eq(change);
- }
-
- @Override
- public void collect(Row record) {
- reuseCRow.row_$eq(record);
- out.collect(reuseStreamRecord);
- }
-
- @Override
- public void close() {
- out.close();
- }
-}
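
The deleted collector used the usual object-reuse pattern: a single CRow and
StreamRecord pair is allocated up front and mutated for every emitted record to
avoid per-record allocations. A generic sketch of the pattern (types
simplified; not Flink's Collector/CRow API):

    import java.util.function.Consumer;

    public final class ReusingCollectorDemo {

        static final class Wrapper { Object payload; boolean change; } // simplified CRow

        private final Wrapper reuse = new Wrapper();
        private final Consumer<Wrapper> out;

        ReusingCollectorDemo(Consumer<Wrapper> out) { this.out = out; }

        void collect(Object record, boolean change) {
            reuse.payload = record; // mutate the one wrapper instead of allocating
            reuse.change = change;
            out.accept(reuse);
        }

        public static void main(String[] args) {
            ReusingCollectorDemo c = new ReusingCollectorDemo(
                    w -> System.out.println(w.payload + " change=" + w.change));
            c.collect("row1", true);  // prints: row1 change=true
            c.collect("row2", false); // prints: row2 change=false
        }
    }
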
diff --git a/flink-python/src/test/java/org/apache/flink/client/python/PythonFunctionFactoryTest.java b/flink-python/src/test/java/org/apache/flink/client/python/PythonFunctionFactoryTest.java
index c8824fe..7676bfc 100644
--- a/flink-python/src/test/java/org/apache/flink/client/python/PythonFunctionFactoryTest.java
+++ b/flink-python/src/test/java/org/apache/flink/client/python/PythonFunctionFactoryTest.java
@@ -17,12 +17,10 @@
package org.apache.flink.client.python;
-import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
-import org.apache.flink.table.api.bridge.java.BatchTableEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.util.FileUtils;
@@ -45,7 +43,6 @@ import static org.apache.flink.table.api.Expressions.call;
public class PythonFunctionFactoryTest {
private static String tmpdir = "";
- private static BatchTableEnvironment flinkTableEnv;
private static StreamTableEnvironment blinkTableEnv;
private static Table flinkSourceTable;
private static Table blinkSourceTable;
@@ -72,13 +69,6 @@ public class PythonFunctionFactoryTest {
+ " return str + str\n";
out.write(code.getBytes());
}
- ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- flinkTableEnv = BatchTableEnvironment.create(env);
- flinkTableEnv
- .getConfig()
- .getConfiguration()
- .set(PYTHON_FILES, pyFilePath.getAbsolutePath());
- flinkTableEnv.getConfig().getConfiguration().setString(TASK_OFF_HEAP_MEMORY.key(), "80mb");
StreamExecutionEnvironment sEnv = StreamExecutionEnvironment.getExecutionEnvironment();
blinkTableEnv =
StreamTableEnvironment.create(
@@ -92,7 +82,6 @@ public class PythonFunctionFactoryTest {
.getConfiguration()
.set(PYTHON_FILES, pyFilePath.getAbsolutePath());
blinkTableEnv.getConfig().getConfiguration().setString(TASK_OFF_HEAP_MEMORY.key(), "80mb");
- flinkSourceTable = flinkTableEnv.fromDataSet(env.fromElements("1", "2", "3")).as("str");
blinkSourceTable = blinkTableEnv.fromDataStream(sEnv.fromElements("1", "2", "3")).as("str");
}
@@ -102,24 +91,6 @@ public class PythonFunctionFactoryTest {
}
public static void testPythonFunctionFactory() {
- // flink catalog
- flinkTableEnv.executeSql("create function func1 as 'test1.func1' language python");
- verifyPlan(flinkSourceTable.select(call("func1", $("str"))), flinkTableEnv);
-
- // flink catalog
- flinkTableEnv.executeSql("alter function func1 as 'test1.func1' language python");
- verifyPlan(flinkSourceTable.select(call("func1", $("str"))), flinkTableEnv);
-
- // flink temporary catalog
- flinkTableEnv.executeSql(
- "create temporary function func1 as 'test1.func1' language python");
- verifyPlan(flinkSourceTable.select(call("func1", $("str"))), flinkTableEnv);
-
- // flink temporary system
- flinkTableEnv.executeSql(
- "create temporary system function func1 as 'test1.func1' language python");
- verifyPlan(flinkSourceTable.select(call("func1", $("str"))), flinkTableEnv);
-
// blink catalog
blinkTableEnv.executeSql("create function func1 as 'test1.func1' language python");
verifyPlan(blinkSourceTable.select(call("func1", $("str"))), blinkTableEnv);
diff --git a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/scalar/PythonScalarFunctionOperatorTest.java b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/scalar/PythonScalarFunctionOperatorTest.java
deleted file mode 100644
index 6f4cd18..0000000
--- a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/scalar/PythonScalarFunctionOperatorTest.java
+++ /dev/null
@@ -1,105 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.runtime.operators.python.scalar;
-
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.python.PythonFunctionRunner;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.util.TestHarnessUtil;
-import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
-import org.apache.flink.table.functions.python.PythonFunctionInfo;
-import org.apache.flink.table.runtime.types.CRow;
-import org.apache.flink.table.runtime.utils.PassThroughPythonScalarFunctionRunner;
-import org.apache.flink.table.runtime.utils.PythonTestUtils;
-import org.apache.flink.table.types.logical.RowType;
-import org.apache.flink.types.Row;
-
-import java.io.IOException;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Queue;
-
-/** Tests for {@link PythonScalarFunctionOperator}. */
-public class PythonScalarFunctionOperatorTest
- extends PythonScalarFunctionOperatorTestBase<CRow, CRow, Row> {
-
- @Override
- public AbstractPythonScalarFunctionOperator<CRow, CRow, Row> getTestOperator(
- Configuration config,
- PythonFunctionInfo[] scalarFunctions,
- RowType inputType,
- RowType outputType,
- int[] udfInputOffsets,
- int[] forwardedFields) {
- return new PassThroughPythonScalarFunctionOperator(
- config, scalarFunctions, inputType, outputType, udfInputOffsets, forwardedFields);
- }
-
- @Override
- public CRow newRow(boolean accumulateMsg, Object... fields) {
- return new CRow(Row.of(fields), accumulateMsg);
- }
-
- @Override
- public void assertOutputEquals(
- String message, Collection<Object> expected, Collection<Object> actual) {
- TestHarnessUtil.assertOutputEquals(
- message, (Queue<Object>) expected, (Queue<Object>) actual);
- }
-
- @Override
- public StreamTableEnvironment createTableEnvironment(StreamExecutionEnvironment env) {
- return StreamTableEnvironment.create(env);
- }
-
- @Override
- public TypeSerializer<CRow> getOutputTypeSerializer(RowType dataType) {
- // If set to null, PojoSerializer is used by default which works well here.
- return null;
- }
-
- private static class PassThroughPythonScalarFunctionOperator
- extends PythonScalarFunctionOperator {
-
- PassThroughPythonScalarFunctionOperator(
- Configuration config,
- PythonFunctionInfo[] scalarFunctions,
- RowType inputType,
- RowType outputType,
- int[] udfInputOffsets,
- int[] forwardedFields) {
- super(config, scalarFunctions, inputType, outputType, udfInputOffsets, forwardedFields);
- }
-
- @Override
- public PythonFunctionRunner createPythonFunctionRunner() throws IOException {
- return new PassThroughPythonScalarFunctionRunner(
- getRuntimeContext().getTaskName(),
- PythonTestUtils.createTestEnvironmentManager(),
- userDefinedFunctionInputType,
- userDefinedFunctionOutputType,
- getFunctionUrn(),
- getUserDefinedFunctionsProto(),
- getInputOutputCoderUrn(),
- new HashMap<>(),
- PythonTestUtils.createMockFlinkMetricContainer());
- }
- }
-}
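
The deleted test above is typed on CRow, the legacy planner's change-aware wrapper around Row: the boolean handed to newRow(accumulateMsg, fields) marks a record as an accumulate (true) or retract (false) message. A sketch of what that wrapper carries, with illustrative payload values:

    import org.apache.flink.table.runtime.types.CRow;
    import org.apache.flink.types.Row;

    public class CRowSketch {
        public static void main(String[] args) {
            // CRow pairs a Row payload with a change flag; the deleted tests
            // built their harness inputs this way via newRow(accumulateMsg, fields).
            CRow accumulate = new CRow(Row.of("hello", 1L), true);  // accumulate message
            CRow retract = new CRow(Row.of("hello", 1L), false);    // retract message
            System.out.println(accumulate + " / " + retract);
        }
    }
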
diff --git a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/scalar/arrow/ArrowPythonScalarFunctionOperatorTest.java b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/scalar/arrow/ArrowPythonScalarFunctionOperatorTest.java
deleted file mode 100644
index 6274a83..0000000
--- a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/scalar/arrow/ArrowPythonScalarFunctionOperatorTest.java
+++ /dev/null
@@ -1,103 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.runtime.operators.python.scalar.arrow;
-
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.python.PythonFunctionRunner;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.util.TestHarnessUtil;
-import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
-import org.apache.flink.table.functions.python.PythonFunctionInfo;
-import org.apache.flink.table.runtime.operators.python.scalar.AbstractPythonScalarFunctionOperator;
-import org.apache.flink.table.runtime.operators.python.scalar.PythonScalarFunctionOperatorTestBase;
-import org.apache.flink.table.runtime.types.CRow;
-import org.apache.flink.table.runtime.utils.PassThroughPythonScalarFunctionRunner;
-import org.apache.flink.table.runtime.utils.PythonTestUtils;
-import org.apache.flink.table.types.logical.RowType;
-import org.apache.flink.types.Row;
-
-import java.io.IOException;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Queue;
-
-/** Tests for {@link ArrowPythonScalarFunctionOperator}. */
-public class ArrowPythonScalarFunctionOperatorTest
- extends PythonScalarFunctionOperatorTestBase<CRow, CRow, Row> {
-
- public AbstractPythonScalarFunctionOperator<CRow, CRow, Row> getTestOperator(
- Configuration config,
- PythonFunctionInfo[] scalarFunctions,
- RowType inputType,
- RowType outputType,
- int[] udfInputOffsets,
- int[] forwardedFields) {
- return new PassThroughArrowPythonScalarFunctionOperator(
- config, scalarFunctions, inputType, outputType, udfInputOffsets, forwardedFields);
- }
-
- public CRow newRow(boolean accumulateMsg, Object... fields) {
- return new CRow(Row.of(fields), accumulateMsg);
- }
-
- public void assertOutputEquals(
- String message, Collection<Object> expected, Collection<Object> actual) {
- TestHarnessUtil.assertOutputEquals(
- message, (Queue<Object>) expected, (Queue<Object>) actual);
- }
-
- public StreamTableEnvironment createTableEnvironment(StreamExecutionEnvironment env) {
- return StreamTableEnvironment.create(env);
- }
-
- @Override
- public TypeSerializer<CRow> getOutputTypeSerializer(RowType dataType) {
- // If set to null, PojoSerializer is used by default which works well here.
- return null;
- }
-
- private static class PassThroughArrowPythonScalarFunctionOperator
- extends ArrowPythonScalarFunctionOperator {
-
- PassThroughArrowPythonScalarFunctionOperator(
- Configuration config,
- PythonFunctionInfo[] scalarFunctions,
- RowType inputType,
- RowType outputType,
- int[] udfInputOffsets,
- int[] forwardedFields) {
- super(config, scalarFunctions, inputType, outputType, udfInputOffsets, forwardedFields);
- }
-
- @Override
- public PythonFunctionRunner createPythonFunctionRunner() throws IOException {
- return new PassThroughPythonScalarFunctionRunner(
- getRuntimeContext().getTaskName(),
- PythonTestUtils.createTestEnvironmentManager(),
- userDefinedFunctionInputType,
- userDefinedFunctionOutputType,
- getFunctionUrn(),
- getUserDefinedFunctionsProto(),
- getInputOutputCoderUrn(),
- new HashMap<>(),
- PythonTestUtils.createMockFlinkMetricContainer());
- }
- }
-}
diff --git a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/table/PythonTableFunctionOperatorTest.java b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/table/PythonTableFunctionOperatorTest.java
deleted file mode 100644
index 1b3cc01..0000000
--- a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/table/PythonTableFunctionOperatorTest.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.runtime.operators.python.table;
-
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.python.PythonFunctionRunner;
-import org.apache.flink.streaming.util.TestHarnessUtil;
-import org.apache.flink.table.functions.python.PythonFunctionInfo;
-import org.apache.flink.table.runtime.types.CRow;
-import org.apache.flink.table.runtime.utils.PassThroughPythonTableFunctionRunner;
-import org.apache.flink.table.runtime.utils.PythonTestUtils;
-import org.apache.flink.table.types.logical.RowType;
-import org.apache.flink.types.Row;
-
-import org.apache.calcite.rel.core.JoinRelType;
-
-import java.io.IOException;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Queue;
-
-/** Tests for {@link PythonTableFunctionOperator}. */
-public class PythonTableFunctionOperatorTest
- extends PythonTableFunctionOperatorTestBase<CRow, CRow, Row> {
- @Override
- public AbstractPythonTableFunctionOperator<CRow, CRow, Row> getTestOperator(
- Configuration config,
- PythonFunctionInfo tableFunction,
- RowType inputType,
- RowType outputType,
- int[] udfInputOffsets,
- JoinRelType joinRelType) {
- return new PassThroughPythonTableFunctionOperator(
- config, tableFunction, inputType, outputType, udfInputOffsets, joinRelType);
- }
-
- @Override
- public CRow newRow(boolean accumulateMsg, Object... fields) {
- return new CRow(Row.of(fields), accumulateMsg);
- }
-
- @Override
- public void assertOutputEquals(
- String message, Collection<Object> expected, Collection<Object> actual) {
- TestHarnessUtil.assertOutputEquals(
- message, (Queue<Object>) expected, (Queue<Object>) actual);
- }
-
- private static class PassThroughPythonTableFunctionOperator
- extends PythonTableFunctionOperator {
-
- PassThroughPythonTableFunctionOperator(
- Configuration config,
- PythonFunctionInfo tableFunction,
- RowType inputType,
- RowType outputType,
- int[] udfInputOffsets,
- JoinRelType joinRelType) {
- super(config, tableFunction, inputType, outputType, udfInputOffsets, joinRelType);
- }
-
- @Override
- public PythonFunctionRunner createPythonFunctionRunner() throws IOException {
- return new PassThroughPythonTableFunctionRunner(
- getRuntimeContext().getTaskName(),
- PythonTestUtils.createTestEnvironmentManager(),
- userDefinedFunctionInputType,
- userDefinedFunctionOutputType,
- getFunctionUrn(),
- getUserDefinedFunctionsProto(),
- getInputOutputCoderUrn(),
- new HashMap<>(),
- PythonTestUtils.createMockFlinkMetricContainer());
- }
- }
-}
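
All three deleted test classes relied on the same technique: subclass the operator under test and override createPythonFunctionRunner() so the harness drives the operator's record plumbing without ever launching a real Python process. A condensed sketch of the scalar variant — every class and helper name below is taken from the deleted code above:

    // Sketch of the shared pass-through pattern (names from the deleted tests).
    private static class PassThroughPythonScalarFunctionOperator
            extends PythonScalarFunctionOperator {

        PassThroughPythonScalarFunctionOperator(
                Configuration config,
                PythonFunctionInfo[] scalarFunctions,
                RowType inputType,
                RowType outputType,
                int[] udfInputOffsets,
                int[] forwardedFields) {
            super(config, scalarFunctions, inputType, outputType,
                    udfInputOffsets, forwardedFields);
        }

        @Override
        public PythonFunctionRunner createPythonFunctionRunner() throws IOException {
            // The pass-through runner echoes its input instead of calling into
            // Python, so tests can assert records are forwarded and framed correctly.
            return new PassThroughPythonScalarFunctionRunner(
                    getRuntimeContext().getTaskName(),
                    PythonTestUtils.createTestEnvironmentManager(),
                    userDefinedFunctionInputType,
                    userDefinedFunctionOutputType,
                    getFunctionUrn(),
                    getUserDefinedFunctionsProto(),
                    getInputOutputCoderUrn(),
                    new HashMap<>(),
                    PythonTestUtils.createMockFlinkMetricContainer());
        }
    }
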
diff --git a/flink-python/tox.ini b/flink-python/tox.ini
index ce7d02c..2cc04f6 100644
--- a/flink-python/tox.ini
+++ b/flink-python/tox.ini
@@ -53,7 +53,7 @@ max-line-length=100
exclude=.tox/*,dev/*,lib/*,target/*,build/*,dist/*,pyflink/shell.py,.eggs/*,pyflink/fn_execution/tests/process_mode_test_data.py,pyflink/fn_execution/*_pb2.py

[mypy]
-files=pyflink/common/*.py,pyflink/table/*.py,pyflink/dataset/*.py,pyflink/datastream/*.py,pyflink/metrics/*.py
+files=pyflink/common/*.py,pyflink/table/*.py,pyflink/datastream/*.py,pyflink/metrics/*.py
ignore_missing_imports = True
strict_optional=False