You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sdap.apache.org by fg...@apache.org on 2018/07/17 18:07:18 UTC

[incubator-sdap-nexus] branch master updated: SDAP-124 Add ability to submit custom code for execution. (#23)

This is an automated email from the ASF dual-hosted git repository.

fgreg pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-sdap-nexus.git


The following commit(s) were added to refs/heads/master by this push:
     new 24cf540  SDAP-124 Add ability to submit custom code for execution. (#23)
24cf540 is described below

commit 24cf5400c7973673037b86ae83b73566286d11b2
Author: Joseph Jacob <jj...@users.noreply.github.com>
AuthorDate: Tue Jul 17 11:07:15 2018 -0700

    SDAP-124 Add ability to submit custom code for execution. (#23)
    
    * SDAP-123 Allow Jupyter image to configure nexus repo at build time.
    
    * SDAP-124 Add ability to submit custom code for execution.
    
    * SDAP-124 Add ability to submit custom code for execution.
    
    * Get livy server host from configuration.
    
    * Removed jupyter-livy imaage.
    
    * Set livy_host to localhost instead of 127.0.0.1
---
 analysis/webservice/NexusLivyHandler.py | 117 ++++++++++++++++++++++++++++++++
 analysis/webservice/config/web.ini      |   6 ++
 analysis/webservice/webapp_livy.py      | 101 +++++++++++++++++++++++++++
 client/nexuscli/__init__.py             |   3 +
 client/nexuscli/nexuscli_ow.py          |  77 +++++++++++++++++++++
 5 files changed, 304 insertions(+)

diff --git a/analysis/webservice/NexusLivyHandler.py b/analysis/webservice/NexusLivyHandler.py
new file mode 100644
index 0000000..3b6a9b0
--- /dev/null
+++ b/analysis/webservice/NexusLivyHandler.py
@@ -0,0 +1,117 @@
+import time, json, requests, textwrap
+from importlib import import_module
+from os import environ
+from os.path import basename, splitext, abspath
+from livy.client import HttpClient
+
+class LivyHandler:
+
+    def __init__(self, host='http://localhost:8998'):
+        self._headers = {'Content-Type': 'application/json'}
+        if host is not None:
+            self.create_pyspark_session(host)
+
+    def _wait_for_state(self, url, desired_state):
+        while True:
+            r = requests.get(url, headers=self._headers)
+            if r.json()['state'] == desired_state:
+                break
+            time.sleep(1)
+        return r
+
+    def create_pyspark_session(self, host):
+        self._host = host
+        data = {'kind': 'pyspark'}
+
+        # Create a Spark session
+        print 'Creating Spark session...'
+        r = requests.post(host + '/sessions', data=json.dumps(data), 
+                          headers=self._headers)
+
+        # Wait until the new Spark session is ready to use
+        self._session_url = host + r.headers['location']
+
+        r = self._wait_for_state(self._session_url, 'idle')
+
+        # Create client for Livy batch jobs
+        self._lc = HttpClient(self._session_url)
+
+    def exec_str (self, code):
+        print 'Submitting code...'
+        statements_url = self._session_url + '/statements'
+        data = {'code': code}
+        r = requests.post(statements_url, data=json.dumps(data), 
+                          headers=self._headers)
+        
+        # Wait until the code completes
+        print 'Running code...'
+        status_url = self._host + r.headers['location']
+    
+        r = self._wait_for_state(status_url, 'available')
+        output = r.json()['output']
+        print 'output=',output
+        if output['status'] == 'error':
+            ans = {u'text/plain': output['traceback']}
+        else:
+            ans = {u'text/plain': [output['data']['text/plain']]}
+        return ans
+
+    def exec_file(self, py_uri):
+        py_uri_abs = abspath(py_uri)
+        self._lc.upload_pyfile(py_uri_abs)
+        m = splitext(basename(py_uri_abs))[0]
+        try:
+            m_imp = import_module(m)
+        except ImportError:
+            raise
+
+        def upload_pyfile_job(jc):
+            return m_imp.main(jc.sc)
+
+        return self._lc.submit(upload_pyfile_job).result()
+
+    def close(self):
+        print 'Closing Spark session...'
+        requests.delete(self._session_url, headers=self._headers)
+
+
+def main():
+    try:
+        livy_host = environ['LIVY_HOST']
+    except:
+        livy_host = "http://localhost:8998"
+    print 'Using Livy at {}'.format(livy_host)
+    lh = LivyHandler(host=livy_host)
+
+    # Run some pyspark code.
+    code = textwrap.dedent("""
+    1 + 1
+    """)
+    ans = lh.exec_str(code)
+    print 'The answer is {}'.format(ans)
+
+    # Run some more pyspark code.
+    code = textwrap.dedent("""
+    import random
+    NUM_SAMPLES = 100000
+    def sample(p):
+      x, y = random.random(), random.random()
+      return 1 if x*x + y*y < 1 else 0
+
+    count = sc.parallelize(xrange(0, NUM_SAMPLES)).map(sample).reduce(lambda a, b: a + b)
+    print "Pi is roughly %f" % (4.0 * count / NUM_SAMPLES)
+    """)
+    ans = lh.exec_str(code)
+    print 'The answer is {}'.format(ans)
+    
+    # Run a batch job
+    py_uri = 'test_code_nexus_laptop.py'
+    print 'Submitting batch job from {}'.format(py_uri)
+    ans = lh.exec_file(py_uri)
+    print 'The answer is {}'.format(ans)
+
+    # Close the Spark session.
+    lh.close()
+
+if __name__ == "__main__":
+    main()
diff --git a/analysis/webservice/config/web.ini b/analysis/webservice/config/web.ini
index 77f77ce..a1ecb2c 100644
--- a/analysis/webservice/config/web.ini
+++ b/analysis/webservice/config/web.ini
@@ -3,6 +3,12 @@ server.socket_port=8083
 server.socket_host = '127.0.0.1'
 server.max_simultaneous_requests = 10
 
+[livy]
+livy_port = 8998
+livy_host = localhost
+server.socket_port = 8084
+server.socket_host = '127.0.0.1'
+
 [static]
 static_enabled=true
 static_dir=static
diff --git a/analysis/webservice/webapp_livy.py b/analysis/webservice/webapp_livy.py
new file mode 100644
index 0000000..ed8a638
--- /dev/null
+++ b/analysis/webservice/webapp_livy.py
@@ -0,0 +1,101 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+import ConfigParser
+import logging
+import sys
+import os
+import pkg_resources
+import tornado.web
+from tornado.options import define, options, parse_command_line
+from webservice.NexusLivyHandler import LivyHandler
+
+class RunFileHandler(tornado.web.RequestHandler):
+
+    _id = 0
+            
+    def __init__(self, *args, **kwargs):
+        self._lh = kwargs.pop('lh', None)
+        super(RunFileHandler, self).__init__(*args, **kwargs)
+
+    def post(self):
+        self._upload_file = self.request.files['file'][0]
+        upload_fname = 'upload_'+str(RunFileHandler._id)+'.py'
+        while os.path.exists(upload_fname):
+            RunFileHandler._id += 1
+            upload_fname = 'upload_'+str(RunFileHandler._id)+'.py'
+        RunFileHandler._id += 1
+        with open(upload_fname, 'w') as f:
+            f.write(self._upload_file['body'])
+        try:
+            ans = self._lh.exec_file(upload_fname)
+        except Exception, e:
+            ans = str(e)
+        self.write(str(ans))
+
+
+class RunStrHandler(tornado.web.RequestHandler):
+            
+    def __init__(self, *args, **kwargs):
+        self._lh = kwargs.pop('lh', None)
+        super(RunStrHandler, self).__init__(*args, **kwargs)
+
+    def post(self):
+        self._upload_str = self.request.body
+        ans = self._lh.exec_str(self._upload_str)
+        self.write(str(ans))
+
+
+if __name__ == "__main__":
+
+    # Configure logger.
+    logging.basicConfig(
+        level=logging.DEBUG,
+        format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
+        datefmt="%Y-%m-%dT%H:%M:%S", stream=sys.stdout)
+    log = logging.getLogger(__name__)
+
+    # Configure tornado.
+    webconfig = ConfigParser.RawConfigParser()
+    webconfig.readfp(pkg_resources.resource_stream(__name__, "config/web.ini"), filename='web.ini')
+    define("debug", default=False, help="run in debug mode")
+    define("port", default=webconfig.get("livy", "server.socket_port"), help="run on the given port", type=int)
+    define("address", default=webconfig.get("livy", "server.socket_host"), help="Bind to the given address")
+    parse_command_line()
+    log.info("Initializing on host address '%s'" % options.address)
+    log.info("Initializing on port '%s'" % options.port)
+    log.info("Starting web server in debug mode: %s" % options.debug)
+
+    # Start up Livy Spark session.
+    livy_host = webconfig.get("livy", "livy_host")
+    livy_port = webconfig.get("livy", "livy_port")
+    livy_url = 'http://' + livy_host + ':' + livy_port
+    lh = LivyHandler(host=livy_url)
+
+    # Define tornado job handlers
+    handlers = []
+    handlers.append((r"/run_file", RunFileHandler, dict(lh=lh)))
+    handlers.append((r"/run_str", RunStrHandler, dict(lh=lh)))
+
+    # Start listening for job requests.
+    app = tornado.web.Application(
+        handlers,
+        default_host=options.address,
+        debug=options.debug
+    )
+    app.listen(options.port)
+    log.info("Started HTTP listener...")
+    tornado.ioloop.IOLoop.current().start()
diff --git a/client/nexuscli/__init__.py b/client/nexuscli/__init__.py
index 250b474..d9162fe 100644
--- a/client/nexuscli/__init__.py
+++ b/client/nexuscli/__init__.py
@@ -19,3 +19,6 @@ from nexuscli.nexuscli import time_series
 from nexuscli.nexuscli import dataset_list
 from nexuscli.nexuscli import daily_difference_average
 from nexuscli.nexuscli import subset
+from nexuscli.nexuscli_ow import set_target
+from nexuscli.nexuscli_ow import run_file
+from nexuscli.nexuscli_ow import run_str
diff --git a/client/nexuscli/nexuscli_ow.py b/client/nexuscli/nexuscli_ow.py
new file mode 100644
index 0000000..feae3e2
--- /dev/null
+++ b/client/nexuscli/nexuscli_ow.py
@@ -0,0 +1,77 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""
+This module provides a native python client interface to the NEXUS Livy
+webservice API.
+
+Usage:
+
+    from nexuscli import nexuscli_ow
+    
+    nexuscli_ow.set_target("http://host:port")
+
+    filename = "mycode.py"
+    nexuscli_ow.run_file(filename)
+
+    The code in the file passed to run_file must be valid pyspark code.
+    Furthermore, it must have a main function that takes exactly one
+    argument, the SparkContext.  The code can make use of that SparkContext
+    variable, but should not create the SparkContext.
+
+    code = "1+1"
+    nexuscli_ow.run_str(code)
+
+    The code passed to run_str can also be a multi-line string containing 
+    valid python code.  It can also be a multi-line string containing 
+    valid pyspark code.  For pyspark code the variable sc may be used to 
+    access the SparkContext,  but it should not create the SparkContext.
+"""
+import requests
+import ast
+
+target = 'http://localhost:8084'
+
+session = requests.session()
+
+
+def set_target(url, use_session=True):
+    """
+    Set the URL for the NEXUS webapp endpoint.  
+    
+    __url__ URL for NEXUS webapp endpoint   
+    __return__ None
+    """
+    global target
+    target = url
+
+    if not use_session:
+        global session
+        session = requests
+
+
+def run_file(fname):
+    files = {'file': open(fname, 'rb')}
+    response = session.post(target+'/run_file', files=files)
+    print(response.text)
+    return response.text
+    
+
+def run_str(code):
+    response = requests.post(target+'/run_str', data=code)
+    ans = ast.literal_eval(response.text)['text/plain']
+    for line in ans:
+        print(line, end=" ")
+    return ans
+