Posted to reviews@spark.apache.org by "zhengruifeng (via GitHub)" <gi...@apache.org> on 2023/04/26 08:37:43 UTC

[GitHub] [spark] zhengruifeng commented on a diff in pull request #40954: [PYSPARK] [CONNECT] [ML] PySpark UDF supports python package dependencies

zhengruifeng commented on code in PR #40954:
URL: https://github.com/apache/spark/pull/40954#discussion_r1177512193


##########
connector/connect/common/src/main/protobuf/spark/connect/expressions.proto:
##########
@@ -333,6 +333,11 @@ message PythonUDF {
   bytes command = 3;
   // (Required) Python version being used in the client.
   string python_ver = 4;
+  // (Optional) Python function pip dependencies

Review Comment:
   I think we may add a new class and a dedicated proto message for the dependencies
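
   For illustration only, a minimal Python-side sketch of the kind of dedicated dependency container such a proto message could map to; the name `PythonUDFDependencies` and its fields are hypothetical and not part of this PR:
   ```
   # Hypothetical client-side container mirroring a dedicated proto message
   # for UDF pip dependencies; names and fields are illustrative only.
   from dataclasses import dataclass, field
   from typing import List


   @dataclass
   class PythonUDFDependencies:
       pip_dependencies: List[str] = field(default_factory=list)  # e.g. ["scikit-learn==1.2.2"]
       pip_constraints: List[str] = field(default_factory=list)   # e.g. ["numpy<2"]
   ```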



##########
python/pyspark/pythonenv/pip_req_parser.py:
##########
@@ -0,0 +1,191 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import json
+import sys
+import subprocess
+from threading import Timer
+import tempfile
+import os
+import pkg_resources
+import importlib_metadata
+from itertools import filterfalse, chain
+from collections import namedtuple
+import logging
+import re
+from typing import NamedTuple, Optional
+from pathlib import Path
+
+
+# Represents a pip requirement.
+#
+# :param req_str: A requirement string (e.g. "scikit-learn == 0.24.2").
+# :param is_constraint: A boolean indicating whether this requirement is a constraint.
+_Requirement = namedtuple("_Requirement", ["req_str", "is_constraint"])
+
+
+def _is_comment(line):
+    return line.startswith("#")
+
+
+def _is_empty(line):
+    return line == ""
+
+
+def _strip_inline_comment(line):
+    return line[: line.find(" #")].rstrip() if " #" in line else line
+
+
+def _is_requirements_file(line):
+    return line.startswith("-r ") or line.startswith("--requirement ")
+
+
+def _is_constraints_file(line):
+    return line.startswith("-c ") or line.startswith("--constraint ")
+
+
+def _join_continued_lines(lines):
+    """
+    Joins lines ending with '\\'.
+    >>> list(_join_continued_lines(["a\\", "b\\", "c"]))
+    ['abc']
+    """
+    continued_lines = []
+
+    for line in lines:
+        if line.endswith("\\"):
+            continued_lines.append(line.rstrip("\\"))
+        else:
+            continued_lines.append(line)
+            yield "".join(continued_lines)
+            continued_lines.clear()
+
+    # The last line ends with '\'
+    if continued_lines:
+        yield "".join(continued_lines)
+
+
+_req_line_regex = re.compile(r'^([a-z-_][0-9a-z-_]*).*', re.IGNORECASE)
+
+
+def _verify_req_line(line):
+    match = _req_line_regex.match(line)
+    if match is None:
+        raise ValueError(f"This pip requirement line is invalid: '{line}'")
+    if match.group(1).lower() == "pyspark":
+        raise ValueError(
+            "A pyspark dependency or constraint must not be set for a pyspark UDF; "
+            "the pyspark UDF worker python environment always installs the pyspark "
+            "package with exactly the same version as the server-side Spark."
+        )
+
+
+def _parse_requirements(requirements, is_constraint, base_dir=None):

Review Comment:
   is it possible to reuse `pip` instead?
   
   `from pip._internal.req import parse_requirements`
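
   For reference, a rough Python sketch of that suggestion. Note that pip's `_internal` modules are not a public API, so the module paths and attribute names below are an assumption and can differ across pip versions; `_parse_with_pip` is a hypothetical helper built on the `_Requirement` namedtuple defined in this file:
   ```
   # Sketch only: pip internals are unstable; verify against the pinned pip version.
   from collections import namedtuple

   from pip._internal.network.session import PipSession
   from pip._internal.req.req_file import parse_requirements

   # Same shape as the namedtuple defined in pip_req_parser.py above.
   _Requirement = namedtuple("_Requirement", ["req_str", "is_constraint"])


   def _parse_with_pip(requirements_file):
       # Each parsed entry carries the raw requirement string and a flag telling
       # whether it came from a constraints file (-c / --constraint).
       session = PipSession()
       for parsed in parse_requirements(requirements_file, session=session):
           yield _Requirement(parsed.requirement, parsed.constraint)
   ```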



##########
core/src/main/scala/org/apache/spark/api/python/PythonEnvSetup.scala:
##########
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.api.python
+
+import java.io.File
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.SparkContext
+import org.apache.spark.SparkFiles
+import org.apache.spark.internal.config.{PYSPARK_DRIVER_PYTHON, PYSPARK_PYTHON}
+
+
+
+object PythonEnvSetupUtils {
+
+  def getAllSparkNodesReadablePythonEnvRootDir(): Option[String] = {
+    val sparkContext = SparkContext.getActive.get
+    val sparkConf = sparkContext.conf
+
+    // For Databricks runtime,
+    // we need to set `allSparkNodesReadableTempDir` to the REPL temp directory.
+    val allSparkNodesReadableTempDir = sparkConf.getOption("spark.nfs.rootDir")
+      .orElse {
+        if (sparkContext.isLocal || sparkContext.master.startsWith("local-cluster[")) {
+          // For local / local-cluster mode spark
+          Some(SparkFiles.getRootDirectory())
+        } else None
+      }
+
+    allSparkNodesReadableTempDir.map { path =>
+      val rootEnvDir = new File(path, "spark-udf-python-env-root")
+      rootEnvDir.mkdirs()
+      rootEnvDir.getPath
+    }
+  }
+
+
+  def setupPythonEnvOnSparkDriverIfAvailable(
+      pipDependencies: Seq[String],
+      pipConstraints: Seq[String]): Unit = {
+
+    val allSparkNodesReadablePythonEnvRootDirOpt = getAllSparkNodesReadablePythonEnvRootDir()
+
+    if (allSparkNodesReadablePythonEnvRootDirOpt.isDefined) {
+      // If we have a python-env root directory that is accessible to all spark nodes,
+      // we can create the python environment with the provided dependencies on the
+      // driver side, and spark executors can use it directly.
+      val sparkContext = SparkContext.getActive.get
+      val sparkConf = sparkContext.conf
+
+      sparkContext.setLocalProperty(
+        "pythonEnv.allSparkNodesReadableRootEnvDir",
+        allSparkNodesReadablePythonEnvRootDirOpt.get
+      )
+
+      val pythonExec = sparkConf.get(PYSPARK_DRIVER_PYTHON)
+        .orElse(sparkConf.get(PYSPARK_PYTHON))
+        .orElse(sys.env.get("PYSPARK_DRIVER_PYTHON"))
+        .orElse(sys.env.get("PYSPARK_PYTHON"))
+        .getOrElse("python3")
+
+      if (pipDependencies.nonEmpty) {
+        PythonEnvManager.getOrCreatePythonEnvironment(
+          pythonExec,
+          allSparkNodesReadablePythonEnvRootDirOpt.get,
+          pipDependencies,
+          pipConstraints
+        )
+      }
+    }
+  }
+
+  def getPipRequirementsFromChainedPythonFuncs(
+      funcs: Seq[ChainedPythonFunctions]): (Seq[String], Seq[String]) = {
+    val headPipDeps = funcs.head.funcs.head.pipDependencies
+    val headPipConstraints = funcs.head.funcs.head.pipConstraints
+
+    for (chainedFunc <- funcs) {
+      for (simpleFunc <- chainedFunc.funcs) {
+        if (simpleFunc.pipDependencies != headPipDeps ||
+          simpleFunc.pipConstraints != headPipConstraints
+        ) {
+          // TODO: For this case, we should split current python runner
+          //  into multiple python runners.

Review Comment:
   maybe we could first detect whether the two envs can be merged, e.g.:
   ```
   env1:
   numpy
   
   env2:
   numpy
   scipy
   ```
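
   A naive Python sketch of such a merge check (`_can_merge` is a hypothetical helper, string-level only; real version specifiers and environment markers would need an actual resolver): if one requirement set is contained in the other after simple normalization, the larger environment could serve both runners.
   ```
   # Naive string-level merge check; does not resolve version ranges or markers.
   def _can_merge(env1_reqs, env2_reqs):
       normalize = lambda r: r.replace(" ", "").lower()
       set1 = {normalize(r) for r in env1_reqs}
       set2 = {normalize(r) for r in env2_reqs}
       return set1 <= set2 or set2 <= set1


   # _can_merge(["numpy"], ["numpy", "scipy"])    -> True (the second env covers both)
   # _can_merge(["numpy==1.24"], ["numpy==1.21"]) -> False
   ```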



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org