Posted to commits@toree.apache.org by ch...@apache.org on 2016/06/20 13:41:16 UTC

[1/3] incubator-toree git commit: Wrapping the kernel object in a Python class, and tweaking the Spark context creation

Repository: incubator-toree
Updated Branches:
  refs/heads/master c01f8a9f4 -> 29d10b3e7


Wrapping the kernel object in a Python class, and tweaking the Spark context creation
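
A minimal sketch of how the wrapped kernel can be driven from a PySpark
cell, assuming the names "kernel" and "sc" bound by the runner in the
diff below; the SparkConf settings are illustrative:

    from pyspark import SparkConf

    # Any pyspark.SparkConf works here, since createSparkContext()
    # copies its entries across with config.getAll().
    config = SparkConf().setAppName("toree-demo").setMaster("local[2]")
    kernel.createSparkContext(config)  # builds the JVM SparkConf, then refreshes
    print(sc.parallelize(range(10)).sum())  # sc is rebound by refreshContext()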


Project: http://git-wip-us.apache.org/repos/asf/incubator-toree/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-toree/commit/a7275ed7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-toree/tree/a7275ed7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-toree/diff/a7275ed7

Branch: refs/heads/master
Commit: a7275ed744242eefbfb4aaafd8cbca06296a7b48
Parents: c01f8a9
Author: Liam Fisk <li...@xtra.co.nz>
Authored: Sun Jun 12 11:28:05 2016 +1200
Committer: Liam Fisk <li...@xtra.co.nz>
Committed: Sun Jun 12 17:04:34 2016 +1200

----------------------------------------------------------------------
 .../main/resources/PySpark/pyspark_runner.py    | 203 ++++++++++---------
 1 file changed, 112 insertions(+), 91 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/a7275ed7/pyspark-interpreter/src/main/resources/PySpark/pyspark_runner.py
----------------------------------------------------------------------
diff --git a/pyspark-interpreter/src/main/resources/PySpark/pyspark_runner.py b/pyspark-interpreter/src/main/resources/PySpark/pyspark_runner.py
index 5f3c079..3759165 100644
--- a/pyspark-interpreter/src/main/resources/PySpark/pyspark_runner.py
+++ b/pyspark-interpreter/src/main/resources/PySpark/pyspark_runner.py
@@ -37,9 +37,9 @@ client = GatewayClient(port=int(sys.argv[1]))
 sparkVersion = sys.argv[2]
 
 if re.match("^1\.[456]\..*$", sparkVersion):
-  gateway = JavaGateway(client, auto_convert = True)
+    gateway = JavaGateway(client, auto_convert=True)
 else:
-  gateway = JavaGateway(client)
+    gateway = JavaGateway(client)
 
 java_import(gateway.jvm, "org.apache.spark.SparkEnv")
 java_import(gateway.jvm, "org.apache.spark.SparkConf")
@@ -51,112 +51,133 @@ bridge = gateway.entry_point
 state = bridge.state()
 state.markReady()
 
-#jsc = bridge.javaSparkContext()
-
 if sparkVersion.startswith("1.2"):
-  java_import(gateway.jvm, "org.apache.spark.sql.SQLContext")
-  java_import(gateway.jvm, "org.apache.spark.sql.hive.HiveContext")
-  java_import(gateway.jvm, "org.apache.spark.sql.hive.LocalHiveContext")
-  java_import(gateway.jvm, "org.apache.spark.sql.hive.TestHiveContext")
+    java_import(gateway.jvm, "org.apache.spark.sql.SQLContext")
+    java_import(gateway.jvm, "org.apache.spark.sql.hive.HiveContext")
+    java_import(gateway.jvm, "org.apache.spark.sql.hive.LocalHiveContext")
+    java_import(gateway.jvm, "org.apache.spark.sql.hive.TestHiveContext")
 elif sparkVersion.startswith("1.3"):
-  java_import(gateway.jvm, "org.apache.spark.sql.*")
-  java_import(gateway.jvm, "org.apache.spark.sql.hive.*")
+    java_import(gateway.jvm, "org.apache.spark.sql.*")
+    java_import(gateway.jvm, "org.apache.spark.sql.hive.*")
 elif re.match("^1\.[456]\..*$", sparkVersion):
-  java_import(gateway.jvm, "org.apache.spark.sql.*")
-  java_import(gateway.jvm, "org.apache.spark.sql.hive.*")
+    java_import(gateway.jvm, "org.apache.spark.sql.*")
+    java_import(gateway.jvm, "org.apache.spark.sql.hive.*")
 
 java_import(gateway.jvm, "scala.Tuple2")
 
-
+conf = None
 sc = None
 sqlContext = None
 code_info = None
 
-kernel = bridge.kernel()
 
 class Logger(object):
-  def __init__(self):
-    self.out = ""
+    def __init__(self):
+        self.out = ""
 
-  def write(self, message):
-    state.sendOutput(code_info.codeId(), message)
-    self.out = self.out + message
+    def write(self, message):
+        state.sendOutput(code_info.codeId(), message)
+        self.out = self.out + message
 
-  def get(self):
-    return self.out
+    def get(self):
+        return self.out
 
-  def reset(self):
-    self.out = ""
+    def reset(self):
+        self.out = ""
 
 output = Logger()
 sys.stdout = output
 sys.stderr = output
 
-while True :
-  try:
-    global code_info
-    code_info = state.nextCode()
-
-    # If code is not available, try again later
-    if (code_info is None):
-      sleep(1)
-      continue
-
-    code_lines = code_info.code().split("\n")
-    #jobGroup = req.jobGroup()
-    final_code = None
-
-    for s in code_lines:
-      if s == None or len(s.strip()) == 0:
-        continue
-
-      # skip comment
-      if s.strip().startswith("#"):
-        continue
-
-      if final_code:
-        final_code += "\n" + s
-      else:
-        final_code = s
-
-    if sc is None:
-      jsc = kernel.javaSparkContext()
-      if jsc is not None:
-        jconf = kernel.sparkConf()
-        conf = SparkConf(_jvm = gateway.jvm, _jconf = jconf)
-        sc = SparkContext(jsc = jsc, gateway = gateway, conf = conf)
-
-    if sqlContext is None:
-      jsqlContext = kernel.sqlContext()
-      if jsqlContext is not None and sc is not None:
-        sqlContext = SQLContext(sc, sqlContext=jsqlContext)
-
-    if final_code:
-      '''Parse the final_code to an AST parse tree.  If the last node is an expression (where an expression
-      can be a print function or an operation like 1+1) turn it into an assignment where temp_val = last expression.
-      The modified parse tree will get executed.  If the variable temp_val introduced is not none then we have the
-      result of the last expression and should return it as an execute result.  The sys.stdout sendOutput logic
-      gets triggered on each logger message to support long running code blocks instead of bulk'''
-      ast_parsed = ast.parse(final_code)
-      the_last_expression_to_assign_temp_value = None
-      if isinstance(ast_parsed.body[-1], ast.Expr):
-        new_node = (ast.Assign(targets=[ast.Name(id='the_last_expression_to_assign_temp_value', ctx=ast.Store())], value=ast_parsed.body[-1].value))
-        ast_parsed.body[-1] = ast.fix_missing_locations(new_node)
-      compiled_code = compile(ast_parsed, "<string>", "exec")
-      eval(compiled_code)
-      if the_last_expression_to_assign_temp_value is not None:
-        state.markSuccess(code_info.codeId(), str(the_last_expression_to_assign_temp_value))
-      else:
-        state.markSuccess(code_info.codeId(), "")
-      del the_last_expression_to_assign_temp_value
-
-  except Py4JJavaError:
-    excInnerError = traceback.format_exc() # format_tb() does not return the inner exception
-    innerErrorStart = excInnerError.find("Py4JJavaError:")
-    if innerErrorStart > -1:
-       excInnerError = excInnerError[innerErrorStart:]
-    state.markFailure(code_info.codeId(), excInnerError + str(sys.exc_info()))
-  except:
-    state.markFailure(code_info.codeId(), traceback.format_exc())
-
-  output.reset()
+
+class Kernel(object):
+    def __init__(self, jkernel):
+        self._jvm_kernel = jkernel
+
+    def createSparkContext(self, config):
+        jconf = gateway.jvm.org.apache.spark.SparkConf(False)
+        for key, value in config.getAll():
+            jconf.set(key, value)
+        self._jvm_kernel.createSparkContext(jconf)
+        self.refreshContext()
+
+    def refreshContext(self):
+        global conf, sc, sqlContext
+
+        # This is magic. Please look away. I was never here (prevents multiple gateways being instantiated)
+        with SparkContext._lock:
+            if not SparkContext._gateway:
+                SparkContext._gateway = gateway
+                SparkContext._jvm = gateway.jvm
+
+        if sc is None:
+            jsc = self._jvm_kernel.javaSparkContext()
+            if jsc is not None:
+                jconf = self._jvm_kernel.sparkConf()
+                conf = SparkConf(_jvm=gateway.jvm, _jconf=jconf)
+                sc = SparkContext(jsc=jsc, gateway=gateway, conf=conf)
+
+        if sqlContext is None:
+            jsqlContext = self._jvm_kernel.sqlContext()
+            if jsqlContext is not None and sc is not None:
+                sqlContext = SQLContext(sc, sqlContext=jsqlContext)
+
+kernel = Kernel(bridge.kernel())
+
+while True:
+    try:
+        code_info = state.nextCode()
+
+        # If code is not available, try again later
+        if code_info is None:
+            sleep(1)
+            continue
+
+        code_lines = code_info.code().split("\n")
+        final_code = None
+
+        for s in code_lines:
+            if s is None or len(s.strip()) == 0:
+                continue
+
+            # skip comment
+            if s.strip().startswith("#"):
+                continue
+
+            if final_code:
+                final_code += "\n" + s
+            else:
+                final_code = s
+
+        # Ensure the appropriate variables are set in the module namespace
+        kernel.refreshContext()
+
+        if final_code:
+            '''Parse final_code into an AST. If the last node is an expression (e.g. a print
+            call or an operation like 1+1), turn it into an assignment to a temporary variable.
+            The modified tree is then executed. If the introduced temporary is not None, it holds
+            the result of the last expression and is returned as the execute result. The sys.stdout
+            sendOutput logic fires on each logger message, so long-running blocks stream output instead of delivering it in bulk.'''
+            ast_parsed = ast.parse(final_code)
+            the_last_expression_to_assign_temp_value = None
+            if isinstance(ast_parsed.body[-1], ast.Expr):
+                new_node = (ast.Assign(targets=[ast.Name(id='the_last_expression_to_assign_temp_value', ctx=ast.Store())], value=ast_parsed.body[-1].value))
+                ast_parsed.body[-1] = ast.fix_missing_locations(new_node)
+            compiled_code = compile(ast_parsed, "<string>", "exec")
+            eval(compiled_code)
+            if the_last_expression_to_assign_temp_value is not None:
+                state.markSuccess(code_info.codeId(), str(the_last_expression_to_assign_temp_value))
+            else:
+                state.markSuccess(code_info.codeId(), "")
+            del the_last_expression_to_assign_temp_value
+
+    except Py4JJavaError:
+        excInnerError = traceback.format_exc() # format_tb() does not return the inner exception
+        innerErrorStart = excInnerError.find("Py4JJavaError:")
+        if innerErrorStart > -1:
+            excInnerError = excInnerError[innerErrorStart:]
+        state.markFailure(code_info.codeId(), excInnerError + str(sys.exc_info()))
+    except:
+        state.markFailure(code_info.codeId(), traceback.format_exc())
+
+    output.reset()
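
The triple-quoted note inside the loop describes rewriting the block's
final expression into an assignment. A self-contained sketch of that
AST trick, with an invented short name in place of the long temporary
used above:

    import ast

    source = "x = 40\nx + 2"
    tree = ast.parse(source)
    # If the block ends in a bare expression, turn it into an
    # assignment so its value can be reported as the execute result.
    if isinstance(tree.body[-1], ast.Expr):
        assign = ast.Assign(
            targets=[ast.Name(id="_last_value", ctx=ast.Store())],
            value=tree.body[-1].value,
        )
        tree.body[-1] = ast.fix_missing_locations(assign)

    namespace = {}
    exec(compile(tree, "<string>", "exec"), namespace)
    print(namespace.get("_last_value"))  # prints 42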


[3/3] incubator-toree git commit: Removing optional filename extension (which is not currently included in language_info)

Posted by ch...@apache.org.
Removing optional filename extension (which is not currently included in language_info)


Project: http://git-wip-us.apache.org/repos/asf/incubator-toree/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-toree/commit/29d10b3e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-toree/tree/29d10b3e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-toree/diff/29d10b3e

Branch: refs/heads/master
Commit: 29d10b3e7d0e219eef5ccac1dd8c68efe9a69cbd
Parents: 9c5807c
Author: Liam Fisk <li...@xtra.co.nz>
Authored: Sun Jun 19 16:59:04 2016 +1200
Committer: Liam Fisk <li...@xtra.co.nz>
Committed: Sun Jun 19 16:59:04 2016 +1200

----------------------------------------------------------------------
 test_toree.py | 10 +---------
 1 file changed, 1 insertion(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/29d10b3e/test_toree.py
----------------------------------------------------------------------
diff --git a/test_toree.py b/test_toree.py
index 5f21f9c..a4c5485 100644
--- a/test_toree.py
+++ b/test_toree.py
@@ -29,10 +29,6 @@ class ToreeScalaKernelTests(jupyter_kernel_test.KernelTests):
 
     # Optional --------------------------------------
 
-    # the normal file extension (including the leading dot) for this language
-    # checked against language_info.file_extension in kernel_info_reply
-    file_extension = ".scala"
-
     # Code in the kernel's language to write "hello, world" to stdout
     code_hello_world = "println(\"hello, world\")"
 
@@ -106,10 +102,6 @@ class ToreePythonKernelTests(jupyter_kernel_test.KernelTests):
 
     # Optional --------------------------------------
 
-    # the normal file extension (including the leading dot) for this language
-    # checked against language_info.file_extension in kernel_info_reply
-    file_extension = ".py"
-
     # Code in the kernel's language to write "hello, world" to stdout
     code_hello_world = "print(\"hello, world\")"
 
@@ -120,4 +112,4 @@ class ToreePythonKernelTests(jupyter_kernel_test.KernelTests):
     ]
 
 if __name__ == '__main__':
-    unittest.main()
\ No newline at end of file
+    unittest.main()
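
One way to confirm the rationale here (that Toree's kernel_info_reply
carries no file_extension under language_info) is to ask a running
kernel directly. A hedged sketch; the kernel name and timeout are
assumptions:

    from jupyter_client.manager import start_new_kernel

    # Launch the kernel under test and request kernel_info.
    km, kc = start_new_kernel(kernel_name="apache_toree_scala")
    kc.kernel_info()
    reply = kc.get_shell_msg(timeout=30)
    info = reply["content"]["language_info"]
    print(info.get("file_extension"))  # None if the field is absent
    kc.stop_channels()
    km.shutdown_kernel()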


[2/3] incubator-toree git commit: Splicing the Python and Java kernels together

Posted by ch...@apache.org.
Splicing the Python and Java kernels together


Project: http://git-wip-us.apache.org/repos/asf/incubator-toree/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-toree/commit/9c5807c9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-toree/tree/9c5807c9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-toree/diff/9c5807c9

Branch: refs/heads/master
Commit: 9c5807c93d4ae27c32150a70df8efc981e921e17
Parents: a7275ed
Author: Liam Fisk <li...@xtra.co.nz>
Authored: Sun Jun 19 12:51:12 2016 +1200
Committer: Liam Fisk <li...@xtra.co.nz>
Committed: Sun Jun 19 14:24:08 2016 +1200

----------------------------------------------------------------------
 .../src/main/resources/PySpark/pyspark_runner.py              | 7 +++++++
 1 file changed, 7 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/9c5807c9/pyspark-interpreter/src/main/resources/PySpark/pyspark_runner.py
----------------------------------------------------------------------
diff --git a/pyspark-interpreter/src/main/resources/PySpark/pyspark_runner.py b/pyspark-interpreter/src/main/resources/PySpark/pyspark_runner.py
index 3759165..c752457 100644
--- a/pyspark-interpreter/src/main/resources/PySpark/pyspark_runner.py
+++ b/pyspark-interpreter/src/main/resources/PySpark/pyspark_runner.py
@@ -94,6 +94,13 @@ class Kernel(object):
     def __init__(self, jkernel):
         self._jvm_kernel = jkernel
 
+    def __getattr__(self, name):
+        return self._jvm_kernel.__getattribute__(name)
+
+    def __dir__(self):
+        parent = super().__dir__()
+        return parent + [x for x in self._jvm_kernel.__dir__() if x not in parent]
+
     def createSparkContext(self, config):
         jconf = gateway.jvm.org.apache.spark.SparkConf(False)
         for key, value in config.getAll():
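
The __getattr__/__dir__ pair added above is plain attribute delegation.
A minimal, py4j-free sketch of the same pattern; the stub class is
invented for illustration:

    class JVMKernelStub(object):
        # Stand-in for the py4j proxy of the Scala kernel.
        def sparkConf(self):
            return "jconf"

    class Kernel(object):
        def __init__(self, jkernel):
            self._jvm_kernel = jkernel

        def __getattr__(self, name):
            # Called only when normal lookup fails, so Python-side
            # methods on the wrapper shadow same-named JVM ones.
            return getattr(self._jvm_kernel, name)

        def __dir__(self):
            parent = super(Kernel, self).__dir__()
            return parent + [x for x in dir(self._jvm_kernel) if x not in parent]

    k = Kernel(JVMKernelStub())
    print(k.sparkConf())           # "jconf", resolved through delegation
    print("sparkConf" in dir(k))   # True, via the merged __dir__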