You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by hx...@apache.org on 2022/06/27 09:10:01 UTC

[flink-ml] branch release-2.1 updated (c116000 -> 293f439)

This is an automated email from the ASF dual-hosted git repository.

hxb pushed a change to branch release-2.1
in repository https://gitbox.apache.org/repos/asf/flink-ml.git


    from c116000  [hotfix] Update docs/config.toml
     new 6fed373  [FLINK-28237][python][ml] Improve flink ml python examples in doc
     new 293f439  [FLINK-28237][python][ml] Fix the package error in flink ml python

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 docs/content/docs/operators/classification/knn.md  |  1 +
 .../operators/classification/logisticregression.md |  1 +
 .../docs/operators/classification/naivebayes.md    |  1 +
 docs/content/docs/operators/clustering/kmeans.md   |  1 +
 .../docs/operators/feature/onehotencoder.md        |  1 +
 flink-ml-python/pyflink/ml/__init__.py             | 61 +++++++++++++++++-----
 flink-ml-python/setup.py                           |  5 ++
 7 files changed, 57 insertions(+), 14 deletions(-)


[flink-ml] 02/02: [FLINK-28237][python][ml] Fix the package error in flink ml python

Posted by hx...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

hxb pushed a commit to branch release-2.1
in repository https://gitbox.apache.org/repos/asf/flink-ml.git

commit 293f4399cc6acc7f51ff052af79caef0a792ef15
Author: huangxingbo <hx...@apache.org>
AuthorDate: Fri Jun 24 11:43:25 2022 +0800

    [FLINK-28237][python][ml] Fix the package error in flink ml python
---
 flink-ml-python/pyflink/ml/__init__.py | 61 ++++++++++++++++++++++++++--------
 flink-ml-python/setup.py               |  5 +++
 2 files changed, 52 insertions(+), 14 deletions(-)

diff --git a/flink-ml-python/pyflink/ml/__init__.py b/flink-ml-python/pyflink/ml/__init__.py
index 07037b4..5bcf4b1 100644
--- a/flink-ml-python/pyflink/ml/__init__.py
+++ b/flink-ml-python/pyflink/ml/__init__.py
@@ -15,6 +15,7 @@
 #  See the License for the specific language governing permissions and
 # limitations under the License.
 ################################################################################
+from py4j.java_gateway import JavaClass, get_java_class, JavaObject
 from pyflink.java_gateway import get_gateway
 from pyflink.util import java_utils
 from pyflink.util.java_utils import to_jarray, load_java_class
@@ -40,22 +41,54 @@ def add_jars_to_context_class_loader(jar_urls):
     if all([url.toString() in existing_urls for url in jar_urls]):
         # if urls all existed, no need to create new class loader.
         return
+
     URLClassLoaderClass = load_java_class("java.net.URLClassLoader")
-    addURL = URLClassLoaderClass.getDeclaredMethod(
-        "addURL",
-        to_jarray(
-            gateway.jvm.Class,
-            [load_java_class("java.net.URL")]))
-    addURL.setAccessible(True)
-    if class_loader_name == "org.apache.flink.runtime.execution.librarycache." \
-                            "FlinkUserCodeClassLoaders$SafetyNetWrapperClassLoader":
-        ensureInner = context_classloader.getClass().getDeclaredMethod("ensureInner", None)
-        ensureInner.setAccessible(True)
-        loader = ensureInner.invoke(context_classloader, None)
+    if is_instance_of(context_classloader, URLClassLoaderClass):
+        if class_loader_name == "org.apache.flink.runtime.execution.librarycache." \
+                                "FlinkUserCodeClassLoaders$SafetyNetWrapperClassLoader":
+            ensureInner = context_classloader.getClass().getDeclaredMethod("ensureInner", None)
+            ensureInner.setAccessible(True)
+            context_classloader = ensureInner.invoke(context_classloader, None)
+
+        addURL = URLClassLoaderClass.getDeclaredMethod(
+            "addURL",
+            to_jarray(
+                gateway.jvm.Class,
+                [load_java_class("java.net.URL")]))
+        addURL.setAccessible(True)
+
+        for url in jar_urls:
+            addURL.invoke(context_classloader, to_jarray(get_gateway().jvm.Object, [url]))
+
+    else:
+        context_classloader = create_url_class_loader(jar_urls, context_classloader)
+        gateway.jvm.Thread.currentThread().setContextClassLoader(context_classloader)
+
+
+def is_instance_of(java_object, java_class):
+    gateway = get_gateway()
+    if isinstance(java_class, str):
+        param = java_class
+    elif isinstance(java_class, JavaClass):
+        param = get_java_class(java_class)
+    elif isinstance(java_class, JavaObject):
+        if not is_instance_of(java_class, gateway.jvm.Class):
+            param = java_class.getClass()
+        else:
+            param = java_class
     else:
-        loader = context_classloader
-    for url in jar_urls:
-        addURL.invoke(loader, to_jarray(get_gateway().jvm.Object, [url]))
+        raise TypeError(
+            "java_class must be a string, a JavaClass, or a JavaObject")
+
+    return gateway.jvm.org.apache.flink.api.python.shaded.py4j.reflection.TypeUtil.isInstanceOf(
+        param, java_object)
+
+
+def create_url_class_loader(urls, parent_class_loader):
+    gateway = get_gateway()
+    url_class_loader = gateway.jvm.java.net.URLClassLoader(
+        to_jarray(gateway.jvm.java.net.URL, urls), parent_class_loader)
+    return url_class_loader
 
 
 java_utils.add_jars_to_context_class_loader = add_jars_to_context_class_loader
diff --git a/flink-ml-python/setup.py b/flink-ml-python/setup.py
index 55b617e..e584904 100644
--- a/flink-ml-python/setup.py
+++ b/flink-ml-python/setup.py
@@ -82,6 +82,11 @@ try:
                 'pyflink.ml',
                 'pyflink.ml.core',
                 'pyflink.ml.lib',
+                'pyflink.ml.lib.classification',
+                'pyflink.ml.lib.clustering',
+                'pyflink.ml.lib.evaluation',
+                'pyflink.ml.lib.feature',
+                'pyflink.ml.lib',
                 'pyflink.ml.util',
                 'pyflink.examples']
 


[flink-ml] 01/02: [FLINK-28237][python][ml] Improve flink ml python examples in doc

Posted by hx...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

hxb pushed a commit to branch release-2.1
in repository https://gitbox.apache.org/repos/asf/flink-ml.git

commit 6fed373d4e5eedbd922b9a58843691ea1e5d41ec
Author: huangxingbo <hx...@apache.org>
AuthorDate: Fri Jun 24 11:42:35 2022 +0800

    [FLINK-28237][python][ml] Improve flink ml python examples in doc
    
    This closes #116.
---
 docs/content/docs/operators/classification/knn.md                | 1 +
 docs/content/docs/operators/classification/logisticregression.md | 1 +
 docs/content/docs/operators/classification/naivebayes.md         | 1 +
 docs/content/docs/operators/clustering/kmeans.md                 | 1 +
 docs/content/docs/operators/feature/onehotencoder.md             | 1 +
 5 files changed, 5 insertions(+)

diff --git a/docs/content/docs/operators/classification/knn.md b/docs/content/docs/operators/classification/knn.md
index d1aab59..289b902 100644
--- a/docs/content/docs/operators/classification/knn.md
+++ b/docs/content/docs/operators/classification/knn.md
@@ -121,6 +121,7 @@ output.execute().print();
 {{< tab "Python">}}
 ```python
 from pyflink.common import Types
+from pyflink.datastream import StreamExecutionEnvironment
 from pyflink.table import StreamTableEnvironment
 
 from pyflink.ml.core.linalg import Vectors, DenseVectorTypeInfo
diff --git a/docs/content/docs/operators/classification/logisticregression.md b/docs/content/docs/operators/classification/logisticregression.md
index bbf112e..5a8b119 100644
--- a/docs/content/docs/operators/classification/logisticregression.md
+++ b/docs/content/docs/operators/classification/logisticregression.md
@@ -112,6 +112,7 @@ output.execute().print();
 {{< tab "Python">}}
 ```python
 from pyflink.common import Types
+from pyflink.datastream import StreamExecutionEnvironment
 from pyflink.table import StreamTableEnvironment
 
 from pyflink.ml.core.linalg import Vectors, DenseVectorTypeInfo
diff --git a/docs/content/docs/operators/classification/naivebayes.md b/docs/content/docs/operators/classification/naivebayes.md
index 86f4306..6d50c19 100644
--- a/docs/content/docs/operators/classification/naivebayes.md
+++ b/docs/content/docs/operators/classification/naivebayes.md
@@ -104,6 +104,7 @@ outputTable.execute().print();
 {{< tab "Python">}}
 ```python
 from pyflink.common import Types
+from pyflink.datastream import StreamExecutionEnvironment
 from pyflink.table import StreamTableEnvironment
 
 from pyflink.ml.core.linalg import Vectors, DenseVectorTypeInfo
diff --git a/docs/content/docs/operators/clustering/kmeans.md b/docs/content/docs/operators/clustering/kmeans.md
index 1e56b2f..8ad3560 100644
--- a/docs/content/docs/operators/clustering/kmeans.md
+++ b/docs/content/docs/operators/clustering/kmeans.md
@@ -104,6 +104,7 @@ for (CloseableIterator<Row> it = output.execute().collect(); it.hasNext(); ) {
 {{< tab "Python">}}
 ```python
 from pyflink.common import Types
+from pyflink.datastream import StreamExecutionEnvironment
 from pyflink.table import StreamTableEnvironment
 
 from pyflink.ml.core.linalg import Vectors, DenseVectorTypeInfo
diff --git a/docs/content/docs/operators/feature/onehotencoder.md b/docs/content/docs/operators/feature/onehotencoder.md
index 1ee44c9..c0a733e 100644
--- a/docs/content/docs/operators/feature/onehotencoder.md
+++ b/docs/content/docs/operators/feature/onehotencoder.md
@@ -82,6 +82,7 @@ outputTable.execute().print();
 {{< tab "Python">}}
 ```python
 from pyflink.common import Types, Row
+from pyflink.datastream import StreamExecutionEnvironment
 from pyflink.table import StreamTableEnvironment, Table, DataTypes
 
 from pyflink.ml.lib.feature.onehotencoder import OneHotEncoder, OneHotEncoderModel