You are viewing a plain text version of this content. The canonical link to the original was not preserved in this plain text version.
Posted to commits@flink.apache.org by li...@apache.org on 2022/11/29 06:54:16 UTC

[flink-ml] branch master updated: [FLINK-30037] Use shared mini-cluster in python tests

This is an automated email from the ASF dual-hosted git repository.

lindong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-ml.git


The following commit(s) were added to refs/heads/master by this push:
     new 213e9e3  [FLINK-30037] Use shared mini-cluster in python tests
213e9e3 is described below

commit 213e9e3ae83428473093c52db92bbf3db5d0a4c6
Author: yunfengzhou-hub <yu...@outlook.com>
AuthorDate: Tue Nov 29 14:54:11 2022 +0800

    [FLINK-30037] Use shared mini-cluster in python tests
    
    This closes #185.
---
 .github/workflows/python-checks.yml            |  2 +-
 flink-ml-python/pom.xml                        | 92 ++++++++++++++++++++++++++
 flink-ml-python/pyflink/ml/tests/test_utils.py | 41 ++++++++++--
 pom.xml                                        |  1 +
 4 files changed, 131 insertions(+), 5 deletions(-)

diff --git a/.github/workflows/python-checks.yml b/.github/workflows/python-checks.yml
index 7f41ac8..2d8f60f 100644
--- a/.github/workflows/python-checks.yml
+++ b/.github/workflows/python-checks.yml
@@ -25,7 +25,7 @@ on:
 
 jobs:
   tests:
-    name: python tests on ${{ matrix.os }}
+    name: python ${{ matrix.python-version }} tests on ${{ matrix.os }}
     runs-on: ${{ matrix.os }}
     strategy:
       matrix:
diff --git a/flink-ml-python/pom.xml b/flink-ml-python/pom.xml
new file mode 100644
index 0000000..316ec40
--- /dev/null
+++ b/flink-ml-python/pom.xml
@@ -0,0 +1,92 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+  http://www.apache.org/licenses/LICENSE-2.0
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>flink-ml-parent</artifactId>
+        <groupId>org.apache.flink</groupId>
+        <version>2.2-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>flink-ml-python</artifactId>
+
+    <packaging>jar</packaging>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-runtime</artifactId>
+            <version>${flink.version}</version>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-test-utils</artifactId>
+            <version>${flink.version}</version>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-dependency-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>copy</goal>
+                        </goals>
+                        <configuration>
+                            <artifactItems>
+                                <artifactItem>
+                                    <groupId>org.apache.flink</groupId>
+                                    <artifactId>flink-runtime</artifactId>
+                                    <!-- Don't use test-jar type because of a bug in the plugin (MDEP-587). -->
+                                    <classifier>tests</classifier>
+                                </artifactItem>
+                                <artifactItem>
+                                    <groupId>org.apache.flink</groupId>
+                                    <artifactId>flink-test-utils</artifactId>
+                                </artifactItem>
+                            </artifactItems>
+                            <outputDirectory>${project.build.directory}/test-dependencies</outputDirectory>
+                        </configuration>
+                    </execution>
+                    <execution>
+                        <id>copy-dependencies</id>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>copy-dependencies</goal>
+                        </goals>
+                        <configuration>
+                            <includeGroupIds>junit</includeGroupIds>
+                            <outputDirectory>${project.build.directory}/test-dependencies</outputDirectory>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+
+</project>
diff --git a/flink-ml-python/pyflink/ml/tests/test_utils.py b/flink-ml-python/pyflink/ml/tests/test_utils.py
index 68a43dc..bbfb82f 100644
--- a/flink-ml-python/pyflink/ml/tests/test_utils.py
+++ b/flink-ml-python/pyflink/ml/tests/test_utils.py
@@ -22,9 +22,11 @@ import tempfile
 import unittest
 import uuid
 
+from py4j.java_gateway import JavaObject
 from pyflink.common import RestartStrategies, Configuration
 from pyflink.datastream import StreamExecutionEnvironment
 from pyflink.java_gateway import get_gateway
+from pyflink.ml import add_jars_to_context_class_loader
 from pyflink.table import StreamTableEnvironment
 from pyflink.util.java_utils import get_j_env_configuration
 
@@ -37,10 +39,30 @@ def update_existing_params(target: JavaWithParams, source: JavaWithParams):
 
 
 class PyFlinkMLTestCase(unittest.TestCase):
+    resource: JavaObject
+
+    @classmethod
+    def setUpClass(cls) -> None:
+        cls._load_dependency_jars()
+
+        gateway = get_gateway()
+        mini_cluster_resource_configuration = (
+            gateway.jvm.org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration
+            .Builder()
+            .setNumberTaskManagers(4)
+            .setNumberSlotsPerTaskManager(1)
+            .setRpcServiceSharing(
+                get_gateway().jvm.org.apache.flink.runtime.minicluster.RpcServiceSharing.DEDICATED)
+            .withHaLeadershipControl()
+            .build())
+        cls.resource = (
+            get_gateway().jvm.org.apache.flink.test.util.
+            MiniClusterWithClientResource(mini_cluster_resource_configuration))
+        cls.resource.before()
+
     def setUp(self):
         self.env = StreamExecutionEnvironment.get_execution_environment()
         self.env.get_config().enable_object_reuse()
-        self._load_dependency_jars()
         config = Configuration(
             j_configuration=get_j_env_configuration(self.env._j_stream_execution_environment))
         config.set_boolean("execution.checkpointing.checkpoints-after-tasks-finish.enabled", True)
@@ -55,7 +77,12 @@ class PyFlinkMLTestCase(unittest.TestCase):
     def tearDown(self) -> None:
         shutil.rmtree(self.temp_dir, ignore_errors=True)
 
-    def _load_dependency_jars(self):
+    @classmethod
+    def tearDownClass(cls) -> None:
+        cls.resource.after()
+
+    @classmethod
+    def _load_dependency_jars(cls):
         from pyflink.ml.version import __version__
 
         flink_version = __version__.replace(".dev0", "-SNAPSHOT")
@@ -71,7 +98,7 @@ class PyFlinkMLTestCase(unittest.TestCase):
 
         for file in os.listdir(FLINK_ML_LIB_PATH):
             if file.endswith('.jar'):
-                self.env.add_classpaths("file://{0}/{1}".format(FLINK_ML_LIB_PATH, file))
+                add_jars_to_context_class_loader(["file://{0}/{1}".format(FLINK_ML_LIB_PATH, file)])
 
         # load flink-ml-lib/flink-ml-lib-*-tests.jar
         FLINK_ML_LIB_SOURCE_PATH = os.path.abspath(os.path.join(
@@ -80,7 +107,13 @@ class PyFlinkMLTestCase(unittest.TestCase):
         ml_test_jar = glob.glob(os.path.join(
             FLINK_ML_LIB_SOURCE_PATH, "target", "flink-ml-lib-*-tests.jar"))[0]
 
-        self.env.add_classpaths("file://{0}".format(ml_test_jar))
+        add_jars_to_context_class_loader(["file://{0}".format(ml_test_jar)])
+
+        # load test utility jars.
+        test_util_jars = glob.glob(os.path.join(
+            this_directory, "../../../target/test-dependencies", "*.jar"))
+
+        add_jars_to_context_class_loader(["file://{0}".format(x) for x in test_util_jars])
 
     def save_and_reload(self, stage):
         path = os.path.join(self.temp_dir, 'test_save_and_reload', str(uuid.uuid1()))
diff --git a/pom.xml b/pom.xml
index 6219143..7d15c15 100644
--- a/pom.xml
+++ b/pom.xml
@@ -60,6 +60,7 @@ under the License.
     <module>flink-ml-benchmark</module>
     <module>flink-ml-dist</module>
     <module>flink-ml-examples</module>
+    <module>flink-ml-python</module>
   </modules>
 
   <properties>