You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@predictionio.apache.org by do...@apache.org on 2016/08/09 21:43:41 UTC
[33/52] [abbrv] incubator-predictionio git commit: Change indentation
to 2 spaces and add some additional comments
Change indentation to 2 spaces and add some additional comments
Document that send_events_batch() has batch size limit of 50
Project: http://git-wip-us.apache.org/repos/asf/incubator-predictionio/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-predictionio/commit/352eb5e0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-predictionio/tree/352eb5e0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-predictionio/diff/352eb5e0
Branch: refs/heads/develop
Commit: 352eb5e04c8ce5e80bee9c09fbeaee5553b00d38
Parents: 7192fbb
Author: Chan Lee <ch...@gmail.com>
Authored: Mon Aug 1 16:40:01 2016 -0700
Committer: Marcin Ziemiński <zi...@gmail.com>
Committed: Wed Aug 3 14:42:06 2016 -0700
----------------------------------------------------------------------
globals.py | 16 +-
integration.py | 68 ++---
scenarios/basic_app_usecases.py | 270 +++++++++---------
scenarios/eventserver_test.py | 2 +-
scenarios/quickstart_test.py | 222 +++++++--------
tests.py | 74 ++---
utils.py | 526 ++++++++++++++++++-----------------
7 files changed, 590 insertions(+), 588 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/352eb5e0/globals.py
----------------------------------------------------------------------
diff --git a/globals.py b/globals.py
index 6d4abd3..1134501 100644
--- a/globals.py
+++ b/globals.py
@@ -5,13 +5,13 @@ SUPPRESS_STDERR=False
LOGGER_NAME='INT_TESTS'
def std_out():
- if SUPPRESS_STDOUT:
- return subprocess.DEVNULL
- else:
- return None
+ if SUPPRESS_STDOUT:
+ return subprocess.DEVNULL
+ else:
+ return None
def std_err():
- if SUPPRESS_STDERR:
- return subprocess.DEVNULL
- else:
- return None
+ if SUPPRESS_STDERR:
+ return subprocess.DEVNULL
+ else:
+ return None
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/352eb5e0/integration.py
----------------------------------------------------------------------
diff --git a/integration.py b/integration.py
index 6cfa267..441365e 100644
--- a/integration.py
+++ b/integration.py
@@ -3,44 +3,44 @@ import logging
import pio_tests.globals as globals
class TestContext:
- """Class representing the settings provided for every test"""
+ """Class representing the settings provided for every test"""
- def __init__(self, engine_directory, data_directory, es_ip='0.0.0.0', es_port=7070):
- """
- Args:
- engine_directory (str): path to the directory where the engines are stored
- data_directory (str): path to the directory where tests can keep their data
- es_ip (str): ip of the eventserver
- es_port (int): port of the eventserver
- """
- self.engine_directory = engine_directory
- self.data_directory = data_directory
- self.es_ip = es_ip
- self.es_port = es_port
+ def __init__(self, engine_directory, data_directory, es_ip='0.0.0.0', es_port=7070):
+ """
+ Args:
+ engine_directory (str): path to the directory where the engines are stored
+ data_directory (str): path to the directory where tests can keep their data
+ es_ip (str): ip of the eventserver
+ es_port (int): port of the eventserver
+ """
+ self.engine_directory = engine_directory
+ self.data_directory = data_directory
+ self.es_ip = es_ip
+ self.es_port = es_port
class BaseTestCase(unittest.TestCase):
- """This is the base class for all integration tests
+ """This is the base class for all integration tests
- This class sets up a `TestContext` object and a logger for every test case
- """
- def __init__(self, test_context, methodName='runTest'):
- super(BaseTestCase, self).__init__(methodName)
- self.test_context = test_context
- self.log = logging.getLogger(globals.LOGGER_NAME)
+ This class sets up a `TestContext` object and a logger for every test case
+ """
+ def __init__(self, test_context, methodName='runTest'):
+ super(BaseTestCase, self).__init__(methodName)
+ self.test_context = test_context
+ self.log = logging.getLogger(globals.LOGGER_NAME)
class AppContext:
- """ This class is a description of an instance of the engine"""
+ """ This class is a description of an instance of the engine"""
- def __init__(self, name, template, engine_json_path=None):
- """
- Args:
- name (str): application name
- template (str): either the name of an engine from the engines directory
- or a link to repository with the engine
- engine_json_path (str): path to json file describing an engine (a custom engine.json)
- to be used for the application. If `None`, engine.json from the engine's directory
- will be used
- """
- self.name = name
- self.template = template
- self.engine_json_path = engine_json_path
+ def __init__(self, name, template, engine_json_path=None):
+ """
+ Args:
+ name (str): application name
+ template (str): either the name of an engine from the engines directory
+ or a link to repository with the engine
+ engine_json_path (str): path to json file describing an engine (a custom engine.json)
+ to be used for the application. If `None`, engine.json from the engine's directory
+ will be used
+ """
+ self.name = name
+ self.template = template
+ self.engine_json_path = engine_json_path
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/352eb5e0/scenarios/basic_app_usecases.py
----------------------------------------------------------------------
diff --git a/scenarios/basic_app_usecases.py b/scenarios/basic_app_usecases.py
index 131b2e9..94d2f34 100644
--- a/scenarios/basic_app_usecases.py
+++ b/scenarios/basic_app_usecases.py
@@ -10,146 +10,146 @@ from utils import *
ITEMS_COUNT = 12
def get_buy_events(users, per_user=2):
- events = []
- for u in range(users):
- items = set([random.randint(0, ITEMS_COUNT) for i in range(per_user)])
- for item in items:
- events.append({
- "event": "buy",
- "entityType": "user",
- "entityId": u,
- "targetEntityType": "item",
- "targetEntityId": item })
-
- return events
+ events = []
+ for u in range(users):
+ items = set([random.randint(0, ITEMS_COUNT) for i in range(per_user)])
+ for item in items:
+ events.append({
+ "event": "buy",
+ "entityType": "user",
+ "entityId": u,
+ "targetEntityType": "item",
+ "targetEntityId": item })
+
+ return events
def get_rate_events(users, per_user=2):
- events = []
- for u in range(users):
- items = set([random.randint(0, ITEMS_COUNT) for i in range(per_user)])
- for item in items:
- events.append( {
- "event": "rate",
- "entityType": "user",
- "entityId": u,
- "targetEntityType": "item",
- "targetEntityId": item,
- "properties": { "rating" : float(random.randint(1,5)) } })
+ events = []
+ for u in range(users):
+ items = set([random.randint(0, ITEMS_COUNT) for i in range(per_user)])
+ for item in items:
+ events.append( {
+ "event": "rate",
+ "entityType": "user",
+ "entityId": u,
+ "targetEntityType": "item",
+ "targetEntityId": item,
+ "properties": { "rating" : float(random.randint(1,5)) } })
- return events
+ return events
class BasicAppUsecases(BaseTestCase):
- def setUp(self):
- random.seed(3)
- self.log.info("Setting up the engine")
-
- template_path = pjoin(
- self.test_context.engine_directory, "recommendation-engine")
- engine_json_path = pjoin(
- self.test_context.data_directory, "quickstart_test/engine.json")
-
- app_context = AppContext(
- name="MyRecommender",
- template=template_path,
- engine_json_path=engine_json_path)
-
- self.app = AppEngine(self.test_context, app_context)
- self.app_pid = None
-
- def runTest(self):
- self.app_creation()
- self.check_app_list()
- self.check_data()
- self.check_build()
- self.check_train_and_deploy()
-
- def app_creation(self):
- self.log.info("Adding a new application")
- description = "SomeDescription"
- self.app.new(description=description)
- self.assertEqual(description, self.app.description)
-
- self.log.info("Creating an app again - should fail")
- self.assertRaises(CalledProcessError, lambda : self.app.new())
-
- def check_app_list(self):
- self.log.info("Checking if app is on the list")
- apps = pio_app_list()
- self.assertEqual(1,
- len([a for a in apps if a['name'] == self.app.app_context.name]))
-
- def check_data(self):
- self.log.info("Importing events")
- buy_events = get_buy_events(20, 1)
- rate_events = get_rate_events(20, 1)
-
- for ev in buy_events + rate_events:
- self.assertEquals(201, self.app.send_event(ev).status_code)
-
- self.log.info("Checking imported events")
- r = self.app.get_events(params={'limit': -1})
- self.assertEqual(200, r.status_code)
- self.assertEqual(len(buy_events) + len(rate_events), len(r.json()))
-
- self.log.info("Deleting entire data")
- self.app.delete_data()
- self.log.info("Checking if there are no events at all")
- r = self.app.get_events(params={'limit': -1})
- self.assertEqual(404, r.status_code)
-
- def check_build(self):
- self.log.info("Clean build")
- self.app.build(clean=True)
- self.log.info("Second build")
- self.app.build()
-
- def check_train_and_deploy(self):
- self.log.info("import some data first")
- buy_events = get_buy_events(20, 5)
- rate_events = get_rate_events(20, 5)
- for ev in buy_events + rate_events:
- self.assertEquals(201, self.app.send_event(ev).status_code)
-
- self.log.info("Training")
- self.app.train()
- self.log.info("Deploying")
- self.app.deploy()
- self.assertFalse(self.app.deployed_process.poll())
-
- self.log.info("Importing more events")
- buy_events = get_buy_events(60, 5)
- rate_events = get_rate_events(60, 5)
- for ev in buy_events + rate_events:
- self.assertEquals(201, self.app.send_event(ev).status_code)
-
- self.log.info("Training again")
- self.app.train()
-
- time.sleep(7)
-
- self.log.info("Check serving")
- r = self.app.query({"user": 1, "num": 5})
- self.assertEqual(200, r.status_code)
- result = r.json()
- self.assertEqual(5, len(result['itemScores']))
- r = self.app.query({"user": 5, "num": 3})
- self.assertEqual(200, r.status_code)
- result = r.json()
- self.assertEqual(3, len(result['itemScores']))
-
- self.log.info("Remove data")
- self.app.delete_data()
- self.log.info("Retraining should fail")
- self.assertRaises(CalledProcessError, lambda: self.app.train())
-
-
- def tearDown(self):
- self.log.info("Stopping deployed engine")
- self.app.stop()
- self.log.info("Deleting all related data")
- self.app.delete_data()
- self.log.info("Removing an app")
- self.app.delete()
+ def setUp(self):
+ random.seed(3)
+ self.log.info("Setting up the engine")
+
+ template_path = pjoin(
+ self.test_context.engine_directory, "recommendation-engine")
+ engine_json_path = pjoin(
+ self.test_context.data_directory, "quickstart_test/engine.json")
+
+ app_context = AppContext(
+ name="MyRecommender",
+ template=template_path,
+ engine_json_path=engine_json_path)
+
+ self.app = AppEngine(self.test_context, app_context)
+ self.app_pid = None
+
+ def runTest(self):
+ self.app_creation()
+ self.check_app_list()
+ self.check_data()
+ self.check_build()
+ self.check_train_and_deploy()
+
+ def app_creation(self):
+ self.log.info("Adding a new application")
+ description = "SomeDescription"
+ self.app.new(description=description)
+ self.assertEqual(description, self.app.description)
+
+ self.log.info("Creating an app again - should fail")
+ self.assertRaises(CalledProcessError, lambda : self.app.new())
+
+ def check_app_list(self):
+ self.log.info("Checking if app is on the list")
+ apps = pio_app_list()
+ self.assertEqual(1,
+ len([a for a in apps if a['name'] == self.app.app_context.name]))
+
+ def check_data(self):
+ self.log.info("Importing events")
+ buy_events = get_buy_events(20, 1)
+ rate_events = get_rate_events(20, 1)
+
+ for ev in buy_events + rate_events:
+ self.assertEquals(201, self.app.send_event(ev).status_code)
+
+ self.log.info("Checking imported events")
+ r = self.app.get_events(params={'limit': -1})
+ self.assertEqual(200, r.status_code)
+ self.assertEqual(len(buy_events) + len(rate_events), len(r.json()))
+
+ self.log.info("Deleting entire data")
+ self.app.delete_data()
+ self.log.info("Checking if there are no events at all")
+ r = self.app.get_events(params={'limit': -1})
+ self.assertEqual(404, r.status_code)
+
+ def check_build(self):
+ self.log.info("Clean build")
+ self.app.build(clean=True)
+ self.log.info("Second build")
+ self.app.build()
+
+ def check_train_and_deploy(self):
+ self.log.info("import some data first")
+ buy_events = get_buy_events(20, 5)
+ rate_events = get_rate_events(20, 5)
+ for ev in buy_events + rate_events:
+ self.assertEquals(201, self.app.send_event(ev).status_code)
+
+ self.log.info("Training")
+ self.app.train()
+ self.log.info("Deploying")
+ self.app.deploy()
+ self.assertFalse(self.app.deployed_process.poll())
+
+ self.log.info("Importing more events")
+ buy_events = get_buy_events(60, 5)
+ rate_events = get_rate_events(60, 5)
+ for ev in buy_events + rate_events:
+ self.assertEquals(201, self.app.send_event(ev).status_code)
+
+ self.log.info("Training again")
+ self.app.train()
+
+ time.sleep(7)
+
+ self.log.info("Check serving")
+ r = self.app.query({"user": 1, "num": 5})
+ self.assertEqual(200, r.status_code)
+ result = r.json()
+ self.assertEqual(5, len(result['itemScores']))
+ r = self.app.query({"user": 5, "num": 3})
+ self.assertEqual(200, r.status_code)
+ result = r.json()
+ self.assertEqual(3, len(result['itemScores']))
+
+ self.log.info("Remove data")
+ self.app.delete_data()
+ self.log.info("Retraining should fail")
+ self.assertRaises(CalledProcessError, lambda: self.app.train())
+
+
+ def tearDown(self):
+ self.log.info("Stopping deployed engine")
+ self.app.stop()
+ self.log.info("Deleting all related data")
+ self.app.delete_data()
+ self.log.info("Removing an app")
+ self.app.delete()
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/352eb5e0/scenarios/eventserver_test.py
----------------------------------------------------------------------
diff --git a/scenarios/eventserver_test.py b/scenarios/eventserver_test.py
index 4675299..2cd3d87 100644
--- a/scenarios/eventserver_test.py
+++ b/scenarios/eventserver_test.py
@@ -74,7 +74,7 @@ class EventserverTest(BaseTestCase):
self.load_events("signup_events_51.json"))
self.assertEqual(400, r.status_code)
- self.log.info("Importing batch of events")
+ self.log.info("Importing events from file does not have batch size limit")
self.app.import_events_batch(
self.load_events("signup_events_51.json"))
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/352eb5e0/scenarios/quickstart_test.py
----------------------------------------------------------------------
diff --git a/scenarios/quickstart_test.py b/scenarios/quickstart_test.py
index 8691f21..080c26a 100644
--- a/scenarios/quickstart_test.py
+++ b/scenarios/quickstart_test.py
@@ -6,120 +6,120 @@ from pio_tests.integration import BaseTestCase, AppContext
from utils import AppEngine, srun, pjoin
def read_events(file_path):
- RATE_ACTIONS_DELIMITER = "::"
- with open(file_path, 'r') as f:
- events = []
- for line in f:
- data = line.rstrip('\r\n').split(RATE_ACTIONS_DELIMITER)
- if random.randint(0, 1) == 1:
- events.append( {
- "event": "rate",
- "entityType": "user",
- "entityId": data[0],
- "targetEntityType": "item",
- "targetEntityId": data[1],
- "properties": { "rating" : float(data[2]) } })
- else:
- events.append({
- "event": "buy",
- "entityType": "user",
- "entityId": data[0],
- "targetEntityType": "item",
- "targetEntityId": data[1] })
-
- return events
+ RATE_ACTIONS_DELIMITER = "::"
+ with open(file_path, 'r') as f:
+ events = []
+ for line in f:
+ data = line.rstrip('\r\n').split(RATE_ACTIONS_DELIMITER)
+ if random.randint(0, 1) == 1:
+ events.append( {
+ "event": "rate",
+ "entityType": "user",
+ "entityId": data[0],
+ "targetEntityType": "item",
+ "targetEntityId": data[1],
+ "properties": { "rating" : float(data[2]) } })
+ else:
+ events.append({
+ "event": "buy",
+ "entityType": "user",
+ "entityId": data[0],
+ "targetEntityType": "item",
+ "targetEntityId": data[1] })
+
+ return events
class QuickStartTest(BaseTestCase):
- def setUp(self):
- self.log.info("Setting up the engine")
-
- template_path = pjoin(
- self.test_context.engine_directory, "recommendation-engine")
- engine_json_path = pjoin(
- self.test_context.data_directory, "quickstart_test/engine.json")
-
- self.training_data_path = pjoin(
- self.test_context.data_directory,
- "quickstart_test/training_data.txt")
-
- # downloading training data
- srun('curl https://raw.githubusercontent.com/apache/spark/master/' \
- 'data/mllib/sample_movielens_data.txt --create-dirs -o {}'
- .format(self.training_data_path))
-
- app_context = AppContext(
- name="MyRecommender",
- template=template_path,
- engine_json_path=engine_json_path)
-
- self.app = AppEngine(self.test_context, app_context)
-
- def runTest(self):
- self.log.info("Adding a new application")
- self.app.new()
-
- event1 = {
- "event" : "rate",
- "entityType" : "user",
- "entityId" : "u0",
- "targetEntityType" : "item",
- "targetEntityId" : "i0",
- "properties" : {
- "rating" : 5
- },
- "eventTime" : "2014-11-02T09:39:45.618-08:00" }
-
- event2 = {
- "event" : "buy",
- "entityType" : "user",
- "entityId" : "u1",
- "targetEntityType" : "item",
- "targetEntityId" : "i2",
- "eventTime" : "2014-11-10T12:34:56.123-08:00" }
-
- self.log.info("Sending two test events")
- self.assertListEqual(
- [201, 201],
- [self.app.send_event(e).status_code for e in [event1, event2]])
-
- self.log.info("Checking the number of events stored on the server")
- r = self.app.get_events()
- self.assertEquals(200, r.status_code)
- stored_events = r.json()
- self.assertEqual(2, len(stored_events))
-
- self.log.info("Importing many events")
- new_events = read_events(self.training_data_path)
- for ev in new_events:
- r = self.app.send_event(ev)
- self.assertEqual(201, r.status_code)
-
- self.log.info("Checking the number of events stored on the server after the update")
- r = self.app.get_events(params={'limit': -1})
- self.assertEquals(200, r.status_code)
- stored_events = r.json()
- self.assertEquals(len(new_events) + 2, len(stored_events))
-
- self.log.info("Building an engine...")
- self.app.build()
- self.log.info("Training...")
- self.app.train()
- self.log.info("Deploying and waiting 15s for it to start...")
- self.app.deploy(wait_time=15)
-
- self.log.info("Sending a single query and checking results")
- user_query = { "user": 1, "num": 4 }
- r = self.app.query(user_query)
- self.assertEqual(200, r.status_code)
- result = r.json()
- self.assertEqual(4, len(result['itemScores']))
+ def setUp(self):
+ self.log.info("Setting up the engine")
+
+ template_path = pjoin(
+ self.test_context.engine_directory, "recommendation-engine")
+ engine_json_path = pjoin(
+ self.test_context.data_directory, "quickstart_test/engine.json")
+
+ self.training_data_path = pjoin(
+ self.test_context.data_directory,
+ "quickstart_test/training_data.txt")
+
+ # downloading training data
+ srun('curl https://raw.githubusercontent.com/apache/spark/master/' \
+ 'data/mllib/sample_movielens_data.txt --create-dirs -o {}'
+ .format(self.training_data_path))
+
+ app_context = AppContext(
+ name="MyRecommender",
+ template=template_path,
+ engine_json_path=engine_json_path)
+
+ self.app = AppEngine(self.test_context, app_context)
+
+ def runTest(self):
+ self.log.info("Adding a new application")
+ self.app.new()
+
+ event1 = {
+ "event" : "rate",
+ "entityType" : "user",
+ "entityId" : "u0",
+ "targetEntityType" : "item",
+ "targetEntityId" : "i0",
+ "properties" : {
+ "rating" : 5
+ },
+ "eventTime" : "2014-11-02T09:39:45.618-08:00" }
+
+ event2 = {
+ "event" : "buy",
+ "entityType" : "user",
+ "entityId" : "u1",
+ "targetEntityType" : "item",
+ "targetEntityId" : "i2",
+ "eventTime" : "2014-11-10T12:34:56.123-08:00" }
+
+ self.log.info("Sending two test events")
+ self.assertListEqual(
+ [201, 201],
+ [self.app.send_event(e).status_code for e in [event1, event2]])
+
+ self.log.info("Checking the number of events stored on the server")
+ r = self.app.get_events()
+ self.assertEquals(200, r.status_code)
+ stored_events = r.json()
+ self.assertEqual(2, len(stored_events))
+
+ self.log.info("Importing many events")
+ new_events = read_events(self.training_data_path)
+ for ev in new_events:
+ r = self.app.send_event(ev)
+ self.assertEqual(201, r.status_code)
+
+ self.log.info("Checking the number of events stored on the server after the update")
+ r = self.app.get_events(params={'limit': -1})
+ self.assertEquals(200, r.status_code)
+ stored_events = r.json()
+ self.assertEquals(len(new_events) + 2, len(stored_events))
+
+ self.log.info("Building an engine...")
+ self.app.build()
+ self.log.info("Training...")
+ self.app.train()
+ self.log.info("Deploying and waiting 15s for it to start...")
+ self.app.deploy(wait_time=15)
+
+ self.log.info("Sending a single query and checking results")
+ user_query = { "user": 1, "num": 4 }
+ r = self.app.query(user_query)
+ self.assertEqual(200, r.status_code)
+ result = r.json()
+ self.assertEqual(4, len(result['itemScores']))
def tearDown(self):
- self.log.info("Stopping deployed engine")
- self.app.stop()
- self.log.info("Deleting all related data")
- self.app.delete_data()
- self.log.info("Removing an app")
- self.app.delete()
+ self.log.info("Stopping deployed engine")
+ self.app.stop()
+ self.log.info("Deleting all related data")
+ self.app.delete_data()
+ self.log.info("Removing an app")
+ self.app.delete()
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/352eb5e0/tests.py
----------------------------------------------------------------------
diff --git a/tests.py b/tests.py
index 0a95593..c84bb08 100755
--- a/tests.py
+++ b/tests.py
@@ -16,13 +16,13 @@ parser = argparse.ArgumentParser(description='Integration tests for PredictionIO
parser.add_argument('--eventserver-ip', default='0.0.0.0')
parser.add_argument('--eventserver-port', type=int, default=7070)
parser.add_argument('--no-shell-stdout', action='store_true',
- help='Suppress STDOUT output from shell executed commands')
+ help='Suppress STDOUT output from shell executed commands')
parser.add_argument('--no-shell-stderr', action='store_true',
- help='Suppress STDERR output from shell executed commands')
+ help='Suppress STDERR output from shell executed commands')
parser.add_argument('--logging', action='store', choices=['INFO', 'DEBUG', 'NO_LOGGING'],
- default='NO_LOGGING', help='Choose the logging level')
+ default='NO_LOGGING', help='Choose the logging level')
parser.add_argument('--tests', nargs='*', type=str,
- default=None, help='Names of the tests to execute. By default all tests will be checked')
+ default=None, help='Names of the tests to execute. By default all tests will be checked')
TESTS_DIRECTORY = os.path.abspath(os.path.dirname(__file__))
ENGINE_DIRECTORY = os.path.join(TESTS_DIRECTORY, "engines")
@@ -32,44 +32,44 @@ LOGGING_FORMAT = '[%(levelname)s] %(module)s %(asctime)-15s: %(message)s'
logging.basicConfig(format=LOGGING_FORMAT)
def get_tests(test_context):
- # ========= ADD TESTS HERE!!! ================================
- return {'QuickStart': QuickStartTest(test_context),
- 'BasicAppUsecases': BasicAppUsecases(test_context),
- 'EventserverTest': EventserverTest(test_context)}
+ # ========= ADD TESTS HERE!!! ================================
+ return {'QuickStart': QuickStartTest(test_context),
+ 'BasicAppUsecases': BasicAppUsecases(test_context),
+ 'EventserverTest': EventserverTest(test_context)}
if __name__ == "__main__":
- args = vars(parser.parse_args())
+ args = vars(parser.parse_args())
- if args.get('no_shell_stdout'):
- globals.SUPPRESS_STDOUT = True
- if args.get('no_shell_stderr'):
- globals.SUPPRESS_STDERR = True
+ if args.get('no_shell_stdout'):
+ globals.SUPPRESS_STDOUT = True
+ if args.get('no_shell_stderr'):
+ globals.SUPPRESS_STDERR = True
- # setting up logging
- log_opt = args['logging']
- logger = logging.getLogger(globals.LOGGER_NAME)
- if log_opt == 'INFO':
- logger.level = logging.INFO
- elif log_opt == 'DEBUG':
- logger.level = logging.DEBUG
+ # setting up logging
+ log_opt = args['logging']
+ logger = logging.getLogger(globals.LOGGER_NAME)
+ if log_opt == 'INFO':
+ logger.level = logging.INFO
+ elif log_opt == 'DEBUG':
+ logger.level = logging.DEBUG
- test_context = TestContext(
- ENGINE_DIRECTORY, DATA_DIRECTORY, args['eventserver_ip'], int(args['eventserver_port']))
+ test_context = TestContext(
+ ENGINE_DIRECTORY, DATA_DIRECTORY, args['eventserver_ip'], int(args['eventserver_port']))
- tests_dict = get_tests(test_context)
- test_names = args['tests']
- tests = []
- if test_names is not None:
- tests = [t for name, t in tests_dict.items() if name in test_names]
- else:
- tests = tests_dict.values()
+ tests_dict = get_tests(test_context)
+ test_names = args['tests']
+ tests = []
+ if test_names is not None:
+ tests = [t for name, t in tests_dict.items() if name in test_names]
+ else:
+ tests = tests_dict.values()
- # Actual tests execution
- event_server_process = srun_bg('pio eventserver --ip {} --port {}'
- .format(test_context.es_ip, test_context.es_port))
- time.sleep(5)
- result = xmlrunner.XMLTestRunner(verbosity=2, output='test-reports').run(unittest.TestSuite(tests))
- event_server_process.kill()
+ # Actual tests execution
+ event_server_process = srun_bg('pio eventserver --ip {} --port {}'
+ .format(test_context.es_ip, test_context.es_port))
+ time.sleep(5)
+ result = xmlrunner.XMLTestRunner(verbosity=2, output='test-reports').run(unittest.TestSuite(tests))
+ event_server_process.kill()
- if not result.wasSuccessful():
- sys.exit(1)
+ if not result.wasSuccessful():
+ sys.exit(1)
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/352eb5e0/utils.py
----------------------------------------------------------------------
diff --git a/utils.py b/utils.py
index d837984..8066c4b 100644
--- a/utils.py
+++ b/utils.py
@@ -9,294 +9,296 @@ from os.path import join as pjoin
import pio_tests.globals as globals
def srun(command):
- """ Runs a shell command given as a `str`
- Raises: `subprocess.CalledProcessError` when exit code != 0
- """
- return run(command, shell=True, stdout=globals.std_out(),
- stderr=globals.std_err(), check=True)
+ """ Runs a shell command given as a `str`
+ Raises: `subprocess.CalledProcessError` when exit code != 0
+ """
+ return run(command, shell=True, stdout=globals.std_out(),
+ stderr=globals.std_err(), check=True)
def srun_out(command):
- """ Runs a shell command given as a `str`
- Returns: string with command's output
- Raises: `subprocess.CalledProcessError` when exit code != 0
- """
- return check_output(command, shell=True, universal_newlines=True,
- stderr=globals.std_err())
+ """ Runs a shell command given as a `str`
+ Returns: string with command's output
+ Raises: `subprocess.CalledProcessError` when exit code != 0
+ """
+ return check_output(command, shell=True, universal_newlines=True,
+ stderr=globals.std_err())
def srun_bg(command):
- """ Runs a shell command given as a `str` in the background
- Returns: (obj: `subprocess.Popen`) for executed process
- """
- return Popen(command, shell=True, stdout=globals.std_out(),
- stderr=globals.std_err())
+ """ Runs a shell command given as a `str` in the background
+ Returns: (obj: `subprocess.Popen`) for executed process
+ """
+ return Popen(command, shell=True, stdout=globals.std_out(),
+ stderr=globals.std_err())
def repository_dirname(template):
- """ Utility function getting repository name from the link
- Example: for "https://github.com/user/SomeRepo" should return "SomeRepo"
- """
- return template.split('/')[-1]
+ """ Utility function getting repository name from the link
+ Example: for "https://github.com/user/SomeRepo" should return "SomeRepo"
+ """
+ return template.split('/')[-1]
def obtain_template(engine_dir, template):
- """Given a directory with engines and a template downloads an engine
- if neccessary
- Args:
- engine_dir (str): directory where engines are stored
- template (str): either the name of an engine from the engines directory
- or a link to repository with the engine
- Returns: str: with the engine's path
- """
- if re.match('^https?:\/\/', template):
- dest_dir = pjoin(engine_dir, repository_dirname(template))
- if not os.path.exists(dest_dir):
- srun('git clone --depth=1 {0} {1}'.format(template, dest_dir))
- return dest_dir
- else:
- # check if exists
- dest_dir = pjoin(engine_dir, template)
- if not os.path.exists(dest_dir):
- raise ValueError('Engine {0} does not exist in {1}'
- .format(template, engine_dir))
-
- return dest_dir
+ """Given a directory with engines and a template downloads an engine
+ if neccessary
+ Args:
+ engine_dir (str): directory where engines are stored
+ template (str): either the name of an engine from the engines directory
+ or a link to repository with the engine
+ Returns: str with the engine's path
+ """
+ if re.match('^https?:\/\/', template):
+ dest_dir = pjoin(engine_dir, repository_dirname(template))
+ if not os.path.exists(dest_dir):
+ srun('git clone --depth=1 {0} {1}'.format(template, dest_dir))
+ return dest_dir
+ else:
+ # check if exists
+ dest_dir = pjoin(engine_dir, template)
+ if not os.path.exists(dest_dir):
+ raise ValueError('Engine {0} does not exist in {1}'
+ .format(template, engine_dir))
+
+ return dest_dir
def pio_app_list():
- """Returns: a list of dicts for every application with the following keys:
- `name`, `id`, `access_key`, `allowed_events`
- """
- output = srun_out('pio app list').rstrip()
- return [ { 'name': line[2], 'id': int(line[4]),
- 'access_key': line[6], 'allowed_events': line[8] }
- for line in [x.split() for x in output.split('\n')[1:-1]] ]
+ """Returns: a list of dicts for every application with the following keys:
+ `name`, `id`, `access_key`, `allowed_events`
+ """
+ output = srun_out('pio app list').rstrip()
+ return [ { 'name': line[2], 'id': int(line[4]),
+ 'access_key': line[6], 'allowed_events': line[8] }
+ for line in [x.split() for x in output.split('\n')[1:-1]] ]
def get_app_eventserver_url_json(test_context):
- return 'http://{}:{}/events.json'.format(
- test_context.es_ip, test_context.es_port)
+ return 'http://{}:{}/events.json'.format(
+ test_context.es_ip, test_context.es_port)
def get_engine_url_json(engine_ip, engine_port):
- return 'http://{}:{}/queries.json'.format(
- engine_ip, engine_port)
+ return 'http://{}:{}/queries.json'.format(
+ engine_ip, engine_port)
def send_event(event, test_context, access_key):
- """ Sends an event to the eventserver
- Args:
- event: json-like dictionary describing an event
- test_context (obj: `TestContext`):
- access_key: applications access key
- Returns: `requests.Response`
- """
- # TODO: Add channel param
- url = get_app_eventserver_url_json(test_context)
- return requests.post(
- url,
- params={'accessKey': access_key},
- json=event)
+ """ Sends an event to the eventserver
+ Args:
+ event: json-like dictionary describing an event
+ test_context (obj: `TestContext`):
+ access_key: application's access key
+ Returns: `requests.Response`
+ """
+ # TODO: Add channel param
+ url = get_app_eventserver_url_json(test_context)
+ return requests.post(
+ url,
+ params={'accessKey': access_key},
+ json=event)
def send_events_batch(events, test_context, access_key):
- """ Send events in batch to the eventserver
- Args:
- events: a list of json-like dictionaries for events
- test_context (obj: `TestContext`):
- access_key: applications access key
- Returns: `requests.Response`
- """
- url = 'http://{}:{}/batch/events.json'.format(
- test_context.es_ip, test_context.es_port)
- return requests.post(
- url,
- params={'accessKey': access_key},
- json=events)
+ """ Send events in batch via REST to the eventserver
+ Args:
+ events: a list of json-like dictionaries for events
+ test_context (obj: `TestContext`):
+ access_key: application's access key
+ Returns: `requests.Response`
+ Requires: Events length must not exceed length of 50
+ http://docs.prediction.io/datacollection/eventmodel/#3.-batch-events-to-the-eventserver
+ """
+ url = 'http://{}:{}/batch/events.json'.format(
+ test_context.es_ip, test_context.es_port)
+ return requests.post(
+ url,
+ params={'accessKey': access_key},
+ json=events)
def import_events_batch(events, test_context, appid, channel=None):
- """ Imports events in batch from file with `pio import`
- Args:
- events: a list of json-like dictionaries for events
- test_context (obj: `TestContext`)
- appid (int): application's id
- channel (str):
- """
- # Writing events list to temporary file.
- # `pio import` requires each line of input file to be a JSON string
- # representing an event. Also, empty lines are not allowed.
- contents = ''
- for ev in events:
- contents += '{}\n'.format(json.dumps(ev))
- contents.rstrip('\n')
-
- file_path = pjoin(test_context.data_directory, 'events.json.tmp')
- try:
- with open(file_path, 'w') as f:
- f.write(contents)
- srun('pio import --appid {} --input {} {}'.format(
- appid,
- file_path,
- '--channel {}'.format(channel) if channel else ''))
- finally:
- os.remove(file_path)
+ """ Imports events in batch from file with `pio import`
+ Args:
+ events: a list of json-like dictionaries for events
+ test_context (obj: `TestContext`)
+ appid (int): application's id
+ channel (str):
+ """
+ # Writing events list to temporary file.
+ # `pio import` requires each line of input file to be a JSON string
+ # representing an event. Empty lines are not allowed.
+ contents = ''
+ for ev in events:
+ contents += '{}\n'.format(json.dumps(ev))
+ contents.rstrip('\n')
+
+ file_path = pjoin(test_context.data_directory, 'events.json.tmp')
+ try:
+ with open(file_path, 'w') as f:
+ f.write(contents)
+ srun('pio import --appid {} --input {} {}'.format(
+ appid,
+ file_path,
+ '--channel {}'.format(channel) if channel else ''))
+ finally:
+ os.remove(file_path)
def get_events(test_context, access_key, params={}):
- """ Gets events for some application
- Args:
- test_context (obj: `TestContext`)
- access_key (str):
- params (dict): special parameters for eventserver's GET, e.g:
- 'limit', 'reversed', 'event'. See the docs
- Returns: `requests.Response`
- """
- url = get_app_eventserver_url_json(test_context)
- return requests.get(url, params=dict({'accessKey': access_key}, **params))
+ """ Gets events for some application
+ Args:
+ test_context (obj: `TestContext`)
+ access_key (str):
+ params (dict): special parameters for eventserver's GET, e.g:
+ 'limit', 'reversed', 'event'. See the docs
+ Returns: `requests.Response`
+ """
+ url = get_app_eventserver_url_json(test_context)
+ return requests.get(url, params=dict({'accessKey': access_key}, **params))
def query_engine(data, engine_ip='localhost', engine_port=8000):
- """ Send a query to deployed engine
- Args:
- data (dict): json-like dictionary being an input to an engine
- access_key (str):
- engine_ip (str): ip of deployed engine
- engine_port (int): port of deployed engine
- Returns: `requests.Response`
- """
- url = get_engine_url_json(engine_ip, engine_port)
- return requests.post(url, json=data)
+ """ Send a query to deployed engine
+ Args:
+ data (dict): json-like dictionary being an input to an engine
+ access_key (str):
+ engine_ip (str): ip of deployed engine
+ engine_port (int): port of deployed engine
+ Returns: `requests.Response`
+ """
+ url = get_engine_url_json(engine_ip, engine_port)
+ return requests.post(url, json=data)
class AppEngine:
- """ This is a utility class simplifying all app related interactions.
- Basically it is just a wrapper on other utility functions and shell
- scripts.
- """
-
- def __init__(self, test_context, app_context, already_created=False):
- """ Args:
- test_context (obj: `TestContext`)
- app_context (obj: `AppContext`)
- already_created (bool): True if the given app has been already added
- """
- self.test_context = test_context
- self.app_context = app_context
- self.engine_path = obtain_template(
- self.test_context.engine_directory, app_context.template)
- self.deployed_process = None
- if already_created:
- self.__init_info()
- else:
- self.id = None
- self.access_key = None
- self.description = None
-
- if self.app_context.engine_json_path:
- self.__copy_engine_json()
-
- def __copy_engine_json(self):
- to_path = pjoin(self.engine_path, 'engine.json')
- copyfile(self.app_context.engine_json_path, to_path)
-
- def __init_info(self):
- info = self.show()
- self.id = info['id']
- self.access_key = info['access_key']
- self.description = info['description']
-
- def new(self, id=None, description=None, access_key=None):
- """ Creates a new application with given parameters """
- srun('pio app new {} {} {} {}'.format(
- '--id {}'.format(id) if id else '',
- '--description \"{}\"'.format(description) if description else '',
- '--access-key {}'.format(access_key) if access_key else '',
- self.app_context.name))
-
- self.__init_info()
-
-
- def show(self):
- """ Returns: application info in dictionary with the keys:
- `name`: str, `id`: int, `description`: str,
- `access_key`: str, `allowed_events`: str
- """
- output = srun_out('pio app show {}'.format(self.app_context.name)).rstrip()
- lines = [x.split() for x in output.split('\n')]
- return { 'name': lines[0][3],
- 'id': int(lines[1][4]),
- 'description': lines[2][3] if len(lines[2]) >= 4 else '',
- 'access_key': lines[3][4],
- 'allowed_events': lines[3][5] }
-
-
- # deletes this app from pio
- def delete(self):
- srun('pio app delete {0} --force'.format(self.app_context.name))
-
- def build(self, sbt_extra=None, clean=False, no_asm=True):
- srun('cd {0}; pio build {1} {2} {3}'.format(
- self.engine_path,
- '--sbt-extra {}'.format(sbt_extra) if sbt_extra else '',
- '--clean' if clean else '',
- '--no-asm' if no_asm else ''))
+ """ This is a utility class simplifying all app related interactions.
+ Basically it is just a wrapper on other utility functions and shell
+ scripts.
+ """
- def train(self, batch=None, skip_sanity_check=False, stop_after_read=False,
- stop_after_prepare=False, engine_factory=None,
- engine_params_key=None, scratch_uri=None):
-
- srun('cd {}; pio train {} {} {} {} {} {} {}'.format(
+ def __init__(self, test_context, app_context, already_created=False):
+ """ Args:
+ test_context (obj: `TestContext`)
+ app_context (obj: `AppContext`)
+ already_created (bool): True if the given app has been already added
+ """
+ self.test_context = test_context
+ self.app_context = app_context
+ self.engine_path = obtain_template(
+ self.test_context.engine_directory, app_context.template)
+ self.deployed_process = None
+ if already_created:
+ self.__init_info()
+ else:
+ self.id = None
+ self.access_key = None
+ self.description = None
+
+ if self.app_context.engine_json_path:
+ self.__copy_engine_json()
+
+ def __copy_engine_json(self):
+ to_path = pjoin(self.engine_path, 'engine.json')
+ copyfile(self.app_context.engine_json_path, to_path)
+
+ def __init_info(self):
+ info = self.show()
+ self.id = info['id']
+ self.access_key = info['access_key']
+ self.description = info['description']
+
+ def new(self, id=None, description=None, access_key=None):
+ """ Creates a new application with given parameters """
+ srun('pio app new {} {} {} {}'.format(
+ '--id {}'.format(id) if id else '',
+ '--description \"{}\"'.format(description) if description else '',
+ '--access-key {}'.format(access_key) if access_key else '',
+ self.app_context.name))
+
+ self.__init_info()
+
+
+ def show(self):
+ """ Returns: application info in dictionary with the keys:
+ `name`: str, `id`: int, `description`: str,
+ `access_key`: str, `allowed_events`: str
+ """
+ output = srun_out('pio app show {}'.format(self.app_context.name)).rstrip()
+ lines = [x.split() for x in output.split('\n')]
+ return { 'name': lines[0][3],
+ 'id': int(lines[1][4]),
+ 'description': lines[2][3] if len(lines[2]) >= 4 else '',
+ 'access_key': lines[3][4],
+ 'allowed_events': lines[3][5] }
+
+
+ # deletes this app from pio
+ def delete(self):
+ srun('pio app delete {0} --force'.format(self.app_context.name))
+
+ def build(self, sbt_extra=None, clean=False, no_asm=True):
+ srun('cd {0}; pio build {1} {2} {3}'.format(
+ self.engine_path,
+ '--sbt-extra {}'.format(sbt_extra) if sbt_extra else '',
+ '--clean' if clean else '',
+ '--no-asm' if no_asm else ''))
+
+ def train(self, batch=None, skip_sanity_check=False, stop_after_read=False,
+ stop_after_prepare=False, engine_factory=None,
+ engine_params_key=None, scratch_uri=None):
+
+ srun('cd {}; pio train {} {} {} {} {} {} {}'.format(
+ self.engine_path,
+ '--batch {}'.format(batch) if batch else '',
+ '--skip-sanity-check' if skip_sanity_check else '',
+ '--stop-after-read' if stop_after_read else '',
+ '--stop-after-prepare' if stop_after_prepare else '',
+ '--engine_factory {}'.format(engine_factory) if engine_factory else '',
+ '--engine-params-key {}'.format(engine_params_key) if engine_params_key else '',
+ '--scratch-uri {}'.format(scratch_uri) if scratch_uri else ''))
+
+ def deploy(self, wait_time=0, ip=None, port=None, engine_instance_id=None,
+ feedback=False, accesskey=None, event_server_ip=None, event_server_port=None,
+ batch=None, scratch_uri=None):
+
+ command = 'cd {}; pio deploy {} {} {} {} {} {} {} {} {}'.format(
self.engine_path,
- '--batch {}'.format(batch) if batch else '',
- '--skip-sanity-check' if skip_sanity_check else '',
- '--stop-after-read' if stop_after_read else '',
- '--stop-after-prepare' if stop_after_prepare else '',
- '--engine_factory {}'.format(engine_factory) if engine_factory else '',
- '--engine-params-key {}'.format(engine_params_key) if engine_params_key else '',
- '--scratch-uri {}'.format(scratch_uri) if scratch_uri else ''))
-
- def deploy(self, wait_time=0, ip=None, port=None, engine_instance_id=None,
- feedback=False, accesskey=None, event_server_ip=None, event_server_port=None,
- batch=None, scratch_uri=None):
-
- command = 'cd {}; pio deploy {} {} {} {} {} {} {} {} {}'.format(
- self.engine_path,
- '--ip {}'.format(ip) if ip else '',
- '--port {}'.format(port) if port else '',
- '--engine-instance-id {}'.format(engine_instance_id) if engine_instance_id else '',
- '--feedback' if feedback else '',
- '--accesskey {}'.format(accesskey) if accesskey else '',
- '--event-server-ip {}'.format(event_server_ip) if event_server_ip else '',
- '--event-server-port {}'.format(event_server_port) if event_server_port else '',
- '--batch {}'.format(bach) if batch else '',
- '--scratch-uri {}'.format(scratch_uri) if scratch_uri else '')
-
- self.deployed_process = srun_bg(command)
- time.sleep(wait_time)
- if self.deployed_process.poll() is not None:
- raise Exception('Application engine terminated')
- self.ip = ip if ip else 'localhost'
- self.port = port if port else 8000
-
- def stop(self):
- """ Kills deployed engine """
- if self.deployed_process:
- self.deployed_process.kill()
-
- def new_channel(self, channel):
- srun('pio app channel-new {0}'.format(channel))
-
- def delete_channel(self, channel):
- srun('pio app channel-delete {0} --force'.format(channel))
-
- def send_event(self, event):
- return send_event(event, self.test_context, self.access_key)
-
- def send_events_batch(self, events):
- return send_events_batch(events, self.test_context, self.access_key)
-
- def import_events_batch(self, events):
- return import_events_batch(events, self.test_context, self.id)
-
- def get_events(self, params={}):
- return get_events(self.test_context, self.access_key, params)
-
- def delete_data(self, delete_all=True, channel=None):
- srun('pio app data-delete {0} {1} {2} --force'
- .format(
- self.app_context.name,
- '--all' if delete_all else '',
- '--channel ' + channel if channel is not None else ''))
-
- def query(self, data):
- return query_engine(data, self.ip, self.port)
+ '--ip {}'.format(ip) if ip else '',
+ '--port {}'.format(port) if port else '',
+ '--engine-instance-id {}'.format(engine_instance_id) if engine_instance_id else '',
+ '--feedback' if feedback else '',
+ '--accesskey {}'.format(accesskey) if accesskey else '',
+ '--event-server-ip {}'.format(event_server_ip) if event_server_ip else '',
+ '--event-server-port {}'.format(event_server_port) if event_server_port else '',
+ '--batch {}'.format(bach) if batch else '',
+ '--scratch-uri {}'.format(scratch_uri) if scratch_uri else '')
+
+ self.deployed_process = srun_bg(command)
+ time.sleep(wait_time)
+ if self.deployed_process.poll() is not None:
+ raise Exception('Application engine terminated')
+ self.ip = ip if ip else 'localhost'
+ self.port = port if port else 8000
+
+ def stop(self):
+ """ Kills deployed engine """
+ if self.deployed_process:
+ self.deployed_process.kill()
+
+ def new_channel(self, channel):
+ srun('pio app channel-new {0}'.format(channel))
+
+ def delete_channel(self, channel):
+ srun('pio app channel-delete {0} --force'.format(channel))
+
+ def send_event(self, event):
+ return send_event(event, self.test_context, self.access_key)
+
+ def send_events_batch(self, events):
+ return send_events_batch(events, self.test_context, self.access_key)
+
+ def import_events_batch(self, events):
+ return import_events_batch(events, self.test_context, self.id)
+
+ def get_events(self, params={}):
+ return get_events(self.test_context, self.access_key, params)
+
+ def delete_data(self, delete_all=True, channel=None):
+ srun('pio app data-delete {0} {1} {2} --force'
+ .format(
+ self.app_context.name,
+ '--all' if delete_all else '',
+ '--channel ' + channel if channel is not None else ''))
+
+ def query(self, data):
+ return query_engine(data, self.ip, self.port)