You are viewing a plain-text version of this content. The canonical link for it was a hyperlink ("here") in the original page and is not preserved in this plain-text rendering.
Posted to commits@flink.apache.org by li...@apache.org on 2022/11/29 06:54:16 UTC
[flink-ml] branch master updated: [FLINK-30037] Use shared mini-cluster in python tests
This is an automated email from the ASF dual-hosted git repository.
lindong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-ml.git
The following commit(s) were added to refs/heads/master by this push:
new 213e9e3 [FLINK-30037] Use shared mini-cluster in python tests
213e9e3 is described below
commit 213e9e3ae83428473093c52db92bbf3db5d0a4c6
Author: yunfengzhou-hub <yu...@outlook.com>
AuthorDate: Tue Nov 29 14:54:11 2022 +0800
[FLINK-30037] Use shared mini-cluster in python tests
This closes #185.
---
.github/workflows/python-checks.yml | 2 +-
flink-ml-python/pom.xml | 92 ++++++++++++++++++++++++++
flink-ml-python/pyflink/ml/tests/test_utils.py | 41 ++++++++++--
pom.xml | 1 +
4 files changed, 131 insertions(+), 5 deletions(-)
diff --git a/.github/workflows/python-checks.yml b/.github/workflows/python-checks.yml
index 7f41ac8..2d8f60f 100644
--- a/.github/workflows/python-checks.yml
+++ b/.github/workflows/python-checks.yml
@@ -25,7 +25,7 @@ on:
jobs:
tests:
- name: python tests on ${{ matrix.os }}
+ name: python ${{ matrix.python-version }} tests on ${{ matrix.os }}
runs-on: ${{ matrix.os }}
strategy:
matrix:
diff --git a/flink-ml-python/pom.xml b/flink-ml-python/pom.xml
new file mode 100644
index 0000000..316ec40
--- /dev/null
+++ b/flink-ml-python/pom.xml
@@ -0,0 +1,92 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+ http://www.apache.org/licenses/LICENSE-2.0
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <artifactId>flink-ml-parent</artifactId>
+ <groupId>org.apache.flink</groupId>
+ <version>2.2-SNAPSHOT</version>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>flink-ml-python</artifactId>
+
+ <packaging>jar</packaging>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-runtime</artifactId>
+ <version>${flink.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-test-utils</artifactId>
+ <version>${flink.version}</version>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-dependency-plugin</artifactId>
+ <executions>
+ <execution>
+ <phase>package</phase>
+ <goals>
+ <goal>copy</goal>
+ </goals>
+ <configuration>
+ <artifactItems>
+ <artifactItem>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-runtime</artifactId>
+ <!-- Don't use test-jar type because of a bug in the plugin (MDEP-587). -->
+ <classifier>tests</classifier>
+ </artifactItem>
+ <artifactItem>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-test-utils</artifactId>
+ </artifactItem>
+ </artifactItems>
+ <outputDirectory>${project.build.directory}/test-dependencies</outputDirectory>
+ </configuration>
+ </execution>
+ <execution>
+ <id>copy-dependencies</id>
+ <phase>package</phase>
+ <goals>
+ <goal>copy-dependencies</goal>
+ </goals>
+ <configuration>
+ <includeGroupIds>junit</includeGroupIds>
+ <outputDirectory>${project.build.directory}/test-dependencies</outputDirectory>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+
+</project>
diff --git a/flink-ml-python/pyflink/ml/tests/test_utils.py b/flink-ml-python/pyflink/ml/tests/test_utils.py
index 68a43dc..bbfb82f 100644
--- a/flink-ml-python/pyflink/ml/tests/test_utils.py
+++ b/flink-ml-python/pyflink/ml/tests/test_utils.py
@@ -22,9 +22,11 @@ import tempfile
import unittest
import uuid
+from py4j.java_gateway import JavaObject
from pyflink.common import RestartStrategies, Configuration
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.java_gateway import get_gateway
+from pyflink.ml import add_jars_to_context_class_loader
from pyflink.table import StreamTableEnvironment
from pyflink.util.java_utils import get_j_env_configuration
@@ -37,10 +39,30 @@ def update_existing_params(target: JavaWithParams, source: JavaWithParams):
class PyFlinkMLTestCase(unittest.TestCase):
+ resource: JavaObject
+
+ @classmethod
+ def setUpClass(cls) -> None:
+ cls._load_dependency_jars()
+
+ gateway = get_gateway()
+ mini_cluster_resource_configuration = (
+ gateway.jvm.org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration
+ .Builder()
+ .setNumberTaskManagers(4)
+ .setNumberSlotsPerTaskManager(1)
+ .setRpcServiceSharing(
+ get_gateway().jvm.org.apache.flink.runtime.minicluster.RpcServiceSharing.DEDICATED)
+ .withHaLeadershipControl()
+ .build())
+ cls.resource = (
+ get_gateway().jvm.org.apache.flink.test.util.
+ MiniClusterWithClientResource(mini_cluster_resource_configuration))
+ cls.resource.before()
+
def setUp(self):
self.env = StreamExecutionEnvironment.get_execution_environment()
self.env.get_config().enable_object_reuse()
- self._load_dependency_jars()
config = Configuration(
j_configuration=get_j_env_configuration(self.env._j_stream_execution_environment))
config.set_boolean("execution.checkpointing.checkpoints-after-tasks-finish.enabled", True)
@@ -55,7 +77,12 @@ class PyFlinkMLTestCase(unittest.TestCase):
def tearDown(self) -> None:
shutil.rmtree(self.temp_dir, ignore_errors=True)
- def _load_dependency_jars(self):
+ @classmethod
+ def tearDownClass(cls) -> None:
+ cls.resource.after()
+
+ @classmethod
+ def _load_dependency_jars(cls):
from pyflink.ml.version import __version__
flink_version = __version__.replace(".dev0", "-SNAPSHOT")
@@ -71,7 +98,7 @@ class PyFlinkMLTestCase(unittest.TestCase):
for file in os.listdir(FLINK_ML_LIB_PATH):
if file.endswith('.jar'):
- self.env.add_classpaths("file://{0}/{1}".format(FLINK_ML_LIB_PATH, file))
+ add_jars_to_context_class_loader(["file://{0}/{1}".format(FLINK_ML_LIB_PATH, file)])
# load flink-ml-lib/flink-ml-lib-*-tests.jar
FLINK_ML_LIB_SOURCE_PATH = os.path.abspath(os.path.join(
@@ -80,7 +107,13 @@ class PyFlinkMLTestCase(unittest.TestCase):
ml_test_jar = glob.glob(os.path.join(
FLINK_ML_LIB_SOURCE_PATH, "target", "flink-ml-lib-*-tests.jar"))[0]
- self.env.add_classpaths("file://{0}".format(ml_test_jar))
+ add_jars_to_context_class_loader(["file://{0}".format(ml_test_jar)])
+
+ # load test utility jars.
+ test_util_jars = glob.glob(os.path.join(
+ this_directory, "../../../target/test-dependencies", "*.jar"))
+
+ add_jars_to_context_class_loader(["file://{0}".format(x) for x in test_util_jars])
def save_and_reload(self, stage):
path = os.path.join(self.temp_dir, 'test_save_and_reload', str(uuid.uuid1()))
diff --git a/pom.xml b/pom.xml
index 6219143..7d15c15 100644
--- a/pom.xml
+++ b/pom.xml
@@ -60,6 +60,7 @@ under the License.
<module>flink-ml-benchmark</module>
<module>flink-ml-dist</module>
<module>flink-ml-examples</module>
+ <module>flink-ml-python</module>
</modules>
<properties>