Posted to reviews@spark.apache.org by "WeichenXu123 (via GitHub)" <gi...@apache.org> on 2023/04/26 03:56:52 UTC

[GitHub] [spark] WeichenXu123 opened a new pull request, #40954: [PYSPARK] [CONNECT] [ML] PySpark UDF supports python package dependencies

WeichenXu123 opened a new pull request, #40954:
URL: https://github.com/apache/spark/pull/40954

   ### What changes were proposed in this pull request?
   
   #### Make PySpark UDFs support annotating Python package dependencies; when a UDF is executed, the UDF worker creates a new Python environment containing the provided dependencies.
   
    - Supported Spark modes: Spark Connect mode / legacy mode
    - Supported UDF types: all kinds of PySpark UDFs
       - SQL_BATCHED_UDF
       - SQL_SCALAR_PANDAS_UDF
       - SQL_GROUPED_MAP_PANDAS_UDF
       - SQL_GROUPED_AGG_PANDAS_UDF
       - SQL_WINDOW_AGG_PANDAS_UDF
       - SQL_SCALAR_PANDAS_ITER_UDF
       - SQL_MAP_PANDAS_ITER_UDF
       - SQL_COGROUPED_MAP_PANDAS_UDF
       - SQL_MAP_ARROW_ITER_UDF
       - SQL_GROUPED_MAP_PANDAS_UDF_WITH_STATE
   
   
   #### Implementation sketch
    - Before starting the PySpark UDF worker process, a Python environment with the provided packages is created, and the UDF worker process is spawned using that environment instead of the default configured PySpark Python.
    - `virtualenv` is used to create a Python environment based on the current Python environment that PySpark uses, and `pip install` then installs the provided packages.
    - If the user configures an NFS directory that is accessible to all Spark nodes (readable/writable by the Spark driver, readable by all Spark workers), the Python environment is prepared on the driver side; otherwise it is created on the Spark worker side.
    - The Python environment is cached on the Spark driver or worker side (depending on whether the NFS directory is enabled); the SHA-1 hash of the sorted pip requirements list is used as the cache key (see the sketch below).
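
   A minimal Python sketch of the caching-key idea described above; the function name and the exact normalization are illustrative, not the PR's code:

   ```
   import hashlib

   def python_env_cache_key(pip_requirements, pip_constraints=()):
       # Sort so the key is independent of the order in which the user
       # listed the requirements; equivalent environments then share one
       # cache entry under the root environment directory.
       payload = "\n".join(sorted(pip_requirements)) + "\n--\n" + "\n".join(sorted(pip_constraints))
       return "python-" + hashlib.sha1(payload.encode("utf-8")).hexdigest()

   # Both orderings map to the same cached environment directory name.
   assert python_env_cache_key(["scikit-learn", "PyYAML==6.0"]) == \
       python_env_cache_key(["PyYAML==6.0", "scikit-learn"])
   ```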
   
   #### TODOs
    - On the frontend, the PR currently only supports annotating `pip_requirements` for `pandas_udf`; the argument has not yet been added for the other UDF types or for `mapInPandas` / `mapInArrow`.
    - Support annotating a Python version for the PySpark UDF; during UDF execution, download that Python version and create the Python environment with it.
    - Use a file lock during Python environment creation to avoid race conditions (see the sketch after this list).
    - Unit tests
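
   A minimal sketch of the file-lock idea from the TODO above, using POSIX `fcntl`; the lock-file naming and helper are assumptions, not part of this PR:

   ```
   import fcntl
   import os
   from contextlib import contextmanager

   @contextmanager
   def env_creation_lock(root_env_dir, env_key):
       # Hold an exclusive advisory lock while one process creates the env;
       # other processes (e.g. executors on the same machine) block here
       # until the environment directory is fully materialized.
       lock_path = os.path.join(root_env_dir, env_key + ".lock")
       with open(lock_path, "w") as lock_file:
           fcntl.flock(lock_file, fcntl.LOCK_EX)
           try:
               yield
           finally:
               fcntl.flock(lock_file, fcntl.LOCK_UN)
   ```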
   
   ### Why are the changes needed?
   
    - For the Spark Connect case, the client Python environment is very likely to differ from the PySpark server-side Python environment, which causes the user's UDF to fail on the server side.
    - Some third-party machine learning libraries (e.g. MLflow) require PySpark UDFs to support dependencies, because in ML cases model inference must run in a PySpark UDF using exactly the same Python environment that trained the model. Currently MLflow works around this by creating a child Python process inside the PySpark UDF worker and redirecting all UDF input data to that child process for inference, which adds significant overhead. If PySpark UDFs supported built-in Python dependency management, this poorly performing approach would no longer be needed.
   
   
   ### Does this PR introduce _any_ user-facing change?
    ```
    @pandas_udf("string", pip_requirements=...)
    ```
   
    The `pip_requirements` argument is either an iterable of pip requirement strings (e.g. ``["scikit-learn", "-r /path/to/req2.txt", "-c /path/to/constraints.txt"]``) or the string path to a pip requirements file on the local filesystem (e.g. ``"/path/to/requirements.txt"``); it represents the pip requirements for the Python UDF.
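
    Both accepted forms of the proposed argument, as a sketch (the version pins and paths are illustrative only):

    ```
    import pandas as pd
    from pyspark.sql.functions import pandas_udf

    # Form 1: an iterable of pip requirement strings.
    @pandas_udf("double", pip_requirements=["scikit-learn==1.2.2", "-c /path/to/constraints.txt"])
    def predict(features: pd.Series) -> pd.Series:
        ...

    # Form 2: the string path to a requirements file on the local filesystem.
    @pandas_udf("double", pip_requirements="/path/to/requirements.txt")
    def predict_from_file(features: pd.Series) -> pd.Series:
        ...
    ```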
   
   
   ### How was this patch tested?
   
   Unit tests to be added.
   
    Manual test:
   
   ```
   import pandas as pd
   from pyspark.sql.functions import pandas_udf
   
   sc.setLogLevel("INFO")
   
   @pandas_udf("string", pip_requirements=["PyYAML==6.0"])
   def to_upper(s: pd.Series) -> pd.Series:
       import yaml
       return s.str.upper() + f"yaml-version: {yaml.__version__}"
   
   df = spark.createDataFrame([("John Doe",)], ("name",))
   df.select(to_upper("name")).show(truncate=False)
   ```
   
    Run the above code in Spark legacy mode or Spark Connect mode.
   
   
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] HyukjinKwon commented on a diff in pull request #40954: [PYSPARK] [CONNECT] [ML] PySpark UDF supports python package dependencies

Posted by "HyukjinKwon (via GitHub)" <gi...@apache.org>.
HyukjinKwon commented on code in PR #40954:
URL: https://github.com/apache/spark/pull/40954#discussion_r1184620325


##########
core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala:
##########
@@ -160,8 +164,20 @@ private[spark] abstract class BasePythonRunner[IN, OUT](
       envVars.put("PYTHON_FAULTHANDLER_DIR", BasePythonRunner.faultHandlerLogDir.toString)
     }
 
+    // If the local property "pythonEnv.allSparkNodesReadableRootEnvDir" is set,
+    // then the driver side has initialized a python environment in this directory.
+    // `env.createPythonWorker` can reuse the python environment set up on the driver side.
+    val rootPythonEnvDir = Option(
+      context.getLocalProperty("pythonEnv.allSparkNodesReadableRootEnvDir")

Review Comment:
   Can we start the name with `spark`?




[GitHub] [spark] WeichenXu123 commented on a diff in pull request #40954: [PYSPARK] [CONNECT] [ML] PySpark UDF supports python package dependencies

Posted by "WeichenXu123 (via GitHub)" <gi...@apache.org>.
WeichenXu123 commented on code in PR #40954:
URL: https://github.com/apache/spark/pull/40954#discussion_r1177395283


##########
core/src/main/scala/org/apache/spark/api/python/PythonEnvSetup.scala:
##########
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.api.python
+
+import java.io.File
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.SparkContext
+import org.apache.spark.SparkFiles
+import org.apache.spark.internal.config.{PYSPARK_DRIVER_PYTHON, PYSPARK_PYTHON}
+
+
+
+object PythonEnvSetupUtils {
+
+  def getAllSparkNodesReadablePythonEnvRootDir(): Option[String] = {
+    val sparkContext = SparkContext.getActive.get
+    val sparkConf = sparkContext.conf
+
+    // For databricks runtime,
+    // we need to set `allSparkNodesReadableTempDir` to be REPL temp directory
+    val allSparkNodesReadableTempDir = sparkConf.getOption("spark.nfs.rootDir")
+      .orElse {
+        if (sparkContext.isLocal || sparkContext.master.startsWith("local-cluster[")) {
+          // For local / local-cluster mode spark
+          Some(SparkFiles.getRootDirectory())
+        } else None
+      }
+
+    allSparkNodesReadableTempDir.map { path =>
+      val rootEnvDir = new File(path, "spark-udf-python-env-root")
+      rootEnvDir.mkdirs()
+      rootEnvDir.getPath
+    }
+  }
+
+
+  def setupPythonEnvOnSparkDriverIfAvailable(pipDependencies: Seq[String],
+                                             pipConstraints: Seq[String]): Unit = {
+
+    val allSparkNodesReadablePythonEnvRootDirOpt = getAllSparkNodesReadablePythonEnvRootDir()
+
+    if (allSparkNodesReadablePythonEnvRootDirOpt.isDefined) {
+      // If we have python-env root directory that is accessible to all spark nodes,
+      // we can create python environment with provided dependencies in driver side
+      // and Spark executors can directly use it.
+      val sparkContext = SparkContext.getActive.get
+      val sparkConf = sparkContext.conf
+
+      sparkContext.setLocalProperty(
+        "pythonEnv.allSparkNodesReadableRootEnvDir",
+        allSparkNodesReadablePythonEnvRootDirOpt.get
+      )
+
+      val pythonExec = sparkConf.get(PYSPARK_DRIVER_PYTHON)
+        .orElse(sparkConf.get(PYSPARK_PYTHON))
+        .orElse(sys.env.get("PYSPARK_DRIVER_PYTHON"))
+        .orElse(sys.env.get("PYSPARK_PYTHON"))
+        .getOrElse("python3")
+
+      if (!pipDependencies.isEmpty) {
+        PythonEnvManager.getOrCreatePythonEnvironment(
+          pythonExec,
+          allSparkNodesReadablePythonEnvRootDirOpt.get,
+          pipDependencies,
+          pipConstraints
+        )
+      }
+    }
+  }
+
+  def getPipRequirementsFromChainedPythonFuncs(
+                                                funcs: Seq[ChainedPythonFunctions]
+                                              ): (Seq[String], Seq[String]) = {
+    val headPipDeps = funcs.head.funcs.head.pipDependencies
+    val headPipConstraints = funcs.head.funcs.head.pipConstraints
+
+    for (chainedFunc <- funcs) {
+      for (simpleFunc <- chainedFunc.funcs) {
+        if (simpleFunc.pipDependencies != headPipDeps ||
+          simpleFunc.pipConstraints != headPipConstraints
+        ) {
+          // TODO: For this case, we should split current python runner
+          //  into multiple python runners.

Review Comment:
   CC @HyukjinKwon




[GitHub] [spark] github-actions[bot] commented on pull request #40954: [PYSPARK] [CONNECT] [ML] PySpark UDF supports python package dependencies

Posted by "github-actions[bot] (via GitHub)" <gi...@apache.org>.
github-actions[bot] commented on PR #40954:
URL: https://github.com/apache/spark/pull/40954#issuecomment-1676157353

   We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable.
   If you'd like to revive this PR, please reopen it and ask a committer to remove the Stale tag!



[GitHub] [spark] HyukjinKwon commented on a diff in pull request #40954: [PYSPARK] [CONNECT] [ML] PySpark UDF supports python package dependencies

Posted by "HyukjinKwon (via GitHub)" <gi...@apache.org>.
HyukjinKwon commented on code in PR #40954:
URL: https://github.com/apache/spark/pull/40954#discussion_r1184633342


##########
python/pyspark/pythonenv/pip_req_parser.py:
##########
@@ -0,0 +1,191 @@
+# -*- coding: utf-8 -*-

Review Comment:
   I think we should rather just add another argument, or check whether it is a path ending with `.txt` (or check whether it is `requirements.txt`), and let `pip` decide how to handle it instead of reinventing the wheel of parsing the pip requirement specification.
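
   A minimal sketch of that dispatch, with the helper name as an illustrative assumption:

   ```
   def normalize_pip_requirements(pip_requirements):
       # Treat a plain string as the path to a requirements file and hand it
       # to pip via "-r"; treat anything else as an iterable of requirement
       # lines. pip itself then handles the full requirement-file syntax.
       if isinstance(pip_requirements, str):
           return ["-r " + pip_requirements]
       return list(pip_requirements)
   ```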




[GitHub] [spark] WeichenXu123 commented on a diff in pull request #40954: [PYSPARK] [CONNECT] [ML] PySpark UDF supports python package dependencies

Posted by "WeichenXu123 (via GitHub)" <gi...@apache.org>.
WeichenXu123 commented on code in PR #40954:
URL: https://github.com/apache/spark/pull/40954#discussion_r1177559376


##########
core/src/main/scala/org/apache/spark/api/python/PythonEnvSetup.scala:
##########
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.api.python
+
+import java.io.File
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.SparkContext
+import org.apache.spark.SparkFiles
+import org.apache.spark.internal.config.{PYSPARK_DRIVER_PYTHON, PYSPARK_PYTHON}
+
+
+
+object PythonEnvSetupUtils {
+
+  def getAllSparkNodesReadablePythonEnvRootDir(): Option[String] = {
+    val sparkContext = SparkContext.getActive.get
+    val sparkConf = sparkContext.conf
+
+    // For databricks runtime,
+    // we need to set `allSparkNodesReadableTempDir` to be REPL temp directory
+    val allSparkNodesReadableTempDir = sparkConf.getOption("spark.nfs.rootDir")
+      .orElse {
+        if (sparkContext.isLocal || sparkContext.master.startsWith("local-cluster[")) {
+          // For local / local-cluster mode spark
+          Some(SparkFiles.getRootDirectory())
+        } else None
+      }
+
+    allSparkNodesReadableTempDir.map { path =>
+      val rootEnvDir = new File(path, "spark-udf-python-env-root")
+      rootEnvDir.mkdirs()
+      rootEnvDir.getPath
+    }
+  }
+
+
+  def setupPythonEnvOnSparkDriverIfAvailable(pipDependencies: Seq[String],
+                                             pipConstraints: Seq[String]): Unit = {
+
+    val allSparkNodesReadablePythonEnvRootDirOpt = getAllSparkNodesReadablePythonEnvRootDir()
+
+    if (allSparkNodesReadablePythonEnvRootDirOpt.isDefined) {
+      // If we have python-env root directory that is accessible to all spark nodes,
+      // we can create python environment with provided dependencies in driver side
+      // and Spark executors can directly use it.
+      val sparkContext = SparkContext.getActive.get
+      val sparkConf = sparkContext.conf
+
+      sparkContext.setLocalProperty(
+        "pythonEnv.allSparkNodesReadableRootEnvDir",
+        allSparkNodesReadablePythonEnvRootDirOpt.get
+      )
+
+      val pythonExec = sparkConf.get(PYSPARK_DRIVER_PYTHON)
+        .orElse(sparkConf.get(PYSPARK_PYTHON))
+        .orElse(sys.env.get("PYSPARK_DRIVER_PYTHON"))
+        .orElse(sys.env.get("PYSPARK_PYTHON"))
+        .getOrElse("python3")
+
+      if (!pipDependencies.isEmpty) {
+        PythonEnvManager.getOrCreatePythonEnvironment(
+          pythonExec,
+          allSparkNodesReadablePythonEnvRootDirOpt.get,
+          pipDependencies,
+          pipConstraints
+        )
+      }
+    }
+  }
+
+  def getPipRequirementsFromChainedPythonFuncs(
+                                                funcs: Seq[ChainedPythonFunctions]
+                                              ): (Seq[String], Seq[String]) = {
+    val headPipDeps = funcs.head.funcs.head.pipDependencies
+    val headPipConstraints = funcs.head.funcs.head.pipConstraints
+
+    for (chainedFunc <- funcs) {
+      for (simpleFunc <- chainedFunc.funcs) {
+        if (simpleFunc.pipDependencies != headPipDeps ||
+          simpleFunc.pipConstraints != headPipConstraints
+        ) {
+          // TODO: For this case, we should split current python runner
+          //  into multiple python runners.

Review Comment:
   Btw, this TODO case is minor; usually the UDFs in one Spark job require only one set of python dependency settings.




[GitHub] [spark] WeichenXu123 commented on a diff in pull request #40954: [PYSPARK] [CONNECT] [ML] PySpark UDF supports python package dependencies

Posted by "WeichenXu123 (via GitHub)" <gi...@apache.org>.
WeichenXu123 commented on code in PR #40954:
URL: https://github.com/apache/spark/pull/40954#discussion_r1177556859


##########
core/src/main/scala/org/apache/spark/api/python/PythonEnvSetup.scala:
##########
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.api.python
+
+import java.io.File
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.SparkContext
+import org.apache.spark.SparkFiles
+import org.apache.spark.internal.config.{PYSPARK_DRIVER_PYTHON, PYSPARK_PYTHON}
+
+
+
+object PythonEnvSetupUtils {
+
+  def getAllSparkNodesReadablePythonEnvRootDir(): Option[String] = {
+    val sparkContext = SparkContext.getActive.get
+    val sparkConf = sparkContext.conf
+
+    // For databricks runtime,
+    // we need to set `allSparkNodesReadableTempDir` to be REPL temp directory
+    val allSparkNodesReadableTempDir = sparkConf.getOption("spark.nfs.rootDir")
+      .orElse {
+        if (sparkContext.isLocal || sparkContext.master.startsWith("local-cluster[")) {
+          // For local / local-cluster mode spark
+          Some(SparkFiles.getRootDirectory())
+        } else None
+      }
+
+    allSparkNodesReadableTempDir.map { path =>
+      val rootEnvDir = new File(path, "spark-udf-python-env-root")
+      rootEnvDir.mkdirs()
+      rootEnvDir.getPath
+    }
+  }
+
+
+  def setupPythonEnvOnSparkDriverIfAvailable(pipDependencies: Seq[String],
+                                             pipConstraints: Seq[String]): Unit = {
+
+    val allSparkNodesReadablePythonEnvRootDirOpt = getAllSparkNodesReadablePythonEnvRootDir()
+
+    if (allSparkNodesReadablePythonEnvRootDirOpt.isDefined) {
+      // If we have python-env root directory that is accessible to all spark nodes,
+      // we can create python environment with provided dependencies in driver side
+      // and Spark executors can directly use it.
+      val sparkContext = SparkContext.getActive.get
+      val sparkConf = sparkContext.conf
+
+      sparkContext.setLocalProperty(
+        "pythonEnv.allSparkNodesReadableRootEnvDir",
+        allSparkNodesReadablePythonEnvRootDirOpt.get
+      )
+
+      val pythonExec = sparkConf.get(PYSPARK_DRIVER_PYTHON)
+        .orElse(sparkConf.get(PYSPARK_PYTHON))
+        .orElse(sys.env.get("PYSPARK_DRIVER_PYTHON"))
+        .orElse(sys.env.get("PYSPARK_PYTHON"))
+        .getOrElse("python3")
+
+      if (!pipDependencies.isEmpty) {
+        PythonEnvManager.getOrCreatePythonEnvironment(
+          pythonExec,
+          allSparkNodesReadablePythonEnvRootDirOpt.get,
+          pipDependencies,
+          pipConstraints
+        )
+      }
+    }
+  }
+
+  def getPipRequirementsFromChainedPythonFuncs(
+                                                funcs: Seq[ChainedPythonFunctions]
+                                              ): (Seq[String], Seq[String]) = {
+    val headPipDeps = funcs.head.funcs.head.pipDependencies
+    val headPipConstraints = funcs.head.funcs.head.pipConstraints
+
+    for (chainedFunc <- funcs) {
+      for (simpleFunc <- chainedFunc.funcs) {
+        if (simpleFunc.pipDependencies != headPipDeps ||
+          simpleFunc.pipConstraints != headPipConstraints
+        ) {
+          // TODO: For this case, we should split current python runner
+          //  into multiple python runners.

Review Comment:
   Does this TODO conflict with https://github.com/databricks/runtime/pull/55522 ?




[GitHub] [spark] WeichenXu123 commented on a diff in pull request #40954: [PYSPARK] [CONNECT] [ML] PySpark UDF supports python package dependencies

Posted by "WeichenXu123 (via GitHub)" <gi...@apache.org>.
WeichenXu123 commented on code in PR #40954:
URL: https://github.com/apache/spark/pull/40954#discussion_r1177557772


##########
python/pyspark/pythonenv/pip_req_parser.py:
##########
@@ -0,0 +1,191 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import json
+import sys
+import subprocess
+from threading import Timer
+import tempfile
+import os
+import pkg_resources
+import importlib_metadata
+from itertools import filterfalse, chain
+from collections import namedtuple
+import logging
+import re
+from typing import NamedTuple, Optional
+from pathlib import Path
+
+
+# Represents a pip requirement.
+#
+# :param req_str: A requirement string (e.g. "scikit-learn == 0.24.2").
+# :param is_constraint: A boolean indicating whether this requirement is a constraint.
+_Requirement = namedtuple("_Requirement", ["req_str", "is_constraint"])
+
+
+def _is_comment(line):
+    return line.startswith("#")
+
+
+def _is_empty(line):
+    return line == ""
+
+
+def _strip_inline_comment(line):
+    return line[: line.find(" #")].rstrip() if " #" in line else line
+
+
+def _is_requirements_file(line):
+    return line.startswith("-r ") or line.startswith("--requirement ")
+
+
+def _is_constraints_file(line):
+    return line.startswith("-c ") or line.startswith("--constraint ")
+
+
+def _join_continued_lines(lines):
+    """
+    Joins lines ending with '\\'.
+    >>> list(_join_continued_lines(["a\\", "b\\", "c"]))
+    ['abc']
+    """
+    continued_lines = []
+
+    for line in lines:
+        if line.endswith("\\"):
+            continued_lines.append(line.rstrip("\\"))
+        else:
+            continued_lines.append(line)
+            yield "".join(continued_lines)
+            continued_lines.clear()
+
+    # The last line ends with '\'
+    if continued_lines:
+        yield "".join(continued_lines)
+
+
+_req_line_regex = re.compile(r'^([a-z-_][0-9a-z-_]*).*', re.IGNORECASE)
+
+
+def _verify_req_line(line):
+    match = _req_line_regex.match(line)
+    if match is None:
+        raise ValueError(f"This pip requirement line is invalid: '{line}'")
+    if match.group(1).lower() == "pyspark":
+        raise ValueError(
+            "Setting a pyspark dependency or constraint for a pyspark UDF is disallowed; "
+            "the pyspark UDF worker python environment always installs the pyspark package "
+            "with exactly the same version as the server-side Spark."
+        )
+
+
+def _parse_requirements(requirements, is_constraint, base_dir=None):

Review Comment:
   I feel it is unsafe to call `pip._internal`, which is not a public interface.




[GitHub] [spark] zhengruifeng commented on pull request #40954: [PYSPARK] [CONNECT] [ML] PySpark UDF supports python package dependencies

Posted by "zhengruifeng (via GitHub)" <gi...@apache.org>.
zhengruifeng commented on PR #40954:
URL: https://github.com/apache/spark/pull/40954#issuecomment-1524348310

   also cc @cloud-fan @hvanhovell 



[GitHub] [spark] HyukjinKwon commented on a diff in pull request #40954: [PYSPARK] [CONNECT] [ML] PySpark UDF supports python package dependencies

Posted by "HyukjinKwon (via GitHub)" <gi...@apache.org>.
HyukjinKwon commented on code in PR #40954:
URL: https://github.com/apache/spark/pull/40954#discussion_r1184624127


##########
core/src/main/scala/org/apache/spark/api/python/PythonEnvManager.scala:
##########
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.api.python
+
+import java.io.{File, PrintWriter}
+import java.nio.file.Path
+
+import scala.collection.JavaConverters._
+import scala.util.Using
+
+import org.apache.commons.codec.digest.DigestUtils
+import org.slf4j.LoggerFactory
+
+// import org.apache.spark.SPARK_VERSION
+import org.apache.spark.util.Utils
+
+
+object PythonEnvManager {
+
+  private val logger = LoggerFactory.getLogger(PythonEnvManager.getClass)
+
+  val PIP_CACHE_DIR = "pip_cache_pkgs"
+
+  private def getVirtualenvCommandExtraEnv(envRootDir: String): Map[String, String] = {
+    Map(
+      // PIP_NO_INPUT=1 makes pip run in non-interactive mode,
+      // otherwise pip might prompt "yes or no" and ask stdin input
+      "PIP_NO_INPUT" -> "1",
+      // Specify pip cache dir
+      "PIP_CACHE_DIR" -> new File(envRootDir, PIP_CACHE_DIR).getAbsolutePath()
+    )
+  }
+
+  private def getPythonEnvironmentKey(
+                               pipDependencies: Seq[String],
+                               pipConstraints: Seq[String]
+                             ): String = {
+    val data = pipDependencies.mkString(",") + "\n" + pipConstraints.mkString(",")
+    "python-" + DigestUtils.sha1Hex(data)
+  }
+
+  // TODO: Support creating python env with specified python version.
+  def getOrCreatePythonEnvironment(
+                                    pythonExec: String,
+                                    rootEnvDir: String,
+                                    pipDependencies: Seq[String],
+                                    pipConstraints: Seq[String]
+                                  ): String = {
+    // Add a global lock when creating the python environment,
+    // to avoid race conditions, which include:
+    //  - python envs under the same rootEnvDir share the pip cache directory,
+    //    and concurrent pip installations race with each other
+    //  - when environment creation fails, we need to clean the directory,
+    //    and concurrent creation/deletion races with each other
+    // TODO: use a file lock instead to ensure safety, e.g. for the case where multiple
+    //  spark executors run on the same machine.
+    synchronized {
+      val key = getPythonEnvironmentKey(pipDependencies, pipConstraints)
+      val envDir = Path.of(rootEnvDir, key).toString
+
+      if (!new File(envDir).exists()) {
+        try {
+          createPythonEnvironment(pythonExec, rootEnvDir, envDir, pipDependencies, pipConstraints)

Review Comment:
   When do we remove this new env?




[GitHub] [spark] HyukjinKwon commented on a diff in pull request #40954: [PYSPARK] [CONNECT] [ML] PySpark UDF supports python package dependencies

Posted by "HyukjinKwon (via GitHub)" <gi...@apache.org>.
HyukjinKwon commented on code in PR #40954:
URL: https://github.com/apache/spark/pull/40954#discussion_r1184615064


##########
core/src/main/scala/org/apache/spark/api/python/PythonEnvManager.scala:
##########
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.api.python
+
+import java.io.{File, PrintWriter}
+import java.nio.file.Path
+
+import scala.collection.JavaConverters._
+import scala.util.Using
+
+import org.apache.commons.codec.digest.DigestUtils
+import org.slf4j.LoggerFactory
+
+// import org.apache.spark.SPARK_VERSION
+import org.apache.spark.util.Utils
+
+
+object PythonEnvManager {
+
+  private val logger = LoggerFactory.getLogger(PythonEnvManager.getClass)
+
+  val PIP_CACHE_DIR = "pip_cache_pkgs"
+
+  private def getVirtualenvCommandExtraEnv(envRootDir: String): Map[String, String] = {
+    Map(
+      // PIP_NO_INPUT=1 makes pip run in non-interactive mode,
+      // otherwise pip might prompt "yes or no" and ask stdin input
+      "PIP_NO_INPUT" -> "1",
+      // Specify pip cache dir
+      "PIP_CACHE_DIR" -> new File(envRootDir, PIP_CACHE_DIR).getAbsolutePath()
+    )
+  }
+
+  private def getPythonEnvironmentKey(
+                               pipDependencies: Seq[String],
+                               pipConstraints: Seq[String]
+                             ): String = {
+    val data = pipDependencies.mkString(",") + "\n" + pipConstraints.mkString(",")
+    "python-" + DigestUtils.sha1Hex(data)
+  }
+
+  // TODO: Support creating python env with specified python version.
+  def getOrCreatePythonEnvironment(
+                                    pythonExec: String,
+                                    rootEnvDir: String,
+                                    pipDependencies: Seq[String],
+                                    pipConstraints: Seq[String]
+                                  ): String = {
+    // Add a global lock when creating the python environment,
+    // to avoid race conditions, which include:
+    //  - python envs under the same rootEnvDir share the pip cache directory,
+    //    and concurrent pip installations race with each other
+    //  - when environment creation fails, we need to clean the directory,
+    //    and concurrent creation/deletion races with each other
+    // TODO: use a file lock instead to ensure safety, e.g. for the case where multiple
+    //  spark executors run on the same machine.
+    synchronized {
+      val key = getPythonEnvironmentKey(pipDependencies, pipConstraints)
+      val envDir = Path.of(rootEnvDir, key).toString
+
+      if (!new File(envDir).exists()) {
+        try {
+          createPythonEnvironment(pythonExec, rootEnvDir, envDir, pipDependencies, pipConstraints)
+        } catch {
+          case e: Exception =>
+            // Clean environment directory that is in some undefined status
+            Utils.deleteRecursively(new File(envDir))
+            e.printStackTrace() // for debug
+            throw new RuntimeException(
+              s"Failed to create the python environment. Root cause: ${e.toString}", e
+            )
+        }
+      }
+      Path.of(envDir, "bin", "python").toString
+    }
+  }
+
+  private def createPythonEnvironment(
+      pythonExec: String,
+      rootEnvDir: String,
+      envDir: String,
+      pipDependencies: Seq[String],
+      pipConstraints: Seq[String]
+  ): Unit = {
+    logger.info(s"Start creating python environment in directory: $envDir")
+
+    val pb = new ProcessBuilder(
+      java.util.Arrays.asList(pythonExec, "-m", "virtualenv", envDir, "--system-site-packages")

Review Comment:
   You might be better off using `venv`, which is built into Python.
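
   For reference, a hedged sketch of what the built-in `venv` equivalent of the quoted `virtualenv` call could look like (the helper and command layout are assumptions, not the PR's code):

   ```
   import subprocess
   import sys

   def create_env_with_venv(env_dir):
       # "python -m venv" also supports --system-site-packages, so the
       # extra virtualenv dependency could be dropped.
       subprocess.run(
           [sys.executable, "-m", "venv", "--system-site-packages", env_dir],
           check=True,
       )
   ```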




[GitHub] [spark] zhengruifeng commented on a diff in pull request #40954: [PYSPARK] [CONNECT] [ML] PySpark UDF supports python package dependencies

Posted by "zhengruifeng (via GitHub)" <gi...@apache.org>.
zhengruifeng commented on code in PR #40954:
URL: https://github.com/apache/spark/pull/40954#discussion_r1177512193


##########
connector/connect/common/src/main/protobuf/spark/connect/expressions.proto:
##########
@@ -333,6 +333,11 @@ message PythonUDF {
   bytes command = 3;
   // (Required) Python version being used in the client.
   string python_ver = 4;
+  // (Optional) Python function pip dependencies

Review Comment:
   I think we may add a new Class and proto message for dependency



##########
python/pyspark/pythonenv/pip_req_parser.py:
##########
@@ -0,0 +1,191 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import json
+import sys
+import subprocess
+from threading import Timer
+import tempfile
+import os
+import pkg_resources
+import importlib_metadata
+from itertools import filterfalse, chain
+from collections import namedtuple
+import logging
+import re
+from typing import NamedTuple, Optional
+from pathlib import Path
+
+
+# Represents a pip requirement.
+#
+# :param req_str: A requirement string (e.g. "scikit-learn == 0.24.2").
+# :param is_constraint: A boolean indicating whether this requirement is a constraint.
+_Requirement = namedtuple("_Requirement", ["req_str", "is_constraint"])
+
+
+def _is_comment(line):
+    return line.startswith("#")
+
+
+def _is_empty(line):
+    return line == ""
+
+
+def _strip_inline_comment(line):
+    return line[: line.find(" #")].rstrip() if " #" in line else line
+
+
+def _is_requirements_file(line):
+    return line.startswith("-r ") or line.startswith("--requirement ")
+
+
+def _is_constraints_file(line):
+    return line.startswith("-c ") or line.startswith("--constraint ")
+
+
+def _join_continued_lines(lines):
+    """
+    Joins lines ending with '\\'.
+    >>> list(_join_continued_lines(["a\\", "b\\", "c"]))
+    ['abc']
+    """
+    continued_lines = []
+
+    for line in lines:
+        if line.endswith("\\"):
+            continued_lines.append(line.rstrip("\\"))
+        else:
+            continued_lines.append(line)
+            yield "".join(continued_lines)
+            continued_lines.clear()
+
+    # The last line ends with '\'
+    if continued_lines:
+        yield "".join(continued_lines)
+
+
+_req_line_regex = re.compile(r'^([a-z-_][0-9a-z-_]*).*', re.IGNORECASE)
+
+
+def _verify_req_line(line):
+    match = _req_line_regex.match(line)
+    if match is None:
+        raise ValueError(f"This pip requirement line is invalid: '{line}'")
+    if match.group(1).lower() == "pyspark":
+        raise ValueError(
+            "Setting a pyspark dependency or constraint for a pyspark UDF is disallowed; "
+            "the pyspark UDF worker python environment always installs the pyspark package "
+            "with exactly the same version as the server-side Spark."
+        )
+
+
+def _parse_requirements(requirements, is_constraint, base_dir=None):

Review Comment:
   is it possible to reuse `pip` instead?
   
   `from pip._internal.req import parse_requirements`



##########
core/src/main/scala/org/apache/spark/api/python/PythonEnvSetup.scala:
##########
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.api.python
+
+import java.io.File
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.SparkContext
+import org.apache.spark.SparkFiles
+import org.apache.spark.internal.config.{PYSPARK_DRIVER_PYTHON, PYSPARK_PYTHON}
+
+
+
+object PythonEnvSetupUtils {
+
+  def getAllSparkNodesReadablePythonEnvRootDir(): Option[String] = {
+    val sparkContext = SparkContext.getActive.get
+    val sparkConf = sparkContext.conf
+
+    // For databricks runtime,
+    // we need to set `allSparkNodesReadableTempDir` to be REPL temp directory
+    val allSparkNodesReadableTempDir = sparkConf.getOption("spark.nfs.rootDir")
+      .orElse {
+        if (sparkContext.isLocal || sparkContext.master.startsWith("local-cluster[")) {
+          // For local / local-cluster mode spark
+          Some(SparkFiles.getRootDirectory())
+        } else None
+      }
+
+    allSparkNodesReadableTempDir.map { path =>
+      val rootEnvDir = new File(path, "spark-udf-python-env-root")
+      rootEnvDir.mkdirs()
+      rootEnvDir.getPath
+    }
+  }
+
+
+  def setupPythonEnvOnSparkDriverIfAvailable(pipDependencies: Seq[String],
+                                             pipConstraints: Seq[String]): Unit = {
+
+    val allSparkNodesReadablePythonEnvRootDirOpt = getAllSparkNodesReadablePythonEnvRootDir()
+
+    if (allSparkNodesReadablePythonEnvRootDirOpt.isDefined) {
+      // If we have python-env root directory that is accessible to all spark nodes,
+      // we can create python environment with provided dependencies in driver side
+      // and Spark executors can directly use it.
+      val sparkContext = SparkContext.getActive.get
+      val sparkConf = sparkContext.conf
+
+      sparkContext.setLocalProperty(
+        "pythonEnv.allSparkNodesReadableRootEnvDir",
+        allSparkNodesReadablePythonEnvRootDirOpt.get
+      )
+
+      val pythonExec = sparkConf.get(PYSPARK_DRIVER_PYTHON)
+        .orElse(sparkConf.get(PYSPARK_PYTHON))
+        .orElse(sys.env.get("PYSPARK_DRIVER_PYTHON"))
+        .orElse(sys.env.get("PYSPARK_PYTHON"))
+        .getOrElse("python3")
+
+      if (!pipDependencies.isEmpty) {
+        PythonEnvManager.getOrCreatePythonEnvironment(
+          pythonExec,
+          allSparkNodesReadablePythonEnvRootDirOpt.get,
+          pipDependencies,
+          pipConstraints
+        )
+      }
+    }
+  }
+
+  def getPipRequirementsFromChainedPythonFuncs(
+                                                funcs: Seq[ChainedPythonFunctions]
+                                              ): (Seq[String], Seq[String]) = {
+    val headPipDeps = funcs.head.funcs.head.pipDependencies
+    val headPipConstraints = funcs.head.funcs.head.pipConstraints
+
+    for (chainedFunc <- funcs) {
+      for (simpleFunc <- chainedFunc.funcs) {
+        if (simpleFunc.pipDependencies != headPipDeps ||
+          simpleFunc.pipConstraints != headPipConstraints
+        ) {
+          // TODO: For this case, we should split current python runner
+          //  into multiple python runners.

Review Comment:
   maybe we could first detect whether the two envs can be merged, e.g.:
   ```
   env1:
   numpy
   
   env2:
   numpy
   scipy
   ```
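
   A minimal sketch of such a mergeability check (conservative, exact-string matching only; version-range conflicts would still need real resolution; not part of the PR):

   ```
   def can_reuse_env(existing_env_reqs, requested_reqs):
       # Intentionally conservative: only exact requirement strings match,
       # so "numpy" and "numpy==1.24" are treated as different requirements.
       return set(requested_reqs).issubset(set(existing_env_reqs))

   # env2 ("numpy", "scipy") could serve a UDF that only needs env1 ("numpy").
   assert can_reuse_env({"numpy", "scipy"}, {"numpy"})
   ```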




[GitHub] [spark] github-actions[bot] closed pull request #40954: [PYSPARK] [CONNECT] [ML] PySpark UDF supports python package dependencies

Posted by "github-actions[bot] (via GitHub)" <gi...@apache.org>.
github-actions[bot] closed pull request #40954: [PYSPARK] [CONNECT] [ML] PySpark UDF supports python package dependencies
URL: https://github.com/apache/spark/pull/40954

