You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@predictionio.apache.org by do...@apache.org on 2016/08/09 21:43:44 UTC
[36/52] [abbrv] incubator-predictionio git commit: Move pio_tests
files to testing/
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/3648ba1b/utils.py
----------------------------------------------------------------------
diff --git a/utils.py b/utils.py
deleted file mode 100644
index 629729e..0000000
--- a/utils.py
+++ /dev/null
@@ -1,309 +0,0 @@
-import re
-import time
-import os
-import requests
-import json
-from shutil import copyfile
-from subprocess import run, Popen, check_output
-from os.path import join as pjoin
-import pio_tests.globals as globals
-
-def srun(command):
- """ Runs a shell command given as a `str`
- Raises: `subprocess.CalledProcessError` when exit code != 0
- """
- return run(command, shell=True, stdout=globals.std_out(),
- stderr=globals.std_err(), check=True)
-
-def srun_out(command):
- """ Runs a shell command given as a `str`
- Returns: string with command's output
- Raises: `subprocess.CalledProcessError` when exit code != 0
- """
- return check_output(command, shell=True, universal_newlines=True,
- stderr=globals.std_err())
-
-def srun_bg(command):
- """ Runs a shell command given as a `str` in the background
- Returns: (obj: `subprocess.Popen`) for executed process
- """
- return Popen(command, shell=True, stdout=globals.std_out(),
- stderr=globals.std_err())
-
-def repository_dirname(template):
- """ Utility function getting repository name from the link
- Example: for "https://github.com/user/SomeRepo" should return "SomeRepo"
- """
- return template.split('/')[-1]
-
-def obtain_template(engine_dir, template):
- """Given a directory with engines and a template downloads an engine
- if neccessary
- Args:
- engine_dir (str): directory where engines are stored
- template (str): either the name of an engine from the engines directory
- or a link to repository with the engine
- Returns: str with the engine's path
- """
- if re.match('^https?:\/\/', template):
- dest_dir = pjoin(engine_dir, repository_dirname(template))
- if not os.path.exists(dest_dir):
- srun('git clone --depth=1 {0} {1}'.format(template, dest_dir))
- return dest_dir
- else:
- # check if exists
- dest_dir = pjoin(engine_dir, template)
- if not os.path.exists(dest_dir):
- raise ValueError('Engine {0} does not exist in {1}'
- .format(template, engine_dir))
-
- return dest_dir
-
-def pio_app_list():
- """Returns: a list of dicts for every application with the following keys:
- `name`, `id`, `access_key`, `allowed_events`
- """
- output = srun_out('pio app list').rstrip()
- return [ { 'name': line[2], 'id': int(line[4]),
- 'access_key': line[6], 'allowed_events': line[8] }
- for line in [x.split() for x in output.split('\n')[1:-1]] ]
-
-def get_app_eventserver_url_json(test_context):
- return 'http://{}:{}/events.json'.format(
- test_context.es_ip, test_context.es_port)
-
-def get_engine_url_json(engine_ip, engine_port):
- return 'http://{}:{}/queries.json'.format(
- engine_ip, engine_port)
-
-def send_event(event, test_context, access_key, channel=None):
- """ Sends an event to the eventserver
- Args:
- event: json-like dictionary describing an event
- test_context (obj: `TestContext`):
- access_key: application's access key
- channel (str): custom channel for storing event
- Returns: `requests.Response`
- """
- url = get_app_eventserver_url_json(test_context)
- params = { 'accessKey': access_key }
- if channel: params['channel'] = channel
- return requests.post(
- url,
- params=params,
- json=event)
-
-def send_events_batch(events, test_context, access_key, channel=None):
- """ Send events in batch via REST to the eventserver
- Args:
- events: a list of json-like dictionaries for events
- test_context (obj: `TestContext`):
- access_key: application's access key
- channel (str): custom channel for storing event
- Returns: `requests.Response`
- Requires: Events length must not exceed length of 50
- http://docs.prediction.io/datacollection/eventmodel/#3.-batch-events-to-the-eventserver
- """
- url = 'http://{}:{}/batch/events.json'.format(
- test_context.es_ip, test_context.es_port)
- params = { 'accessKey': access_key }
- if channel: params['channel'] = channel
- return requests.post(
- url,
- params=params,
- json=events)
-
-
-def import_events_batch(events, test_context, appid, channel=None):
- """ Imports events in batch from file with `pio import`
- Args:
- events: a list of json-like dictionaries for events
- test_context (obj: `TestContext`)
- appid (int): application's id
- channel (str): custom channel for storing event
- """
- # Writing events list to temporary file.
- # `pio import` requires each line of input file to be a JSON string
- # representing an event. Empty lines are not allowed.
- contents = ''
- for ev in events:
- contents += '{}\n'.format(json.dumps(ev))
- contents.rstrip('\n')
-
- file_path = pjoin(test_context.data_directory, 'events.json.tmp')
- try:
- with open(file_path, 'w') as f:
- f.write(contents)
- srun('pio import --appid {} --input {} {}'.format(
- appid,
- file_path,
- '--channel {}'.format(channel) if channel else ''))
- finally:
- os.remove(file_path)
-
-def get_events(test_context, access_key, params={}):
- """ Gets events for some application
- Args:
- test_context (obj: `TestContext`)
- access_key (str):
- params (dict): special parameters for eventserver's GET, e.g:
- 'limit', 'reversed', 'event'. See the docs
- Returns: `requests.Response`
- """
- url = get_app_eventserver_url_json(test_context)
- return requests.get(url, params=dict({'accessKey': access_key}, **params))
-
-def query_engine(data, engine_ip='localhost', engine_port=8000):
- """ Send a query to deployed engine
- Args:
- data (dict): json-like dictionary being an input to an engine
- access_key (str):
- engine_ip (str): ip of deployed engine
- engine_port (int): port of deployed engine
- Returns: `requests.Response`
- """
- url = get_engine_url_json(engine_ip, engine_port)
- return requests.post(url, json=data)
-
-class AppEngine:
- """ This is a utility class simplifying all app related interactions.
- Basically it is just a wrapper on other utility functions and shell
- scripts.
- """
-
- def __init__(self, test_context, app_context, already_created=False):
- """ Args:
- test_context (obj: `TestContext`)
- app_context (obj: `AppContext`)
- already_created (bool): True if the given app has been already added
- """
- self.test_context = test_context
- self.app_context = app_context
- self.engine_path = obtain_template(
- self.test_context.engine_directory, app_context.template)
- self.deployed_process = None
- if already_created:
- self.__init_info()
- else:
- self.id = None
- self.access_key = None
- self.description = None
-
- if self.app_context.engine_json_path:
- self.__copy_engine_json()
-
- def __copy_engine_json(self):
- to_path = pjoin(self.engine_path, 'engine.json')
- copyfile(self.app_context.engine_json_path, to_path)
-
- def __init_info(self):
- info = self.show()
- self.id = info['id']
- self.access_key = info['access_key']
- self.description = info['description']
-
- def new(self, id=None, description=None, access_key=None):
- """ Creates a new application with given parameters """
- srun('pio app new {} {} {} {}'.format(
- '--id {}'.format(id) if id else '',
- '--description \"{}\"'.format(description) if description else '',
- '--access-key {}'.format(access_key) if access_key else '',
- self.app_context.name))
-
- self.__init_info()
-
-
- def show(self):
- """ Returns: application info in dictionary with the keys:
- `name`: str, `id`: int, `description`: str,
- `access_key`: str, `allowed_events`: str
- """
- output = srun_out('pio app show {}'.format(self.app_context.name)).rstrip()
- lines = [x.split() for x in output.split('\n')]
- return { 'name': lines[0][3],
- 'id': int(lines[1][4]),
- 'description': lines[2][3] if len(lines[2]) >= 4 else '',
- 'access_key': lines[3][4],
- 'allowed_events': lines[3][5] }
-
-
- # deletes this app from pio
- def delete(self):
- srun('pio app delete {0} --force'.format(self.app_context.name))
-
- def build(self, sbt_extra=None, clean=False, no_asm=True):
- srun('cd {0}; pio build {1} {2} {3}'.format(
- self.engine_path,
- '--sbt-extra {}'.format(sbt_extra) if sbt_extra else '',
- '--clean' if clean else '',
- '--no-asm' if no_asm else ''))
-
- def train(self, batch=None, skip_sanity_check=False, stop_after_read=False,
- stop_after_prepare=False, engine_factory=None,
- engine_params_key=None, scratch_uri=None):
-
- srun('cd {}; pio train {} {} {} {} {} {} {}'.format(
- self.engine_path,
- '--batch {}'.format(batch) if batch else '',
- '--skip-sanity-check' if skip_sanity_check else '',
- '--stop-after-read' if stop_after_read else '',
- '--stop-after-prepare' if stop_after_prepare else '',
- '--engine_factory {}'.format(engine_factory) if engine_factory else '',
- '--engine-params-key {}'.format(engine_params_key) if engine_params_key else '',
- '--scratch-uri {}'.format(scratch_uri) if scratch_uri else ''))
-
- def deploy(self, wait_time=0, ip=None, port=None, engine_instance_id=None,
- feedback=False, accesskey=None, event_server_ip=None, event_server_port=None,
- batch=None, scratch_uri=None):
-
- command = 'cd {}; pio deploy {} {} {} {} {} {} {} {} {}'.format(
- self.engine_path,
- '--ip {}'.format(ip) if ip else '',
- '--port {}'.format(port) if port else '',
- '--engine-instance-id {}'.format(engine_instance_id) if engine_instance_id else '',
- '--feedback' if feedback else '',
- '--accesskey {}'.format(accesskey) if accesskey else '',
- '--event-server-ip {}'.format(event_server_ip) if event_server_ip else '',
- '--event-server-port {}'.format(event_server_port) if event_server_port else '',
- '--batch {}'.format(bach) if batch else '',
- '--scratch-uri {}'.format(scratch_uri) if scratch_uri else '')
-
- self.deployed_process = srun_bg(command)
- time.sleep(wait_time)
- if self.deployed_process.poll() is not None:
- raise Exception('Application engine terminated')
- self.ip = ip if ip else 'localhost'
- self.port = port if port else 8000
-
- def stop(self):
- """ Kills deployed engine """
- if self.deployed_process:
- self.deployed_process.kill()
-
- def new_channel(self, channel):
- srun('pio app channel-new {0}'.format(channel))
-
- def delete_channel(self, channel):
- srun('pio app channel-delete {0} --force'.format(channel))
-
- def send_event(self, event):
- return send_event(event, self.test_context, self.access_key)
-
- def send_events_batch(self, events):
- return send_events_batch(events, self.test_context, self.access_key)
-
- def import_events_batch(self, events):
- return import_events_batch(events, self.test_context, self.id)
-
- def get_events(self, params={}):
- return get_events(self.test_context, self.access_key, params)
-
- def delete_data(self, delete_all=True, channel=None):
- srun('pio app data-delete {0} {1} {2} --force'
- .format(
- self.app_context.name,
- '--all' if delete_all else '',
- '--channel ' + channel if channel is not None else ''))
-
- def query(self, data):
- return query_engine(data, self.ip, self.port)