You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@predictionio.apache.org by do...@apache.org on 2016/08/09 21:43:44 UTC

[36/52] [abbrv] incubator-predictionio git commit: Move pio_tests files to testing/

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/3648ba1b/utils.py
----------------------------------------------------------------------
diff --git a/utils.py b/utils.py
deleted file mode 100644
index 629729e..0000000
--- a/utils.py
+++ /dev/null
@@ -1,309 +0,0 @@
-import re
-import time
-import os
-import requests
-import json
-from shutil import copyfile
-from subprocess import run, Popen, check_output
-from os.path import join as pjoin
-import pio_tests.globals as globals
-
-def srun(command):
-  """ Runs a shell command given as a `str`
-  Raises: `subprocess.CalledProcessError` when exit code != 0
-  """
-  return run(command, shell=True, stdout=globals.std_out(),
-      stderr=globals.std_err(), check=True)
-
-def srun_out(command):
-  """ Runs a shell command given as a `str`
-  Returns: string with command's output
-  Raises: `subprocess.CalledProcessError` when exit code != 0
-  """
-  return check_output(command, shell=True, universal_newlines=True,
-      stderr=globals.std_err())
-
-def srun_bg(command):
-  """ Runs a shell command given as a `str` in the background
-  Returns: (obj: `subprocess.Popen`) for executed process
-  """
-  return Popen(command, shell=True, stdout=globals.std_out(),
-      stderr=globals.std_err())
-
-def repository_dirname(template):
-  """ Utility function getting repository name from the link
-  Example: for "https://github.com/user/SomeRepo" should return "SomeRepo"
-  """
-  return template.split('/')[-1]
-
-def obtain_template(engine_dir, template):
-  """Given a directory with engines and a template downloads an engine
-  if neccessary
-  Args:
-    engine_dir (str): directory where engines are stored
-    template (str): either the name of an engine from the engines directory
-        or a link to repository with the engine
-  Returns: str with the engine's path
-  """
-  if re.match('^https?:\/\/', template):
-    dest_dir = pjoin(engine_dir, repository_dirname(template))
-    if not os.path.exists(dest_dir):
-      srun('git clone --depth=1 {0} {1}'.format(template, dest_dir))
-    return dest_dir
-  else:
-    # check if exists
-    dest_dir = pjoin(engine_dir, template)
-    if not os.path.exists(dest_dir):
-      raise ValueError('Engine {0} does not exist in {1}'
-          .format(template, engine_dir))
-
-    return dest_dir
-
-def pio_app_list():
-  """Returns: a list of dicts for every application with the following keys:
-      `name`, `id`, `access_key`, `allowed_events`
-  """
-  output = srun_out('pio app list').rstrip()
-  return [ { 'name': line[2], 'id': int(line[4]),
-             'access_key': line[6], 'allowed_events': line[8] }
-          for line in [x.split() for x in output.split('\n')[1:-1]] ]
-
-def get_app_eventserver_url_json(test_context):
-  return 'http://{}:{}/events.json'.format(
-      test_context.es_ip, test_context.es_port)
-
-def get_engine_url_json(engine_ip, engine_port):
-  return 'http://{}:{}/queries.json'.format(
-      engine_ip, engine_port)
-
-def send_event(event, test_context, access_key, channel=None):
-  """ Sends an event to the eventserver
-  Args:
-    event: json-like dictionary describing an event
-    test_context (obj: `TestContext`):
-    access_key: application's access key
-    channel (str): custom channel for storing event
-  Returns: `requests.Response`
-  """
-  url = get_app_eventserver_url_json(test_context)
-  params = { 'accessKey': access_key }
-  if channel: params['channel'] = channel
-  return requests.post(
-      url,
-      params=params,
-      json=event)
-
-def send_events_batch(events, test_context, access_key, channel=None):
-  """ Send events in batch via REST to the eventserver
-  Args:
-    events: a list of json-like dictionaries for events
-    test_context (obj: `TestContext`):
-    access_key: application's access key
-    channel (str): custom channel for storing event
-  Returns: `requests.Response`
-  Requires: Events length must not exceed length of 50
-    http://docs.prediction.io/datacollection/eventmodel/#3.-batch-events-to-the-eventserver
-  """
-  url = 'http://{}:{}/batch/events.json'.format(
-      test_context.es_ip, test_context.es_port)
-  params = { 'accessKey': access_key }
-  if channel: params['channel'] = channel
-  return requests.post(
-      url,
-      params=params,
-      json=events)
-
-
-def import_events_batch(events, test_context, appid, channel=None):
-  """ Imports events in batch from file with `pio import`
-  Args:
-    events: a list of json-like dictionaries for events
-    test_context (obj: `TestContext`)
-    appid (int): application's id
-    channel (str): custom channel for storing event
-  """
-  # Writing events list to temporary file.
-  # `pio import` requires each line of input file to be a JSON string
-  # representing an event. Empty lines are not allowed.
-  contents = ''
-  for ev in events:
-      contents += '{}\n'.format(json.dumps(ev))
-  contents.rstrip('\n')
-
-  file_path = pjoin(test_context.data_directory, 'events.json.tmp')
-  try:
-      with open(file_path, 'w') as f:
-          f.write(contents)
-      srun('pio import --appid {} --input {} {}'.format(
-          appid,
-          file_path,
-          '--channel {}'.format(channel) if channel else ''))
-  finally:
-      os.remove(file_path)
-
-def get_events(test_context, access_key, params={}):
-  """ Gets events for some application
-  Args:
-    test_context (obj: `TestContext`)
-    access_key (str):
-    params (dict): special parameters for eventserver's GET, e.g:
-        'limit', 'reversed', 'event'. See the docs
-  Returns: `requests.Response`
-  """
-  url = get_app_eventserver_url_json(test_context)
-  return requests.get(url, params=dict({'accessKey': access_key}, **params))
-
-def query_engine(data, engine_ip='localhost', engine_port=8000):
-  """ Send a query to deployed engine
-  Args:
-    data (dict): json-like dictionary being an input to an engine
-    access_key (str):
-    engine_ip (str): ip of deployed engine
-    engine_port (int): port of deployed engine
-  Returns: `requests.Response`
-  """
-  url = get_engine_url_json(engine_ip, engine_port)
-  return requests.post(url, json=data)
-
-class AppEngine:
-  """ This is a utility class simplifying all app related interactions.
-  Basically it is just a wrapper on other utility functions and shell
-  scripts.
-  """
-
-  def __init__(self, test_context, app_context, already_created=False):
-    """ Args:
-        test_context (obj: `TestContext`)
-        app_context (obj: `AppContext`)
-        already_created (bool): True if the given app has been already added
-    """
-    self.test_context = test_context
-    self.app_context = app_context
-    self.engine_path = obtain_template(
-        self.test_context.engine_directory, app_context.template)
-    self.deployed_process = None
-    if already_created:
-      self.__init_info()
-    else:
-      self.id = None
-      self.access_key = None
-      self.description = None
-
-    if self.app_context.engine_json_path:
-      self.__copy_engine_json()
-
-  def __copy_engine_json(self):
-    to_path = pjoin(self.engine_path, 'engine.json')
-    copyfile(self.app_context.engine_json_path, to_path)
-
-  def __init_info(self):
-    info = self.show()
-    self.id = info['id']
-    self.access_key = info['access_key']
-    self.description = info['description']
-
-  def new(self, id=None, description=None, access_key=None):
-    """ Creates a new application with given parameters """
-    srun('pio app new {} {} {} {}'.format(
-        '--id {}'.format(id) if id else '',
-        '--description \"{}\"'.format(description) if description else '',
-        '--access-key {}'.format(access_key) if access_key else '',
-        self.app_context.name))
-
-    self.__init_info()
-
-
-  def show(self):
-    """ Returns: application info in dictionary with the keys:
-         `name`: str, `id`: int, `description`: str,
-         `access_key`: str, `allowed_events`: str
-    """
-    output = srun_out('pio app show {}'.format(self.app_context.name)).rstrip()
-    lines = [x.split() for x in output.split('\n')]
-    return { 'name': lines[0][3],
-             'id': int(lines[1][4]),
-             'description': lines[2][3] if len(lines[2]) >= 4 else '',
-             'access_key': lines[3][4],
-             'allowed_events': lines[3][5] }
-
-
-  # deletes this app from pio
-  def delete(self):
-    srun('pio app delete {0} --force'.format(self.app_context.name))
-
-  def build(self, sbt_extra=None, clean=False, no_asm=True):
-    srun('cd {0}; pio build {1} {2} {3}'.format(
-        self.engine_path,
-        '--sbt-extra {}'.format(sbt_extra) if sbt_extra else '',
-        '--clean' if clean else '',
-        '--no-asm' if no_asm else ''))
-
-  def train(self, batch=None, skip_sanity_check=False, stop_after_read=False,
-          stop_after_prepare=False, engine_factory=None,
-          engine_params_key=None, scratch_uri=None):
-
-    srun('cd {}; pio train {} {} {} {} {} {} {}'.format(
-        self.engine_path,
-        '--batch {}'.format(batch) if batch else '',
-        '--skip-sanity-check' if skip_sanity_check else '',
-        '--stop-after-read' if stop_after_read else '',
-        '--stop-after-prepare' if stop_after_prepare else '',
-        '--engine_factory {}'.format(engine_factory) if engine_factory else '',
-        '--engine-params-key {}'.format(engine_params_key) if engine_params_key else '',
-        '--scratch-uri {}'.format(scratch_uri) if scratch_uri else ''))
-
-  def deploy(self, wait_time=0, ip=None, port=None, engine_instance_id=None,
-          feedback=False, accesskey=None, event_server_ip=None, event_server_port=None,
-          batch=None, scratch_uri=None):
-
-    command = 'cd {}; pio deploy {} {} {} {} {} {} {} {} {}'.format(
-            self.engine_path,
-            '--ip {}'.format(ip) if ip else '',
-            '--port {}'.format(port) if port else '',
-            '--engine-instance-id {}'.format(engine_instance_id) if engine_instance_id else '',
-            '--feedback' if feedback else '',
-            '--accesskey {}'.format(accesskey) if accesskey else '',
-            '--event-server-ip {}'.format(event_server_ip) if event_server_ip else '',
-            '--event-server-port {}'.format(event_server_port) if event_server_port else '',
-            '--batch {}'.format(bach) if batch else '',
-            '--scratch-uri {}'.format(scratch_uri) if scratch_uri else '')
-
-    self.deployed_process = srun_bg(command)
-    time.sleep(wait_time)
-    if self.deployed_process.poll() is not None:
-      raise Exception('Application engine terminated')
-    self.ip = ip if ip else 'localhost'
-    self.port = port if port else 8000
-
-  def stop(self):
-    """ Kills deployed engine """
-    if self.deployed_process:
-      self.deployed_process.kill()
-
-  def new_channel(self, channel):
-    srun('pio app channel-new {0}'.format(channel))
-
-  def delete_channel(self, channel):
-    srun('pio app channel-delete {0} --force'.format(channel))
-
-  def send_event(self, event):
-    return send_event(event, self.test_context, self.access_key)
-
-  def send_events_batch(self, events):
-    return send_events_batch(events, self.test_context, self.access_key)
-
-  def import_events_batch(self, events):
-    return import_events_batch(events, self.test_context, self.id)
-
-  def get_events(self, params={}):
-    return get_events(self.test_context, self.access_key, params)
-
-  def delete_data(self, delete_all=True, channel=None):
-    srun('pio app data-delete {0} {1} {2} --force'
-        .format(
-            self.app_context.name,
-            '--all' if delete_all else '',
-            '--channel ' + channel if channel is not None else ''))
-
-  def query(self, data):
-    return query_engine(data, self.ip, self.port)