Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/06/27 09:29:40 UTC

[GitHub] [flink-ml] yunfengzhou-hub opened a new pull request, #121: [FLINK-28237] Add pyflink examples

yunfengzhou-hub opened a new pull request, #121:
URL: https://github.com/apache/flink-ml/pull/121

   This PR is a continuation of #115. It adds PyFlink examples for Flink ML's algorithms.
   
   This PR also does the following:
   - fixes a bug in `IndexToStringModel.java`
   - fixes a bug in `linalg.py`
   - deduplicates test cases in `test_logisticregression.py`
   - renames `tests_binaryclassificationevaluator.py` to `test_binaryclassificationevaluator.py`
   - fixes comments in `stringindexer.py`
   - adds Python support for the Linear Regression algorithm


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink-ml] lindong28 commented on a diff in pull request #121: [FLINK-27715] Add pyflink examples

Posted by GitBox <gi...@apache.org>.
lindong28 commented on code in PR #121:
URL: https://github.com/apache/flink-ml/pull/121#discussion_r908586411


##########
flink-ml-python/pyflink/examples/ml/__init__.py:
##########
@@ -15,3 +15,80 @@
 #  See the License for the specific language governing permissions and
 # limitations under the License.
 ################################################################################
+from py4j.java_gateway import JavaClass, get_java_class, JavaObject
+from pyflink.java_gateway import get_gateway
+from pyflink.util import java_utils
+from pyflink.util.java_utils import to_jarray, load_java_class
+
+
+def add_jars_to_context_class_loader(jar_urls):
+    """
+    Add jars to Python gateway server for local compilation and local execution (i.e. minicluster).
+    Many Flink components, e.g. the Kafka connector, the JDBC connector and the CSV format, are
+    not added to the classpath by default. This utility function can be used to hot-load such
+    jars.
+
+    :param jar_urls: The list of jar urls.
+    """
+    gateway = get_gateway()
+    # validate and normalize
+    jar_urls = [gateway.jvm.java.net.URL(url) for url in jar_urls]
+    context_classloader = gateway.jvm.Thread.currentThread().getContextClassLoader()
+    existing_urls = []
+    class_loader_name = context_classloader.getClass().getName()
+    if class_loader_name == "java.net.URLClassLoader":
+        existing_urls = set([url.toString() for url in context_classloader.getURLs()])
+    if all([url.toString() in existing_urls for url in jar_urls]):
+        # If all URLs are already present, there is no need to create a new class loader.
+        return
+
+    URLClassLoaderClass = load_java_class("java.net.URLClassLoader")
+    if is_instance_of(context_classloader, URLClassLoaderClass):
+        if class_loader_name == "org.apache.flink.runtime.execution.librarycache." \
+                                "FlinkUserCodeClassLoaders$SafetyNetWrapperClassLoader":
+            ensureInner = context_classloader.getClass().getDeclaredMethod("ensureInner", None)
+            ensureInner.setAccessible(True)
+            context_classloader = ensureInner.invoke(context_classloader, None)
+
+        addURL = URLClassLoaderClass.getDeclaredMethod(
+            "addURL",
+            to_jarray(
+                gateway.jvm.Class,
+                [load_java_class("java.net.URL")]))
+        addURL.setAccessible(True)
+
+        for url in jar_urls:
+            addURL.invoke(context_classloader, to_jarray(get_gateway().jvm.Object, [url]))
+
+    else:
+        context_classloader = create_url_class_loader(jar_urls, context_classloader)
+        gateway.jvm.Thread.currentThread().setContextClassLoader(context_classloader)
+
+
+def is_instance_of(java_object, java_class):
+    gateway = get_gateway()
+    if isinstance(java_class, str):
+        param = java_class
+    elif isinstance(java_class, JavaClass):
+        param = get_java_class(java_class)
+    elif isinstance(java_class, JavaObject):
+        if not is_instance_of(java_class, gateway.jvm.Class):
+            param = java_class.getClass()
+        else:
+            param = java_class
+    else:
+        raise TypeError(
+            "java_class must be a string, a JavaClass, or a JavaObject")
+
+    return gateway.jvm.org.apache.flink.api.python.shaded.py4j.reflection.TypeUtil.isInstanceOf(
+        param, java_object)
+
+
+def create_url_class_loader(urls, parent_class_loader):
+    gateway = get_gateway()
+    url_class_loader = gateway.jvm.java.net.URLClassLoader(
+        to_jarray(gateway.jvm.java.net.URL, urls), parent_class_loader)
+    return url_class_loader
+
+
+java_utils.add_jars_to_context_class_loader = add_jars_to_context_class_loader

Review Comment:
   Could you explain why we need to do this? For example, is this because we want to get around a bug in `pyflink.util.java_utils` before that bug is fixed in Flink?
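For readers following this thread, the control flow of the patched `add_jars_to_context_class_loader` can be restated in plain Python. This is a minimal sketch that needs no JVM: the function `choose_jar_loading_strategy`, its parameters and its return labels are hypothetical, and the Java-side calls (`getClass().getName()`, `getURLs()`, `TypeUtil.isInstanceOf`) are replaced by ordinary arguments. URLs are compared as plain strings, standing in for `java.net.URL.toString()`.

```python
from typing import Iterable

# Class name that the patched function checks before unwrapping the loader.
SAFETY_NET_LOADER = (
    "org.apache.flink.runtime.execution.librarycache."
    "FlinkUserCodeClassLoaders$SafetyNetWrapperClassLoader"
)


def choose_jar_loading_strategy(
    loader_class_name: str,
    is_url_class_loader: bool,
    loader_urls: Iterable[str],
    jar_urls: Iterable[str],
) -> str:
    """Hypothetical pure-Python model of the patched function's branching.

    Returns one of:
      "skip"           - every jar URL is already on the loader
      "unwrap_and_add" - unwrap the safety-net wrapper, then call addURL reflectively
      "add"            - call URLClassLoader.addURL reflectively on the current loader
      "new_loader"     - wrap the current loader in a new URLClassLoader and install it
    """
    jar_urls = list(jar_urls)
    # Existing URLs are only consulted when the loader is exactly java.net.URLClassLoader.
    if loader_class_name == "java.net.URLClassLoader":
        existing = set(loader_urls)
    else:
        existing = set()
    if all(url in existing for url in jar_urls):
        return "skip"
    if is_url_class_loader:
        if loader_class_name == SAFETY_NET_LOADER:
            return "unwrap_and_add"
        return "add"
    return "new_loader"


print(choose_jar_loading_strategy(
    "java.net.URLClassLoader", True, ["file:/a.jar"], ["file:/a.jar"]))  # skip
print(choose_jar_loading_strategy(
    "jdk.internal.loader.ClassLoaders$AppClassLoader", False, [], ["file:/a.jar"]))  # new_loader
```

Note that the "already loaded" short-circuit only looks at existing URLs when the loader's class is exactly `java.net.URLClassLoader`; any other `URLClassLoader`, including the safety-net wrapper, goes on to the reflective `addURL` path whenever at least one jar URL is given. A loader that is not a `URLClassLoader` at all (for example the JDK 9+ application class loader) takes the branch that installs a new `URLClassLoader` as the context class loader.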




[GitHub] [flink-ml] lindong28 commented on pull request #121: [FLINK-27715] Add pyflink examples

Posted by GitBox <gi...@apache.org>.
lindong28 commented on PR #121:
URL: https://github.com/apache/flink-ml/pull/121#issuecomment-1169468374

   Thanks for the update. LGTM.



[GitHub] [flink-ml] lindong28 merged pull request #121: [FLINK-27715] Add pyflink examples

Posted by GitBox <gi...@apache.org>.
lindong28 merged PR #121:
URL: https://github.com/apache/flink-ml/pull/121



[GitHub] [flink-ml] yunfengzhou-hub commented on a diff in pull request #121: [FLINK-27715] Add pyflink examples

Posted by GitBox <gi...@apache.org>.
yunfengzhou-hub commented on code in PR #121:
URL: https://github.com/apache/flink-ml/pull/121#discussion_r909134520


##########
flink-ml-python/pyflink/examples/ml/__init__.py:
##########
@@ -15,3 +15,80 @@
+java_utils.add_jars_to_context_class_loader = add_jars_to_context_class_loader

Review Comment:
   Yes. It is a workaround for a bug related to FLINK-15635 and FLINK-28002. I'll add a TODO to stop overwriting `pyflink.util.java_utils` once the fix is released.
   
   Also, following our offline discussion, I'll try to reuse the `add_jars_to_context_class_loader` method across `pyflink/ml/__init__.py` and `pyflink/examples/ml/__init__.py`.


