You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kyuubi.apache.org by ch...@apache.org on 2022/11/22 02:17:04 UTC

[incubator-kyuubi] branch master updated: [KYUUBI #3820] [Subtask] [PySpark] Skip missing MagicNode and code improvements

This is an automated email from the ASF dual-hosted git repository.

chengpan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-kyuubi.git


The following commit(s) were added to refs/heads/master by this push:
     new 3fae1845e [KYUUBI #3820] [Subtask] [PySpark] Skip missing MagicNode and code improvements
3fae1845e is described below

commit 3fae1845e7f21d0b33e4852798b2d331f0cd5166
Author: liangbowen <li...@gf.com.cn>
AuthorDate: Tue Nov 22 10:16:52 2022 +0800

    [KYUUBI #3820] [Subtask] [PySpark] Skip missing MagicNode and code improvements
    
    ### _Why are the changes needed?_
    
    to close #3820 .
    
    To improve pyspark script support,
    1. skip missing MagicNode implementation, since Jupyter and sparkmagic are not yet supported
    2. add missing execute_reply_internal_error method
    3. fix by calling clearOutputs before loop
    4. ident lines and optimze unsed imports to conform python code style
    5. Check Python major version , and exit on Python 2.x
    6. fix name typo of `PythonResponse`
    
    ### _How was this patch tested?_
    - [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible
    
    - [ ] Add screenshots for manual tests if appropriate
    
    - [x] [Run test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests) locally before make a pull request
    
    Closes #3819 from bowenliang123/imrove-pyspark.
    
    Closes #3820
    
    473b9952 [liangbowen] add return type to `connect_to_existed_gateway`
    66927821 [liangbowen] remove unnecessary comments for magic code
    21e1d7a2 [liangbowen] move pyspark path preparing to the top of exeuction_python
    9751e094 [liangbowen] revert to use SparkSessionBuilder for session creation
    c4f3ef55 [liangbowen] use `SparkSession._create_shell_session()` to create spark session
    c2f65630 [liangbowen] delay importing kyuubi_util
    5ed893cc [liangbowen] adding Exception to except, to prevent PEP 8: E203
    029361a9 [liangbowen] ast module adaptation for >=3.8
    00c75fda [liangbowen] remove legacy code for importing unicode
    9f56a4f4 [liangbowen] add todo
    1da708ed [liangbowen] fix typo for PythonResponse, and minor declaration improvement
    910c62fb [liangbowen] remove MagicNode implementation since Jupyter and sparkmagic are not yet supported
    5f15c257 [liangbowen] exit on python 2.x
    86ff7d06 [liangbowen] ident lines to conform python code style
    5634c5e0 [liangbowen] rename get_spark to get_spark_session, and optimize unused imports in kyuubi_util.py
    9d3e1d0c [liangbowen] add missing MagicNode implementation
    0ade1dbe [liangbowen] add missing execute_reply_internal_error method
    aee205a5 [liangbowen] import cStringIO for fix package resolving problem
    acdd4b16 [liangbowen] fix by calling clearOutputs before loop
    
    Authored-by: liangbowen <li...@gf.com.cn>
    Signed-off-by: Cheng Pan <ch...@apache.org>
---
 .../src/main/resources/python/execute_python.py    | 131 +++++++++++----------
 .../src/main/resources/python/kyuubi_util.py       |  23 ++--
 .../engine/spark/operation/ExecutePython.scala     |  14 +--
 3 files changed, 83 insertions(+), 85 deletions(-)

diff --git a/externals/kyuubi-spark-sql-engine/src/main/resources/python/execute_python.py b/externals/kyuubi-spark-sql-engine/src/main/resources/python/execute_python.py
index 299be587f..67539b3b9 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/resources/python/execute_python.py
+++ b/externals/kyuubi-spark-sql-engine/src/main/resources/python/execute_python.py
@@ -15,28 +15,57 @@
 # limitations under the License.
 #
 
-from glob import glob
 import ast
-import sys
 import io
 import json
-import traceback
-import re
+
 import os
+import re
+import sys
+import traceback
+from glob import glob
+
+if sys.version_info[0] < 3:
+    sys.exit('Python < 3 is unsupported.')
+
+spark_home = os.environ.get("SPARK_HOME", "")
+os.environ["PYSPARK_PYTHON"] = os.environ.get("PYSPARK_PYTHON", sys.executable)
+
+# add pyspark to sys.path
+
+if "pyspark" not in sys.modules:
+    spark_python = os.path.join(spark_home, "python")
+    try:
+        py4j = glob(os.path.join(spark_python, "lib", "py4j-*.zip"))[0]
+    except IndexError:
+        raise Exception(
+            "Unable to find py4j in {}, your SPARK_HOME may not be configured correctly".format(
+                spark_python
+            )
+        )
+    sys.path[:0] = sys_path = [spark_python, py4j]
+else:
+    # already imported, no need to patch sys.path
+    sys_path = None
+
+# import kyuubi_util after preparing sys.path
+import kyuubi_util
 
 # ast api is changed after python 3.8, see https://github.com/ipython/ipython/pull/11593
-if sys.version_info > (3,8):
-  from ast import Module
-else :
-  # mock the new API, ignore second argument
-  # see https://github.com/ipython/ipython/issues/11590
-  from ast import Module as OriginalModule
-  Module = lambda nodelist, type_ignores: OriginalModule(nodelist)
+if sys.version_info >= (3, 8):
+    from ast import Module
+else:
+    # mock the new API, ignore second argument
+    # see https://github.com/ipython/ipython/issues/11590
+    from ast import Module as OriginalModule
+
+    Module = lambda nodelist, type_ignores: OriginalModule(nodelist)
 
 TOP_FRAME_REGEX = re.compile(r'\s*File "<stdin>".*in <module>')
 
 global_dict = {}
 
+
 class NormalNode(object):
     def __init__(self, code):
         self.code = compile(code, '<stdin>', 'exec', ast.PyCF_ONLY_AST, 1)
@@ -54,21 +83,24 @@ class NormalNode(object):
                 mod = ast.Interactive([node])
                 code = compile(mod, '<stdin>', 'single')
                 exec(code, global_dict)
-        except:
+        except Exception:
             # We don't need to log the exception because we're just executing user
             # code and passing the error along.
             raise ExecutionError(sys.exc_info())
 
+
 class ExecutionError(Exception):
     def __init__(self, exc_info):
         self.exc_info = exc_info
 
+
 class UnicodeDecodingStringIO(io.StringIO):
     def write(self, s):
         if isinstance(s, bytes):
             s = s.decode("utf-8")
         super(UnicodeDecodingStringIO, self).write(s)
 
+
 def clearOutputs():
     sys.stdout.close()
     sys.stderr.close()
@@ -81,16 +113,6 @@ def parse_code_into_nodes(code):
     try:
         nodes.append(NormalNode(code))
     except SyntaxError:
-        # It's possible we hit a syntax error because of a magic command. Split the code groups
-        # of 'normal code', and code that starts with a '%'. possibly magic code
-        # lines, and see if any of the lines
-        # Remove lines until we find a node that parses, then check if the next line is a magic
-        # line
-        # .
-
-        # Split the code into chunks of normal code, and possibly magic code, which starts with
-        # a '%'.
-
         normal = []
         chunks = []
         for i, line in enumerate(code.rstrip().split('\n')):
@@ -108,13 +130,15 @@ def parse_code_into_nodes(code):
 
         # Convert the chunks into AST nodes. Let exceptions propagate.
         for chunk in chunks:
-            if chunk.startswith('%'):
-                nodes.append(MagicNode(chunk))
-            else:
-                nodes.append(NormalNode(chunk))
+            # TODO: look back here when Jupyter and sparkmagic are supported
+            # if chunk.startswith('%'):
+            #     nodes.append(MagicNode(chunk))
+
+            nodes.append(NormalNode(chunk))
 
     return nodes
 
+
 def execute_reply(status, content):
     msg = {
         'msg_type': 'execute_reply',
@@ -125,17 +149,15 @@ def execute_reply(status, content):
     }
     return json.dumps(msg)
 
+
 def execute_reply_ok(data):
     return execute_reply("ok", {
         "data": data,
     })
 
+
 def execute_reply_error(exc_type, exc_value, tb):
-    # LOG.error('execute_reply', exc_info=True)
-    if sys.version >= '3':
-      formatted_tb = traceback.format_exception(exc_type, exc_value, tb, chain=False)
-    else:
-      formatted_tb = traceback.format_exception(exc_type, exc_value, tb)
+    formatted_tb = traceback.format_exception(exc_type, exc_value, tb, chain=False)
     for i in range(len(formatted_tb)):
         if TOP_FRAME_REGEX.match(formatted_tb[i]):
             formatted_tb = formatted_tb[:1] + formatted_tb[i + 1:]
@@ -147,6 +169,15 @@ def execute_reply_error(exc_type, exc_value, tb):
         'traceback': formatted_tb,
     })
 
+
+def execute_reply_internal_error(message, exc_info=None):
+    return execute_reply('error', {
+        'ename': 'InternalError',
+        'evalue': message,
+        'traceback': [],
+    })
+
+
 def execute_request(content):
     try:
         code = content['code']
@@ -193,49 +224,25 @@ def execute_request(content):
 
     return execute_reply_ok(result)
 
-# import findspark
-# findspark.init()
 
-spark_home = os.environ.get("SPARK_HOME", "")
-os.environ["PYSPARK_PYTHON"] = os.environ.get("PYSPARK_PYTHON", sys.executable)
+# get or create spark session
+spark_session = kyuubi_util.get_spark_session()
+global_dict['spark'] = spark_session
 
-# add pyspark to sys.path
-
-if "pyspark" not in sys.modules:
-    spark_python = os.path.join(spark_home, "python")
-    try:
-        py4j = glob(os.path.join(spark_python, "lib", "py4j-*.zip"))[0]
-    except IndexError:
-        raise Exception(
-            "Unable to find py4j in {}, your SPARK_HOME may not be configured correctly".format(
-                spark_python
-            )
-        )
-    sys.path[:0] = sys_path = [spark_python, py4j]
-else:
-    # already imported, no need to patch sys.path
-    sys_path = None
-
-import kyuubi_util
-spark = kyuubi_util.get_spark()
-global_dict['spark'] = spark
 
 def main():
     sys_stdin = sys.stdin
     sys_stdout = sys.stdout
     sys_stderr = sys.stderr
 
-    if sys.version >= '3':
-        sys.stdin = io.StringIO()
-    else:
-        sys.stdin = cStringIO.StringIO()
-
+    sys.stdin = io.StringIO()
     sys.stdout = UnicodeDecodingStringIO()
     sys.stderr = UnicodeDecodingStringIO()
 
     stderr = sys.stderr.getvalue()
     print(stderr, file=sys_stderr)
-    clearOutputs
+    clearOutputs()
+
     try:
 
         while True:
@@ -249,7 +256,6 @@ def main():
             try:
                 content = json.loads(line)
             except ValueError:
-                # LOG.error('failed to parse message', exc_info=True)
                 continue
 
             if content['cmd'] == 'exit_worker':
@@ -265,5 +271,6 @@ def main():
         sys.stdout = sys_stdout
         sys.stderr = sys_stderr
 
+
 if __name__ == '__main__':
     sys.exit(main())
diff --git a/externals/kyuubi-spark-sql-engine/src/main/resources/python/kyuubi_util.py b/externals/kyuubi-spark-sql-engine/src/main/resources/python/kyuubi_util.py
index 4478060e9..8bbe6eb7c 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/resources/python/kyuubi_util.py
+++ b/externals/kyuubi-spark-sql-engine/src/main/resources/python/kyuubi_util.py
@@ -15,29 +15,19 @@
 # limitations under the License.
 #
 
-import atexit
 import os
-import sys
-import signal
-import shlex
-import shutil
-import socket
-import platform
-import tempfile
-import time
-from subprocess import Popen, PIPE
 
-from py4j.java_gateway import java_import, JavaGateway, JavaObject, GatewayParameters
 from py4j.clientserver import ClientServer, JavaParameters, PythonParameters
+from py4j.java_gateway import java_import, JavaGateway, GatewayParameters
 from pyspark.context import SparkContext
-from pyspark.serializers import read_int, write_with_length, UTF8Deserializer
+from pyspark.serializers import read_int, UTF8Deserializer
 from pyspark.sql import SparkSession
 
 
-def connect_to_exist_gateway():
+def connect_to_exist_gateway() -> "JavaGateway":
     conn_info_file = os.environ.get("PYTHON_GATEWAY_CONNECTION_INFO")
     if conn_info_file is None:
-       raise SystemExit("the python gateway connection information file not found!")
+        raise SystemExit("the python gateway connection information file not found!")
     with open(conn_info_file, "rb") as info:
         gateway_port = read_int(info)
         gateway_secret = UTF8Deserializer().loads(info)
@@ -72,16 +62,17 @@ def connect_to_exist_gateway():
 
     return gateway
 
+
 def _get_exist_spark_context(self, jconf):
     """
     Initialize SparkContext in function to allow subclass specific initialization
     """
     return self._jvm.JavaSparkContext(self._jvm.org.apache.spark.SparkContext.getOrCreate(jconf))
 
-def get_spark():
+
+def get_spark_session() -> "SparkSession":
     SparkContext._initialize_context = _get_exist_spark_context
     gateway = connect_to_exist_gateway()
     SparkContext._ensure_initialized(gateway=gateway)
     spark = SparkSession.builder.master('local').appName('test').getOrCreate()
     return spark
-
diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecutePython.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecutePython.scala
index 3254f0e2c..4fb08105d 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecutePython.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecutePython.scala
@@ -98,14 +98,14 @@ case class SessionPythonWorker(
   private val stdout: BufferedReader =
     new BufferedReader(new InputStreamReader(workerProcess.getInputStream), 1)
 
-  def runCode(code: String): Option[PythonReponse] = {
+  def runCode(code: String): Option[PythonResponse] = {
     val input = ExecutePython.toJson(Map("code" -> code, "cmd" -> "run_code"))
     // scalastyle:off println
     stdin.println(input)
     // scalastyle:on
     stdin.flush()
     Option(stdout.readLine())
-      .map(ExecutePython.fromJson[PythonReponse](_))
+      .map(ExecutePython.fromJson[PythonResponse](_))
   }
 
   def close(): Unit = {
@@ -125,7 +125,7 @@ case class SessionPythonWorker(
 object ExecutePython extends Logging {
 
   private val isPythonGatewayStart = new AtomicBoolean(false)
-  val kyuubiPythonPath = Files.createTempDirectory("")
+  private val kyuubiPythonPath = Files.createTempDirectory("")
   def init(): Unit = {
     if (!isPythonGatewayStart.get()) {
       synchronized {
@@ -186,7 +186,7 @@ object ExecutePython extends Logging {
 
   private def startStderrSteamReader(process: Process): Thread = {
     val stderrThread = new Thread("process stderr thread") {
-      override def run() = {
+      override def run(): Unit = {
         val lines = scala.io.Source.fromInputStream(process.getErrorStream).getLines()
         lines.foreach(logger.error)
       }
@@ -198,7 +198,7 @@ object ExecutePython extends Logging {
 
   def startWatcher(process: Process): Thread = {
     val processWatcherThread = new Thread("process watcher thread") {
-      override def run() = {
+      override def run(): Unit = {
         val exitCode = process.waitFor()
         if (exitCode != 0) {
           logger.error(f"Process has died with $exitCode")
@@ -229,7 +229,7 @@ object ExecutePython extends Logging {
     file
   }
 
-  val mapper = new ObjectMapper().registerModule(DefaultScalaModule)
+  val mapper: ObjectMapper = new ObjectMapper().registerModule(DefaultScalaModule)
   def toJson[T](obj: T): String = {
     mapper.writeValueAsString(obj)
   }
@@ -243,7 +243,7 @@ object ExecutePython extends Logging {
 
 }
 
-case class PythonReponse(
+case class PythonResponse(
     msg_type: String,
     content: PythonResponseContent)