You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sdap.apache.org by fg...@apache.org on 2018/07/17 18:07:18 UTC
[incubator-sdap-nexus] branch master updated: SDAP-124 Add ability
to submit custom code for execution. (#23)
This is an automated email from the ASF dual-hosted git repository.
fgreg pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-sdap-nexus.git
The following commit(s) were added to refs/heads/master by this push:
new 24cf540 SDAP-124 Add ability to submit custom code for execution. (#23)
24cf540 is described below
commit 24cf5400c7973673037b86ae83b73566286d11b2
Author: Joseph Jacob <jj...@users.noreply.github.com>
AuthorDate: Tue Jul 17 11:07:15 2018 -0700
SDAP-124 Add ability to submit custom code for execution. (#23)
* SDAP-123 Allow Jupyter image to configure nexus repo at build time.
* SDAP-124 Add ability to submit custom code for execution.
* SDAP-124 Add ability to submit custom code for execution.
* Get livy server host from configuration.
* Removed jupyter-livy image.
* Set livy_host to localhost instead of 127.0.0.1
---
analysis/webservice/NexusLivyHandler.py | 117 ++++++++++++++++++++++++++++++++
analysis/webservice/config/web.ini | 6 ++
analysis/webservice/webapp_livy.py | 101 +++++++++++++++++++++++++++
client/nexuscli/__init__.py | 3 +
client/nexuscli/nexuscli_ow.py | 77 +++++++++++++++++++++
5 files changed, 304 insertions(+)
diff --git a/analysis/webservice/NexusLivyHandler.py b/analysis/webservice/NexusLivyHandler.py
new file mode 100644
index 0000000..3b6a9b0
--- /dev/null
+++ b/analysis/webservice/NexusLivyHandler.py
@@ -0,0 +1,117 @@
+import time, json, requests, textwrap
+from importlib import import_module
+from os import environ
+from os.path import basename, splitext, abspath
+from livy.client import HttpClient
+
+class LivyHandler:
+
+ def __init__(self, host='http://localhost:8998'):
+ self._headers = {'Content-Type': 'application/json'}
+ if host is not None:
+ self.create_pyspark_session(host)
+
+ def _wait_for_state(self, url, desired_state):
+ while True:
+ r = requests.get(url, headers=self._headers)
+ if r.json()['state'] == desired_state:
+ break
+ time.sleep(1)
+ return r
+
+ def create_pyspark_session(self, host):
+ self._host = host
+ data = {'kind': 'pyspark'}
+
+ # Create a Spark session
+ print 'Creating Spark session...'
+ r = requests.post(host + '/sessions', data=json.dumps(data),
+ headers=self._headers)
+
+ # Wait until the new Spark session is ready to use
+ self._session_url = host + r.headers['location']
+
+ r = self._wait_for_state(self._session_url, 'idle')
+
+ # Create client for Livy batch jobs
+ self._lc = HttpClient(self._session_url)
+
+ def exec_str (self, code):
+ print 'Submitting code...'
+ statements_url = self._session_url + '/statements'
+ data = {'code': code}
+ r = requests.post(statements_url, data=json.dumps(data),
+ headers=self._headers)
+
+ # Wait until the code completes
+ print 'Running code...'
+ status_url = self._host + r.headers['location']
+
+ r = self._wait_for_state(status_url, 'available')
+ output = r.json()['output']
+ print 'output=',output
+ if output['status'] == 'error':
+ ans = {u'text/plain': output['traceback']}
+ else:
+ ans = {u'text/plain': [output['data']['text/plain']]}
+ return ans
+
+ def exec_file(self, py_uri):
+ py_uri_abs = abspath(py_uri)
+ self._lc.upload_pyfile(py_uri_abs)
+ m = splitext(basename(py_uri_abs))[0]
+ try:
+ m_imp = import_module(m)
+ except ImportError:
+ raise
+
+ def upload_pyfile_job(jc):
+ return m_imp.main(jc.sc)
+
+ return self._lc.submit(upload_pyfile_job).result()
+
+ def close(self):
+ print 'Closing Spark session...'
+ requests.delete(self._session_url, headers=self._headers)
+
+
+def main():
+ try:
+ livy_host = environ['LIVY_HOST']
+ except:
+ livy_host = "http://localhost:8998"
+ print 'Using Livy at {}'.format(livy_host)
+ lh = LivyHandler(host=livy_host)
+
+ # Run some pyspark code.
+ code = textwrap.dedent("""
+ 1 + 1
+ """)
+ ans = lh.exec_str(code)
+ print 'The answer is {}'.format(ans)
+
+ # Run some more pyspark code.
+ code = textwrap.dedent("""
+ import random
+ NUM_SAMPLES = 100000
+ def sample(p):
+ x, y = random.random(), random.random()
+ return 1 if x*x + y*y < 1 else 0
+
+ count = sc.parallelize(xrange(0, NUM_SAMPLES)).map(sample).reduce(lambda a, b: a + b)
+ print "Pi is roughly %f" % (4.0 * count / NUM_SAMPLES)
+ """)
+ ans = lh.exec_str(code)
+ print 'The answer is {}'.format(ans)
+
+ # Run a batch job
+ py_uri = 'test_code_nexus_laptop.py'
+ print 'Submitting batch job from {}'.format(py_uri)
+ ans = lh.exec_file(py_uri)
+ print 'The answer is {}'.format(ans)
+
+ # Close the Spark session.
+ lh.close()
+
+if __name__ == "__main__":
+ main()
diff --git a/analysis/webservice/config/web.ini b/analysis/webservice/config/web.ini
index 77f77ce..a1ecb2c 100644
--- a/analysis/webservice/config/web.ini
+++ b/analysis/webservice/config/web.ini
@@ -3,6 +3,12 @@ server.socket_port=8083
server.socket_host = '127.0.0.1'
server.max_simultaneous_requests = 10
+[livy]
+livy_port = 8998
+livy_host = localhost
+server.socket_port = 8084
+server.socket_host = '127.0.0.1'
+
[static]
static_enabled=true
static_dir=static
diff --git a/analysis/webservice/webapp_livy.py b/analysis/webservice/webapp_livy.py
new file mode 100644
index 0000000..ed8a638
--- /dev/null
+++ b/analysis/webservice/webapp_livy.py
@@ -0,0 +1,101 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+import ConfigParser
+import logging
+import sys
+import os
+import pkg_resources
+import tornado.web
+from tornado.options import define, options, parse_command_line
+from webservice.NexusLivyHandler import LivyHandler
+
+class RunFileHandler(tornado.web.RequestHandler):
+
+ _id = 0
+
+ def __init__(self, *args, **kwargs):
+ self._lh = kwargs.pop('lh', None)
+ super(RunFileHandler, self).__init__(*args, **kwargs)
+
+ def post(self):
+ self._upload_file = self.request.files['file'][0]
+ upload_fname = 'upload_'+str(RunFileHandler._id)+'.py'
+ while os.path.exists(upload_fname):
+ RunFileHandler._id += 1
+ upload_fname = 'upload_'+str(RunFileHandler._id)+'.py'
+ RunFileHandler._id += 1
+ with open(upload_fname, 'w') as f:
+ f.write(self._upload_file['body'])
+ try:
+ ans = self._lh.exec_file(upload_fname)
+ except Exception, e:
+ ans = str(e)
+ self.write(str(ans))
+
+
+class RunStrHandler(tornado.web.RequestHandler):
+
+ def __init__(self, *args, **kwargs):
+ self._lh = kwargs.pop('lh', None)
+ super(RunStrHandler, self).__init__(*args, **kwargs)
+
+ def post(self):
+ self._upload_str = self.request.body
+ ans = self._lh.exec_str(self._upload_str)
+ self.write(str(ans))
+
+
+if __name__ == "__main__":
+
+ # Configure logger.
+ logging.basicConfig(
+ level=logging.DEBUG,
+ format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
+ datefmt="%Y-%m-%dT%H:%M:%S", stream=sys.stdout)
+ log = logging.getLogger(__name__)
+
+ # Configure tornado.
+ webconfig = ConfigParser.RawConfigParser()
+ webconfig.readfp(pkg_resources.resource_stream(__name__, "config/web.ini"), filename='web.ini')
+ define("debug", default=False, help="run in debug mode")
+ define("port", default=webconfig.get("livy", "server.socket_port"), help="run on the given port", type=int)
+ define("address", default=webconfig.get("livy", "server.socket_host"), help="Bind to the given address")
+ parse_command_line()
+ log.info("Initializing on host address '%s'" % options.address)
+ log.info("Initializing on port '%s'" % options.port)
+ log.info("Starting web server in debug mode: %s" % options.debug)
+
+ # Start up Livy Spark session.
+ livy_host = webconfig.get("livy", "livy_host")
+ livy_port = webconfig.get("livy", "livy_port")
+ livy_url = 'http://' + livy_host + ':' + livy_port
+ lh = LivyHandler(host=livy_url)
+
+ # Define tornado job handlers
+ handlers = []
+ handlers.append((r"/run_file", RunFileHandler, dict(lh=lh)))
+ handlers.append((r"/run_str", RunStrHandler, dict(lh=lh)))
+
+ # Start listening for job requests.
+ app = tornado.web.Application(
+ handlers,
+ default_host=options.address,
+ debug=options.debug
+ )
+ app.listen(options.port)
+ log.info("Started HTTP listener...")
+ tornado.ioloop.IOLoop.current().start()
diff --git a/client/nexuscli/__init__.py b/client/nexuscli/__init__.py
index 250b474..d9162fe 100644
--- a/client/nexuscli/__init__.py
+++ b/client/nexuscli/__init__.py
@@ -19,3 +19,6 @@ from nexuscli.nexuscli import time_series
from nexuscli.nexuscli import dataset_list
from nexuscli.nexuscli import daily_difference_average
from nexuscli.nexuscli import subset
+from nexuscli.nexuscli_ow import set_target
+from nexuscli.nexuscli_ow import run_file
+from nexuscli.nexuscli_ow import run_str
diff --git a/client/nexuscli/nexuscli_ow.py b/client/nexuscli/nexuscli_ow.py
new file mode 100644
index 0000000..feae3e2
--- /dev/null
+++ b/client/nexuscli/nexuscli_ow.py
@@ -0,0 +1,77 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""
+This module provides a native python client interface to the NEXUS Livy
+webservice API.
+
+Usage:
+
+ from nexuscli import nexuscli_ow
+
+ nexuscli_ow.set_target("http://host:port")
+
+ filename = "mycode.py"
+ nexuscli_ow.run_file(filename)
+
+ The code in the file passed to run_file must be valid pyspark code.
+ Furthermore, it must have a main function that takes exactly one
+ argument, the SparkContext. The code can make use of that SparkContext
+ variable, but should not create the SparkContext.
+
+ code = "1+1"
+ nexuscli_ow.run_str(code)
+
+ The code passed to run_str can also be a multi-line string containing
+ valid python code. It can also be a multi-line string containing
+ valid pyspark code. For pyspark code the variable sc may be used to
+ access the SparkContext, but it should not create the SparkContext.
+"""
+import requests
+import ast
+
+target = 'http://localhost:8084'
+
+session = requests.session()
+
+
+def set_target(url, use_session=True):
+ """
+ Set the URL for the NEXUS webapp endpoint.
+
+ __url__ URL for NEXUS webapp endpoint
+ __return__ None
+ """
+ global target
+ target = url
+
+ if not use_session:
+ global session
+ session = requests
+
+
+def run_file(fname):
+ files = {'file': open(fname, 'rb')}
+ response = session.post(target+'/run_file', files=files)
+ print(response.text)
+ return response.text
+
+
+def run_str(code):
+ response = requests.post(target+'/run_str', data=code)
+ ans = ast.literal_eval(response.text)['text/plain']
+ for line in ans:
+ print(line, end=" ")
+ return ans
+