You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@toree.apache.org by ch...@apache.org on 2016/06/20 13:41:16 UTC
[1/3] incubator-toree git commit: Wrapping the kernel object in a python class, and tweaking the spark context creation
Repository: incubator-toree
Updated Branches:
refs/heads/master c01f8a9f4 -> 29d10b3e7
Wrapping the kernel object in a python class, and tweaking the spark context creation
Project: http://git-wip-us.apache.org/repos/asf/incubator-toree/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-toree/commit/a7275ed7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-toree/tree/a7275ed7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-toree/diff/a7275ed7
Branch: refs/heads/master
Commit: a7275ed744242eefbfb4aaafd8cbca06296a7b48
Parents: c01f8a9
Author: Liam Fisk <li...@xtra.co.nz>
Authored: Sun Jun 12 11:28:05 2016 +1200
Committer: Liam Fisk <li...@xtra.co.nz>
Committed: Sun Jun 12 17:04:34 2016 +1200
----------------------------------------------------------------------
.../main/resources/PySpark/pyspark_runner.py | 203 ++++++++++---------
1 file changed, 112 insertions(+), 91 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/a7275ed7/pyspark-interpreter/src/main/resources/PySpark/pyspark_runner.py
----------------------------------------------------------------------
diff --git a/pyspark-interpreter/src/main/resources/PySpark/pyspark_runner.py b/pyspark-interpreter/src/main/resources/PySpark/pyspark_runner.py
index 5f3c079..3759165 100644
--- a/pyspark-interpreter/src/main/resources/PySpark/pyspark_runner.py
+++ b/pyspark-interpreter/src/main/resources/PySpark/pyspark_runner.py
@@ -37,9 +37,9 @@ client = GatewayClient(port=int(sys.argv[1]))
sparkVersion = sys.argv[2]
if re.match("^1\.[456]\..*$", sparkVersion):
- gateway = JavaGateway(client, auto_convert = True)
+ gateway = JavaGateway(client, auto_convert=True)
else:
- gateway = JavaGateway(client)
+ gateway = JavaGateway(client)
java_import(gateway.jvm, "org.apache.spark.SparkEnv")
java_import(gateway.jvm, "org.apache.spark.SparkConf")
@@ -51,112 +51,133 @@ bridge = gateway.entry_point
state = bridge.state()
state.markReady()
-#jsc = bridge.javaSparkContext()
-
if sparkVersion.startswith("1.2"):
- java_import(gateway.jvm, "org.apache.spark.sql.SQLContext")
- java_import(gateway.jvm, "org.apache.spark.sql.hive.HiveContext")
- java_import(gateway.jvm, "org.apache.spark.sql.hive.LocalHiveContext")
- java_import(gateway.jvm, "org.apache.spark.sql.hive.TestHiveContext")
+ java_import(gateway.jvm, "org.apache.spark.sql.SQLContext")
+ java_import(gateway.jvm, "org.apache.spark.sql.hive.HiveContext")
+ java_import(gateway.jvm, "org.apache.spark.sql.hive.LocalHiveContext")
+ java_import(gateway.jvm, "org.apache.spark.sql.hive.TestHiveContext")
elif sparkVersion.startswith("1.3"):
- java_import(gateway.jvm, "org.apache.spark.sql.*")
- java_import(gateway.jvm, "org.apache.spark.sql.hive.*")
+ java_import(gateway.jvm, "org.apache.spark.sql.*")
+ java_import(gateway.jvm, "org.apache.spark.sql.hive.*")
elif re.match("^1\.[456]\..*$", sparkVersion):
- java_import(gateway.jvm, "org.apache.spark.sql.*")
- java_import(gateway.jvm, "org.apache.spark.sql.hive.*")
+ java_import(gateway.jvm, "org.apache.spark.sql.*")
+ java_import(gateway.jvm, "org.apache.spark.sql.hive.*")
java_import(gateway.jvm, "scala.Tuple2")
-
+conf = None  # SparkConf wrapper for the active context; assigned by Kernel.refreshContext() once the JVM kernel has one
sc = None
sqlContext = None
code_info = None
-kernel = bridge.kernel()
class Logger(object):
- def __init__(self):
- self.out = ""
+ def __init__(self):
+ self.out = ""
- def write(self, message):
- state.sendOutput(code_info.codeId(), message)
- self.out = self.out + message
+ def write(self, message):  # file-like write(): stream each chunk to the JVM for the current submission and keep a local copy
+ state.sendOutput(code_info.codeId(), message)  # NOTE(review): raises AttributeError if anything writes while code_info is None (e.g. between submissions)
+ self.out = self.out + message
- def get(self):
- return self.out
+ def get(self):  # everything written since the last reset()
+ return self.out
- def reset(self):
- self.out = ""
+ def reset(self):  # clear the buffer; the main loop calls this after each processed submission
+ self.out = ""
output = Logger()
sys.stdout = output
sys.stderr = output
-while True :
- try:
- global code_info
- code_info = state.nextCode()
-
- # If code is not available, try again later
- if (code_info is None):
- sleep(1)
- continue
-
- code_lines = code_info.code().split("\n")
- #jobGroup = req.jobGroup()
- final_code = None
-
- for s in code_lines:
- if s == None or len(s.strip()) == 0:
- continue
-
- # skip comment
- if s.strip().startswith("#"):
- continue
-
- if final_code:
- final_code += "\n" + s
- else:
- final_code = s
-
- if sc is None:
- jsc = kernel.javaSparkContext()
- if jsc is not None:
- jconf = kernel.sparkConf()
- conf = SparkConf(_jvm = gateway.jvm, _jconf = jconf)
- sc = SparkContext(jsc = jsc, gateway = gateway, conf = conf)
-
- if sqlContext is None:
- jsqlContext = kernel.sqlContext()
- if jsqlContext is not None and sc is not None:
- sqlContext = SQLContext(sc, sqlContext=jsqlContext)
-
- if final_code:
- '''Parse the final_code to an AST parse tree. If the last node is an expression (where an expression
- can be a print function or an operation like 1+1) turn it into an assignment where temp_val = last expression.
- The modified parse tree will get executed. If the variable temp_val introduced is not none then we have the
- result of the last expression and should return it as an execute result. The sys.stdout sendOutput logic
- gets triggered on each logger message to support long running code blocks instead of bulk'''
- ast_parsed = ast.parse(final_code)
- the_last_expression_to_assign_temp_value = None
- if isinstance(ast_parsed.body[-1], ast.Expr):
- new_node = (ast.Assign(targets=[ast.Name(id='the_last_expression_to_assign_temp_value', ctx=ast.Store())], value=ast_parsed.body[-1].value))
- ast_parsed.body[-1] = ast.fix_missing_locations(new_node)
- compiled_code = compile(ast_parsed, "<string>", "exec")
- eval(compiled_code)
- if the_last_expression_to_assign_temp_value is not None:
- state.markSuccess(code_info.codeId(), str(the_last_expression_to_assign_temp_value))
- else:
- state.markSuccess(code_info.codeId(), "")
- del the_last_expression_to_assign_temp_value
-
- except Py4JJavaError:
- excInnerError = traceback.format_exc() # format_tb() does not return the inner exception
- innerErrorStart = excInnerError.find("Py4JJavaError:")
- if innerErrorStart > -1:
- excInnerError = excInnerError[innerErrorStart:]
- state.markFailure(code_info.codeId(), excInnerError + str(sys.exc_info()))
- except:
- state.markFailure(code_info.codeId(), traceback.format_exc())
-
- output.reset()
+
+class Kernel(object):  # Python-side wrapper around the JVM kernel object returned by bridge.kernel()
+    def __init__(self, jkernel):
+        self._jvm_kernel = jkernel
+
+    def createSparkContext(self, config):  # copy config.getAll() into a new JVM SparkConf and ask the JVM kernel to build the context
+        jconf = gateway.jvm.org.apache.spark.SparkConf(False)
+        for key,value in config.getAll():
+            jconf.set(key, value)
+        self._jvm_kernel.createSparkContext(jconf)
+        self.refreshContext()  # NOTE(review): refreshContext() only fills sc/sqlContext while they are still None, so an existing sc is not replaced — confirm intended
+
+    def refreshContext(self):  # bind module globals conf/sc/sqlContext to Python wrappers over the JVM kernel's objects
+        global conf, sc, sqlContext
+
+        # Register our existing Py4J gateway on SparkContext (only if none is set yet) so pyspark reuses it instead of instantiating another gateway
+        with SparkContext._lock:
+            if not SparkContext._gateway:
+                SparkContext._gateway = gateway
+                SparkContext._jvm = gateway.jvm
+
+        if sc is None:
+            jsc = self._jvm_kernel.javaSparkContext()
+            if jsc is not None:
+                jconf = self._jvm_kernel.sparkConf()
+                conf = SparkConf(_jvm=gateway.jvm, _jconf=jconf)
+                sc = SparkContext(jsc=jsc, gateway=gateway, conf=conf)
+
+        if sqlContext is None:
+            jsqlContext = self._jvm_kernel.sqlContext()
+            if jsqlContext is not None and sc is not None:
+                sqlContext = SQLContext(sc, sqlContext=jsqlContext)
+
+kernel = Kernel(bridge.kernel())
+
+while True:  # poll the JVM for submitted code, handling one submission per iteration
+ try:
+ code_info = state.nextCode()
+
+ # If code is not available, try again later
+ if code_info is None:
+ sleep(1)
+ continue
+
+ code_lines = code_info.code().split("\n")
+ final_code = None
+
+ for s in code_lines:
+ if s is None or len(s.strip()) == 0:
+ continue
+
+ # Skip comment-only lines. NOTE(review): this and the blank-line skip above also drop such lines inside triple-quoted strings
+ if s.strip().startswith("#"):
+ continue
+
+ if final_code:
+ final_code += "\n" + s
+ else:
+ final_code = s
+
+ # Ensure the appropriate variables are set in the module namespace
+ kernel.refreshContext()
+
+ if final_code:  # NOTE(review): when nothing is left to run, neither markSuccess nor markFailure is called for this codeId — confirm the JVM side does not wait on it
+ '''Parse the final_code to an AST parse tree. If the last node is an expression (where an expression
+ can be a print function or an operation like 1+1) turn it into an assignment where temp_val = last expression.
+ The modified parse tree will get executed. If the variable temp_val introduced is not none then we have the
+ result of the last expression and should return it as an execute result. The sys.stdout sendOutput logic
+ gets triggered on each logger message to support long running code blocks instead of bulk'''
+ ast_parsed = ast.parse(final_code)
+ the_last_expression_to_assign_temp_value = None
+ if isinstance(ast_parsed.body[-1], ast.Expr):
+ new_node = (ast.Assign(targets=[ast.Name(id='the_last_expression_to_assign_temp_value', ctx=ast.Store())], value=ast_parsed.body[-1].value))
+ ast_parsed.body[-1] = ast.fix_missing_locations(new_node)
+ compiled_code = compile(ast_parsed, "<string>", "exec")
+ eval(compiled_code)  # runs in this module's globals, so user code sees sc/sqlContext/kernel and its final assignment is readable below
+ if the_last_expression_to_assign_temp_value is not None:
+ state.markSuccess(code_info.codeId(), str(the_last_expression_to_assign_temp_value))
+ else:
+ state.markSuccess(code_info.codeId(), "")
+ del the_last_expression_to_assign_temp_value
+
+ except Py4JJavaError:
+ excInnerError = traceback.format_exc() # format_tb() does not return the inner exception
+ innerErrorStart = excInnerError.find("Py4JJavaError:")
+ if innerErrorStart > -1:
+ excInnerError = excInnerError[innerErrorStart:]
+ state.markFailure(code_info.codeId(), excInnerError + str(sys.exc_info()))
+ except:
+ state.markFailure(code_info.codeId(), traceback.format_exc())
+
+ output.reset()
[3/3] incubator-toree git commit: Removing optional filename extension (which is not currently included in language_info)
Posted by ch...@apache.org.
Removing optional filename extension (which is not currently included in language_info)
Project: http://git-wip-us.apache.org/repos/asf/incubator-toree/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-toree/commit/29d10b3e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-toree/tree/29d10b3e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-toree/diff/29d10b3e
Branch: refs/heads/master
Commit: 29d10b3e7d0e219eef5ccac1dd8c68efe9a69cbd
Parents: 9c5807c
Author: Liam Fisk <li...@xtra.co.nz>
Authored: Sun Jun 19 16:59:04 2016 +1200
Committer: Liam Fisk <li...@xtra.co.nz>
Committed: Sun Jun 19 16:59:04 2016 +1200
----------------------------------------------------------------------
test_toree.py | 10 +---------
1 file changed, 1 insertion(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/29d10b3e/test_toree.py
----------------------------------------------------------------------
diff --git a/test_toree.py b/test_toree.py
index 5f21f9c..a4c5485 100644
--- a/test_toree.py
+++ b/test_toree.py
@@ -29,10 +29,6 @@ class ToreeScalaKernelTests(jupyter_kernel_test.KernelTests):
# Optional --------------------------------------
- # the normal file extension (including the leading dot) for this language
- # checked against language_info.file_extension in kernel_info_reply
- file_extension = ".scala"
-
# Code in the kernel's language to write "hello, world" to stdout
code_hello_world = "println(\"hello, world\")"
@@ -106,10 +102,6 @@ class ToreePythonKernelTests(jupyter_kernel_test.KernelTests):
# Optional --------------------------------------
- # the normal file extension (including the leading dot) for this language
- # checked against language_info.file_extension in kernel_info_reply
- file_extension = ".py"
-
# Code in the kernel's language to write "hello, world" to stdout
code_hello_world = "print(\"hello, world\")"
@@ -120,4 +112,4 @@ class ToreePythonKernelTests(jupyter_kernel_test.KernelTests):
]
if __name__ == '__main__':
- unittest.main()
\ No newline at end of file
+ unittest.main()
[2/3] incubator-toree git commit: Splicing the Python and Java kernels together
Posted by ch...@apache.org.
Splicing the Python and Java kernels together
Project: http://git-wip-us.apache.org/repos/asf/incubator-toree/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-toree/commit/9c5807c9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-toree/tree/9c5807c9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-toree/diff/9c5807c9
Branch: refs/heads/master
Commit: 9c5807c93d4ae27c32150a70df8efc981e921e17
Parents: a7275ed
Author: Liam Fisk <li...@xtra.co.nz>
Authored: Sun Jun 19 12:51:12 2016 +1200
Committer: Liam Fisk <li...@xtra.co.nz>
Committed: Sun Jun 19 14:24:08 2016 +1200
----------------------------------------------------------------------
.../src/main/resources/PySpark/pyspark_runner.py | 7 +++++++
1 file changed, 7 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/9c5807c9/pyspark-interpreter/src/main/resources/PySpark/pyspark_runner.py
----------------------------------------------------------------------
diff --git a/pyspark-interpreter/src/main/resources/PySpark/pyspark_runner.py b/pyspark-interpreter/src/main/resources/PySpark/pyspark_runner.py
index 3759165..c752457 100644
--- a/pyspark-interpreter/src/main/resources/PySpark/pyspark_runner.py
+++ b/pyspark-interpreter/src/main/resources/PySpark/pyspark_runner.py
@@ -94,6 +94,13 @@ class Kernel(object):
def __init__(self, jkernel):
self._jvm_kernel = jkernel
+    def __getattr__(self, name):  # only called for names Kernel itself lacks: delegate them to the wrapped JVM kernel
+        return getattr(self._jvm_kernel, name)  # getattr() reaches py4j's JavaObject.__getattr__; calling __getattribute__ directly skips that hook, so Java members raised AttributeError
+
+    def __dir__(self):  # Kernel's own attributes plus the JVM kernel's members, without duplicates
+        parent = super().__dir__()  # NOTE(review): zero-argument super() and object.__dir__ require Python 3
+        return parent + [x for x in self._jvm_kernel.__dir__() if x not in parent]
+
def createSparkContext(self, config):
jconf = gateway.jvm.org.apache.spark.SparkConf(False)
for key,value in config.getAll():