You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@sdap.apache.org by GitBox <gi...@apache.org> on 2018/07/17 18:07:17 UTC

[GitHub] fgreg closed pull request #23: SDAP-124 Add ability to submit custom code for execution.

fgreg closed pull request #23: SDAP-124 Add ability to submit custom code for execution.
URL: https://github.com/apache/incubator-sdap-nexus/pull/23
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/analysis/webservice/NexusLivyHandler.py b/analysis/webservice/NexusLivyHandler.py
new file mode 100644
index 0000000..3b6a9b0
--- /dev/null
+++ b/analysis/webservice/NexusLivyHandler.py
@@ -0,0 +1,117 @@
+import time, json, requests, textwrap
+from importlib import import_module
+from os import environ
+from os.path import basename, splitext, abspath
+from livy.client import HttpClient
+
+class LivyHandler:
+
+    def __init__(self, host='http://localhost:8998'):
+        self._headers = {'Content-Type': 'application/json'}
+        if host is not None:
+            self.create_pyspark_session(host)
+
+    def _wait_for_state(self, url, desired_state):
+        while True:
+            r = requests.get(url, headers=self._headers)
+            if r.json()['state'] == desired_state:
+                break
+            time.sleep(1)
+        return r
+
+    def create_pyspark_session(self, host):
+        self._host = host
+        data = {'kind': 'pyspark'}
+
+        # Create a Spark session
+        print 'Creating Spark session...'
+        r = requests.post(host + '/sessions', data=json.dumps(data), 
+                          headers=self._headers)
+
+        # Wait until the new Spark session is ready to use
+        self._session_url = host + r.headers['location']
+
+        r = self._wait_for_state(self._session_url, 'idle')
+
+        # Create client for Livy batch jobs
+        self._lc = HttpClient(self._session_url)
+
+    def exec_str (self, code):
+        print 'Submitting code...'
+        statements_url = self._session_url + '/statements'
+        data = {'code': code}
+        r = requests.post(statements_url, data=json.dumps(data), 
+                          headers=self._headers)
+        
+        # Wait until the code completes
+        print 'Running code...'
+        status_url = self._host + r.headers['location']
+    
+        r = self._wait_for_state(status_url, 'available')
+        output = r.json()['output']
+        print 'output=',output
+        if output['status'] == 'error':
+            ans = {u'text/plain': output['traceback']}
+        else:
+            ans = {u'text/plain': [output['data']['text/plain']]}
+        return ans
+
+    def exec_file(self, py_uri):
+        py_uri_abs = abspath(py_uri)
+        self._lc.upload_pyfile(py_uri_abs)
+        m = splitext(basename(py_uri_abs))[0]
+        try:
+            m_imp = import_module(m)
+        except ImportError:
+            raise
+
+        def upload_pyfile_job(jc):
+            return m_imp.main(jc.sc)
+
+        return self._lc.submit(upload_pyfile_job).result()
+
+    def close(self):
+        print 'Closing Spark session...'
+        requests.delete(self._session_url, headers=self._headers)
+
+
+def main():
+    try:
+        livy_host = environ['LIVY_HOST']
+    except:
+        livy_host = "http://localhost:8998"
+    print 'Using Livy at {}'.format(livy_host)
+    lh = LivyHandler(host=livy_host)
+
+    # Run some pyspark code.
+    code = textwrap.dedent("""
+    1 + 1
+    """)
+    ans = lh.exec_str(code)
+    print 'The answer is {}'.format(ans)
+
+    # Run some more pyspark code.
+    code = textwrap.dedent("""
+    import random
+    NUM_SAMPLES = 100000
+    def sample(p):
+      x, y = random.random(), random.random()
+      return 1 if x*x + y*y < 1 else 0
+
+    count = sc.parallelize(xrange(0, NUM_SAMPLES)).map(sample).reduce(lambda a, b: a + b)
+    print "Pi is roughly %f" % (4.0 * count / NUM_SAMPLES)
+    """)
+    ans = lh.exec_str(code)
+    print 'The answer is {}'.format(ans)
+    
+    # Run a batch job
+    py_uri = 'test_code_nexus_laptop.py'
+    print 'Submitting batch job from {}'.format(py_uri)
+    ans = lh.exec_file(py_uri)
+    print 'The answer is {}'.format(ans)
+
+    # Close the Spark session.
+    lh.close()
+
+if __name__ == "__main__":
+    main()
diff --git a/analysis/webservice/config/web.ini b/analysis/webservice/config/web.ini
index 77f77ce..a1ecb2c 100644
--- a/analysis/webservice/config/web.ini
+++ b/analysis/webservice/config/web.ini
@@ -3,6 +3,12 @@ server.socket_port=8083
 server.socket_host = '127.0.0.1'
 server.max_simultaneous_requests = 10
 
+[livy]
+livy_port = 8998
+livy_host = localhost
+server.socket_port = 8084
+server.socket_host = '127.0.0.1'
+
 [static]
 static_enabled=true
 static_dir=static
diff --git a/analysis/webservice/webapp_livy.py b/analysis/webservice/webapp_livy.py
new file mode 100644
index 0000000..ed8a638
--- /dev/null
+++ b/analysis/webservice/webapp_livy.py
@@ -0,0 +1,101 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+import ConfigParser
+import logging
+import sys
+import os
+import pkg_resources
+import tornado.web
+from tornado.options import define, options, parse_command_line
+from webservice.NexusLivyHandler import LivyHandler
+
+class RunFileHandler(tornado.web.RequestHandler):
+
+    _id = 0
+            
+    def __init__(self, *args, **kwargs):
+        self._lh = kwargs.pop('lh', None)
+        super(RunFileHandler, self).__init__(*args, **kwargs)
+
+    def post(self):
+        self._upload_file = self.request.files['file'][0]
+        upload_fname = 'upload_'+str(RunFileHandler._id)+'.py'
+        while os.path.exists(upload_fname):
+            RunFileHandler._id += 1
+            upload_fname = 'upload_'+str(RunFileHandler._id)+'.py'
+        RunFileHandler._id += 1
+        with open(upload_fname, 'w') as f:
+            f.write(self._upload_file['body'])
+        try:
+            ans = self._lh.exec_file(upload_fname)
+        except Exception, e:
+            ans = str(e)
+        self.write(str(ans))
+
+
+class RunStrHandler(tornado.web.RequestHandler):
+            
+    def __init__(self, *args, **kwargs):
+        self._lh = kwargs.pop('lh', None)
+        super(RunStrHandler, self).__init__(*args, **kwargs)
+
+    def post(self):
+        self._upload_str = self.request.body
+        ans = self._lh.exec_str(self._upload_str)
+        self.write(str(ans))
+
+
+if __name__ == "__main__":
+
+    # Configure logger.
+    logging.basicConfig(
+        level=logging.DEBUG,
+        format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
+        datefmt="%Y-%m-%dT%H:%M:%S", stream=sys.stdout)
+    log = logging.getLogger(__name__)
+
+    # Configure tornado.
+    webconfig = ConfigParser.RawConfigParser()
+    webconfig.readfp(pkg_resources.resource_stream(__name__, "config/web.ini"), filename='web.ini')
+    define("debug", default=False, help="run in debug mode")
+    define("port", default=webconfig.get("livy", "server.socket_port"), help="run on the given port", type=int)
+    define("address", default=webconfig.get("livy", "server.socket_host"), help="Bind to the given address")
+    parse_command_line()
+    log.info("Initializing on host address '%s'" % options.address)
+    log.info("Initializing on port '%s'" % options.port)
+    log.info("Starting web server in debug mode: %s" % options.debug)
+
+    # Start up Livy Spark session.
+    livy_host = webconfig.get("livy", "livy_host")
+    livy_port = webconfig.get("livy", "livy_port")
+    livy_url = 'http://' + livy_host + ':' + livy_port
+    lh = LivyHandler(host=livy_url)
+
+    # Define tornado job handlers
+    handlers = []
+    handlers.append((r"/run_file", RunFileHandler, dict(lh=lh)))
+    handlers.append((r"/run_str", RunStrHandler, dict(lh=lh)))
+
+    # Start listening for job requests.
+    app = tornado.web.Application(
+        handlers,
+        default_host=options.address,
+        debug=options.debug
+    )
+    app.listen(options.port)
+    log.info("Started HTTP listener...")
+    tornado.ioloop.IOLoop.current().start()
diff --git a/client/nexuscli/__init__.py b/client/nexuscli/__init__.py
index 250b474..d9162fe 100644
--- a/client/nexuscli/__init__.py
+++ b/client/nexuscli/__init__.py
@@ -19,3 +19,6 @@
 from nexuscli.nexuscli import dataset_list
 from nexuscli.nexuscli import daily_difference_average
 from nexuscli.nexuscli import subset
+from nexuscli.nexuscli_ow import set_target
+from nexuscli.nexuscli_ow import run_file
+from nexuscli.nexuscli_ow import run_str
diff --git a/client/nexuscli/nexuscli_ow.py b/client/nexuscli/nexuscli_ow.py
new file mode 100644
index 0000000..feae3e2
--- /dev/null
+++ b/client/nexuscli/nexuscli_ow.py
@@ -0,0 +1,77 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""
+This module provides a native python client interface to the NEXUS Livy
+webservice API.
+
+Usage:
+
+    from nexuscli import nexuscli_ow
+    
+    nexuscli_ow.set_target("http://host:port")
+
+    filename = "mycode.py"
+    nexuscli_ow.run_file(filename)
+
+    The code in the file passed to run_file must be valid pyspark code.
+    Furthermore, it must have a main function that takes exactly one
+    argument, the SparkContext.  The code can make use of that SparkContext
+    variable, but should not create the SparkContext.
+
+    code = "1+1"
+    nexuscli_ow.run_str(code)
+
+    The code passed to run_str can also be a multi-line string containing 
+    valid python code.  It can also be a multi-line string containing 
+    valid pyspark code.  For pyspark code the variable sc may be used to 
+    access the SparkContext,  but it should not create the SparkContext.
+"""
+import requests
+import ast
+
+target = 'http://localhost:8084'
+
+session = requests.session()
+
+
+def set_target(url, use_session=True):
+    """
+    Set the URL for the NEXUS webapp endpoint.  
+    
+    __url__ URL for NEXUS webapp endpoint   
+    __return__ None
+    """
+    global target
+    target = url
+
+    if not use_session:
+        global session
+        session = requests
+
+
+def run_file(fname):
+    files = {'file': open(fname, 'rb')}
+    response = session.post(target+'/run_file', files=files)
+    print(response.text)
+    return response.text
+    
+
+def run_str(code):
+    response = requests.post(target+'/run_str', data=code)
+    ans = ast.literal_eval(response.text)['text/plain']
+    for line in ans:
+        print(line, end=" ")
+    return ans
+


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services