You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@predictionio.apache.org by do...@apache.org on 2016/08/09 21:43:41 UTC

[33/52] [abbrv] incubator-predictionio git commit: Change indentation to 2 spaces and add some additional comments

Change indentation to 2 spaces and add some additional comments

Document that send_events_batch() has batch size limit of 50


Project: http://git-wip-us.apache.org/repos/asf/incubator-predictionio/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-predictionio/commit/352eb5e0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-predictionio/tree/352eb5e0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-predictionio/diff/352eb5e0

Branch: refs/heads/develop
Commit: 352eb5e04c8ce5e80bee9c09fbeaee5553b00d38
Parents: 7192fbb
Author: Chan Lee <ch...@gmail.com>
Authored: Mon Aug 1 16:40:01 2016 -0700
Committer: Marcin Ziemi\u0144ski <zi...@gmail.com>
Committed: Wed Aug 3 14:42:06 2016 -0700

----------------------------------------------------------------------
 globals.py                      |  16 +-
 integration.py                  |  68 ++---
 scenarios/basic_app_usecases.py | 270 +++++++++---------
 scenarios/eventserver_test.py   |   2 +-
 scenarios/quickstart_test.py    | 222 +++++++--------
 tests.py                        |  74 ++---
 utils.py                        | 526 ++++++++++++++++++-----------------
 7 files changed, 590 insertions(+), 588 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/352eb5e0/globals.py
----------------------------------------------------------------------
diff --git a/globals.py b/globals.py
index 6d4abd3..1134501 100644
--- a/globals.py
+++ b/globals.py
@@ -5,13 +5,13 @@ SUPPRESS_STDERR=False
 LOGGER_NAME='INT_TESTS'
 
 def std_out():
-    if SUPPRESS_STDOUT:
-        return subprocess.DEVNULL
-    else:
-        return None
+  if SUPPRESS_STDOUT:
+    return subprocess.DEVNULL
+  else:
+    return None
 
 def std_err():
-    if SUPPRESS_STDERR:
-        return subprocess.DEVNULL
-    else:
-        return None
+  if SUPPRESS_STDERR:
+    return subprocess.DEVNULL
+  else:
+    return None

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/352eb5e0/integration.py
----------------------------------------------------------------------
diff --git a/integration.py b/integration.py
index 6cfa267..441365e 100644
--- a/integration.py
+++ b/integration.py
@@ -3,44 +3,44 @@ import logging
 import pio_tests.globals as globals
 
 class TestContext:
-    """Class representing the settings provided for every test"""
+  """Class representing the settings provided for every test"""
 
-    def __init__(self, engine_directory, data_directory, es_ip='0.0.0.0', es_port=7070):
-        """
-        Args:
-            engine_directory (str): path to the directory where the engines are stored
-            data_directory (str):   path to the directory where tests can keep their data
-            es_ip (str):            ip of the eventserver
-            es_port (int):          port of the eventserver
-        """
-        self.engine_directory = engine_directory
-        self.data_directory = data_directory
-        self.es_ip = es_ip
-        self.es_port = es_port
+  def __init__(self, engine_directory, data_directory, es_ip='0.0.0.0', es_port=7070):
+    """
+    Args:
+      engine_directory (str): path to the directory where the engines are stored
+      data_directory (str):   path to the directory where tests can keep their data
+      es_ip (str):            ip of the eventserver
+      es_port (int):          port of the eventserver
+    """
+    self.engine_directory = engine_directory
+    self.data_directory = data_directory
+    self.es_ip = es_ip
+    self.es_port = es_port
 
 class BaseTestCase(unittest.TestCase):
-    """This is the base class for all integration tests
+  """This is the base class for all integration tests
 
-    This class sets up a `TestContext` object and a logger for every test case
-    """
-    def __init__(self, test_context, methodName='runTest'):
-        super(BaseTestCase, self).__init__(methodName)
-        self.test_context = test_context
-        self.log = logging.getLogger(globals.LOGGER_NAME)
+  This class sets up a `TestContext` object and a logger for every test case
+  """
+  def __init__(self, test_context, methodName='runTest'):
+    super(BaseTestCase, self).__init__(methodName)
+    self.test_context = test_context
+    self.log = logging.getLogger(globals.LOGGER_NAME)
 
 class AppContext:
-    """ This class is a description of an instance of the engine"""
+  """ This class is a description of an instance of the engine"""
 
-    def __init__(self, name, template, engine_json_path=None):
-        """
-        Args:
-            name (str): application name
-            template (str): either the name of an engine from the engines directory
-                or a link to repository with the engine
-            engine_json_path (str): path to json file describing an engine (a custom engine.json)
-                to be used for the application. If `None`, engine.json from the engine's directory
-                will be used
-        """
-        self.name = name
-        self.template = template
-        self.engine_json_path = engine_json_path
+  def __init__(self, name, template, engine_json_path=None):
+    """
+    Args:
+      name (str): application name
+      template (str): either the name of an engine from the engines directory
+          or a link to repository with the engine
+      engine_json_path (str): path to json file describing an engine (a custom engine.json)
+          to be used for the application. If `None`, engine.json from the engine's directory
+          will be used
+    """
+    self.name = name
+    self.template = template
+    self.engine_json_path = engine_json_path

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/352eb5e0/scenarios/basic_app_usecases.py
----------------------------------------------------------------------
diff --git a/scenarios/basic_app_usecases.py b/scenarios/basic_app_usecases.py
index 131b2e9..94d2f34 100644
--- a/scenarios/basic_app_usecases.py
+++ b/scenarios/basic_app_usecases.py
@@ -10,146 +10,146 @@ from utils import *
 ITEMS_COUNT = 12
 
 def get_buy_events(users, per_user=2):
-    events = []
-    for u in range(users):
-        items = set([random.randint(0, ITEMS_COUNT) for i in range(per_user)])
-        for item in items:
-            events.append({
-                "event": "buy",
-                "entityType": "user",
-                "entityId": u,
-                "targetEntityType": "item",
-                "targetEntityId": item })
-
-    return events
+  events = []
+  for u in range(users):
+    items = set([random.randint(0, ITEMS_COUNT) for i in range(per_user)])
+    for item in items:
+      events.append({
+        "event": "buy",
+        "entityType": "user",
+        "entityId": u,
+        "targetEntityType": "item",
+        "targetEntityId": item })
+
+  return events
 
 def get_rate_events(users, per_user=2):
-    events = []
-    for u in range(users):
-        items = set([random.randint(0, ITEMS_COUNT) for i in range(per_user)])
-        for item in items:
-            events.append( {
-                    "event": "rate",
-                    "entityType": "user",
-                    "entityId": u,
-                    "targetEntityType": "item",
-                    "targetEntityId": item,
-                    "properties": { "rating" : float(random.randint(1,5)) } })
+  events = []
+  for u in range(users):
+    items = set([random.randint(0, ITEMS_COUNT) for i in range(per_user)])
+    for item in items:
+      events.append( {
+        "event": "rate",
+        "entityType": "user",
+        "entityId": u,
+        "targetEntityType": "item",
+        "targetEntityId": item,
+        "properties": { "rating" : float(random.randint(1,5)) } })
 
-    return events
+  return events
 
 
 class BasicAppUsecases(BaseTestCase):
 
-    def setUp(self):
-        random.seed(3)
-        self.log.info("Setting up the engine")
-
-        template_path = pjoin(
-                self.test_context.engine_directory, "recommendation-engine")
-        engine_json_path = pjoin(
-                self.test_context.data_directory, "quickstart_test/engine.json")
-
-        app_context = AppContext(
-                name="MyRecommender",
-                template=template_path,
-                engine_json_path=engine_json_path)
-
-        self.app = AppEngine(self.test_context, app_context)
-        self.app_pid = None
-
-    def runTest(self):
-        self.app_creation()
-        self.check_app_list()
-        self.check_data()
-        self.check_build()
-        self.check_train_and_deploy()
-
-    def app_creation(self):
-        self.log.info("Adding a new application")
-        description = "SomeDescription"
-        self.app.new(description=description)
-        self.assertEqual(description, self.app.description)
-
-        self.log.info("Creating an app again - should fail")
-        self.assertRaises(CalledProcessError, lambda : self.app.new())
-
-    def check_app_list(self):
-        self.log.info("Checking if app is on the list")
-        apps = pio_app_list()
-        self.assertEqual(1,
-                len([a for a in apps if a['name'] == self.app.app_context.name]))
-
-    def check_data(self):
-        self.log.info("Importing events")
-        buy_events = get_buy_events(20, 1)
-        rate_events = get_rate_events(20, 1)
-
-        for ev in buy_events + rate_events:
-            self.assertEquals(201, self.app.send_event(ev).status_code)
-
-        self.log.info("Checking imported events")
-        r = self.app.get_events(params={'limit': -1})
-        self.assertEqual(200, r.status_code)
-        self.assertEqual(len(buy_events) + len(rate_events), len(r.json()))
-
-        self.log.info("Deleting entire data")
-        self.app.delete_data()
-        self.log.info("Checking if there are no events at all")
-        r = self.app.get_events(params={'limit': -1})
-        self.assertEqual(404, r.status_code)
-
-    def check_build(self):
-        self.log.info("Clean build")
-        self.app.build(clean=True)
-        self.log.info("Second build")
-        self.app.build()
-
-    def check_train_and_deploy(self):
-        self.log.info("import some data first")
-        buy_events = get_buy_events(20, 5)
-        rate_events = get_rate_events(20, 5)
-        for ev in buy_events + rate_events:
-            self.assertEquals(201, self.app.send_event(ev).status_code)
-
-        self.log.info("Training")
-        self.app.train()
-        self.log.info("Deploying")
-        self.app.deploy()
-        self.assertFalse(self.app.deployed_process.poll())
-
-        self.log.info("Importing more events")
-        buy_events = get_buy_events(60, 5)
-        rate_events = get_rate_events(60, 5)
-        for ev in buy_events + rate_events:
-            self.assertEquals(201, self.app.send_event(ev).status_code)
-
-        self.log.info("Training again")
-        self.app.train()
-
-        time.sleep(7)
-
-        self.log.info("Check serving")
-        r = self.app.query({"user": 1, "num": 5})
-        self.assertEqual(200, r.status_code)
-        result = r.json()
-        self.assertEqual(5, len(result['itemScores']))
-        r = self.app.query({"user": 5, "num": 3})
-        self.assertEqual(200, r.status_code)
-        result = r.json()
-        self.assertEqual(3, len(result['itemScores']))
-
-        self.log.info("Remove data")
-        self.app.delete_data()
-        self.log.info("Retraining should fail")
-        self.assertRaises(CalledProcessError, lambda: self.app.train())
-
-
-    def tearDown(self):
-        self.log.info("Stopping deployed engine")
-        self.app.stop()
-        self.log.info("Deleting all related data")
-        self.app.delete_data()
-        self.log.info("Removing an app")
-        self.app.delete()
+  def setUp(self):
+    random.seed(3)
+    self.log.info("Setting up the engine")
+
+    template_path = pjoin(
+        self.test_context.engine_directory, "recommendation-engine")
+    engine_json_path = pjoin(
+        self.test_context.data_directory, "quickstart_test/engine.json")
+
+    app_context = AppContext(
+        name="MyRecommender",
+        template=template_path,
+        engine_json_path=engine_json_path)
+
+    self.app = AppEngine(self.test_context, app_context)
+    self.app_pid = None
+
+  def runTest(self):
+    self.app_creation()
+    self.check_app_list()
+    self.check_data()
+    self.check_build()
+    self.check_train_and_deploy()
+
+  def app_creation(self):
+    self.log.info("Adding a new application")
+    description = "SomeDescription"
+    self.app.new(description=description)
+    self.assertEqual(description, self.app.description)
+
+    self.log.info("Creating an app again - should fail")
+    self.assertRaises(CalledProcessError, lambda : self.app.new())
+
+  def check_app_list(self):
+    self.log.info("Checking if app is on the list")
+    apps = pio_app_list()
+    self.assertEqual(1,
+        len([a for a in apps if a['name'] == self.app.app_context.name]))
+
+  def check_data(self):
+    self.log.info("Importing events")
+    buy_events = get_buy_events(20, 1)
+    rate_events = get_rate_events(20, 1)
+
+    for ev in buy_events + rate_events:
+      self.assertEquals(201, self.app.send_event(ev).status_code)
+
+    self.log.info("Checking imported events")
+    r = self.app.get_events(params={'limit': -1})
+    self.assertEqual(200, r.status_code)
+    self.assertEqual(len(buy_events) + len(rate_events), len(r.json()))
+
+    self.log.info("Deleting entire data")
+    self.app.delete_data()
+    self.log.info("Checking if there are no events at all")
+    r = self.app.get_events(params={'limit': -1})
+    self.assertEqual(404, r.status_code)
+
+  def check_build(self):
+    self.log.info("Clean build")
+    self.app.build(clean=True)
+    self.log.info("Second build")
+    self.app.build()
+
+  def check_train_and_deploy(self):
+    self.log.info("import some data first")
+    buy_events = get_buy_events(20, 5)
+    rate_events = get_rate_events(20, 5)
+    for ev in buy_events + rate_events:
+      self.assertEquals(201, self.app.send_event(ev).status_code)
+
+    self.log.info("Training")
+    self.app.train()
+    self.log.info("Deploying")
+    self.app.deploy()
+    self.assertFalse(self.app.deployed_process.poll())
+
+    self.log.info("Importing more events")
+    buy_events = get_buy_events(60, 5)
+    rate_events = get_rate_events(60, 5)
+    for ev in buy_events + rate_events:
+      self.assertEquals(201, self.app.send_event(ev).status_code)
+
+    self.log.info("Training again")
+    self.app.train()
+
+    time.sleep(7)
+
+    self.log.info("Check serving")
+    r = self.app.query({"user": 1, "num": 5})
+    self.assertEqual(200, r.status_code)
+    result = r.json()
+    self.assertEqual(5, len(result['itemScores']))
+    r = self.app.query({"user": 5, "num": 3})
+    self.assertEqual(200, r.status_code)
+    result = r.json()
+    self.assertEqual(3, len(result['itemScores']))
+
+    self.log.info("Remove data")
+    self.app.delete_data()
+    self.log.info("Retraining should fail")
+    self.assertRaises(CalledProcessError, lambda: self.app.train())
+
+
+  def tearDown(self):
+    self.log.info("Stopping deployed engine")
+    self.app.stop()
+    self.log.info("Deleting all related data")
+    self.app.delete_data()
+    self.log.info("Removing an app")
+    self.app.delete()
 

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/352eb5e0/scenarios/eventserver_test.py
----------------------------------------------------------------------
diff --git a/scenarios/eventserver_test.py b/scenarios/eventserver_test.py
index 4675299..2cd3d87 100644
--- a/scenarios/eventserver_test.py
+++ b/scenarios/eventserver_test.py
@@ -74,7 +74,7 @@ class EventserverTest(BaseTestCase):
         self.load_events("signup_events_51.json"))
     self.assertEqual(400, r.status_code)
 
-    self.log.info("Importing batch of events")
+    self.log.info("Importing events from file does not have batch size limit")
     self.app.import_events_batch(
         self.load_events("signup_events_51.json"))
 

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/352eb5e0/scenarios/quickstart_test.py
----------------------------------------------------------------------
diff --git a/scenarios/quickstart_test.py b/scenarios/quickstart_test.py
index 8691f21..080c26a 100644
--- a/scenarios/quickstart_test.py
+++ b/scenarios/quickstart_test.py
@@ -6,120 +6,120 @@ from pio_tests.integration import BaseTestCase, AppContext
 from utils import AppEngine, srun, pjoin
 
 def read_events(file_path):
-    RATE_ACTIONS_DELIMITER = "::"
-    with open(file_path, 'r') as f:
-        events = []
-        for line in f:
-            data = line.rstrip('\r\n').split(RATE_ACTIONS_DELIMITER)
-            if random.randint(0, 1) == 1:
-                events.append( {
-                        "event": "rate",
-                        "entityType": "user",
-                        "entityId": data[0],
-                        "targetEntityType": "item",
-                        "targetEntityId": data[1],
-                        "properties": { "rating" : float(data[2]) } })
-            else:
-                events.append({
-                    "event": "buy",
-                    "entityType": "user",
-                    "entityId": data[0],
-                    "targetEntityType": "item",
-                    "targetEntityId": data[1] })
-
-        return events
+  RATE_ACTIONS_DELIMITER = "::"
+  with open(file_path, 'r') as f:
+    events = []
+    for line in f:
+      data = line.rstrip('\r\n').split(RATE_ACTIONS_DELIMITER)
+      if random.randint(0, 1) == 1:
+        events.append( {
+          "event": "rate",
+          "entityType": "user",
+          "entityId": data[0],
+          "targetEntityType": "item",
+          "targetEntityId": data[1],
+          "properties": { "rating" : float(data[2]) } })
+      else:
+        events.append({
+          "event": "buy",
+          "entityType": "user",
+          "entityId": data[0],
+          "targetEntityType": "item",
+          "targetEntityId": data[1] })
+
+    return events
 
 
 class QuickStartTest(BaseTestCase):
 
-    def setUp(self):
-        self.log.info("Setting up the engine")
-
-        template_path = pjoin(
-                self.test_context.engine_directory, "recommendation-engine")
-        engine_json_path = pjoin(
-                self.test_context.data_directory, "quickstart_test/engine.json")
-
-        self.training_data_path = pjoin(
-                self.test_context.data_directory,
-                "quickstart_test/training_data.txt")
-
-        # downloading training data
-        srun('curl https://raw.githubusercontent.com/apache/spark/master/' \
-                'data/mllib/sample_movielens_data.txt --create-dirs -o {}'
-                .format(self.training_data_path))
-
-        app_context = AppContext(
-                name="MyRecommender",
-                template=template_path,
-                engine_json_path=engine_json_path)
-
-        self.app = AppEngine(self.test_context, app_context)
-
-    def runTest(self):
-        self.log.info("Adding a new application")
-        self.app.new()
-
-        event1 = {
-            "event" : "rate",
-            "entityType" : "user",
-            "entityId" : "u0",
-            "targetEntityType" : "item",
-            "targetEntityId" : "i0",
-            "properties" : {
-                "rating" : 5
-                },
-            "eventTime" : "2014-11-02T09:39:45.618-08:00" }
-
-        event2 = {
-            "event" : "buy",
-            "entityType" : "user",
-            "entityId" : "u1",
-            "targetEntityType" : "item",
-            "targetEntityId" : "i2",
-            "eventTime" : "2014-11-10T12:34:56.123-08:00" }
-
-        self.log.info("Sending two test events")
-        self.assertListEqual(
-                [201, 201],
-                [self.app.send_event(e).status_code for e in [event1, event2]])
-
-        self.log.info("Checking the number of events stored on the server")
-        r = self.app.get_events()
-        self.assertEquals(200, r.status_code)
-        stored_events = r.json()
-        self.assertEqual(2, len(stored_events))
-
-        self.log.info("Importing many events")
-        new_events = read_events(self.training_data_path)
-        for ev in new_events:
-            r = self.app.send_event(ev)
-            self.assertEqual(201, r.status_code)
-
-        self.log.info("Checking the number of events stored on the server after the update")
-        r = self.app.get_events(params={'limit': -1})
-        self.assertEquals(200, r.status_code)
-        stored_events = r.json()
-        self.assertEquals(len(new_events) + 2, len(stored_events))
-
-        self.log.info("Building an engine...")
-        self.app.build()
-        self.log.info("Training...")
-        self.app.train()
-        self.log.info("Deploying and waiting 15s for it to start...")
-        self.app.deploy(wait_time=15)
-
-        self.log.info("Sending a single query and checking results")
-        user_query = { "user": 1, "num": 4 }
-        r = self.app.query(user_query)
-        self.assertEqual(200, r.status_code)
-        result = r.json()
-        self.assertEqual(4, len(result['itemScores']))
+  def setUp(self):
+    self.log.info("Setting up the engine")
+
+    template_path = pjoin(
+        self.test_context.engine_directory, "recommendation-engine")
+    engine_json_path = pjoin(
+        self.test_context.data_directory, "quickstart_test/engine.json")
+
+    self.training_data_path = pjoin(
+        self.test_context.data_directory,
+        "quickstart_test/training_data.txt")
+
+    # downloading training data
+    srun('curl https://raw.githubusercontent.com/apache/spark/master/' \
+            'data/mllib/sample_movielens_data.txt --create-dirs -o {}'
+            .format(self.training_data_path))
+
+    app_context = AppContext(
+        name="MyRecommender",
+        template=template_path,
+        engine_json_path=engine_json_path)
+
+    self.app = AppEngine(self.test_context, app_context)
+
+  def runTest(self):
+    self.log.info("Adding a new application")
+    self.app.new()
+
+    event1 = {
+      "event" : "rate",
+      "entityType" : "user",
+      "entityId" : "u0",
+      "targetEntityType" : "item",
+      "targetEntityId" : "i0",
+      "properties" : {
+        "rating" : 5
+      },
+      "eventTime" : "2014-11-02T09:39:45.618-08:00" }
+
+    event2 = {
+      "event" : "buy",
+      "entityType" : "user",
+      "entityId" : "u1",
+      "targetEntityType" : "item",
+      "targetEntityId" : "i2",
+      "eventTime" : "2014-11-10T12:34:56.123-08:00" }
+
+    self.log.info("Sending two test events")
+    self.assertListEqual(
+        [201, 201],
+        [self.app.send_event(e).status_code for e in [event1, event2]])
+
+    self.log.info("Checking the number of events stored on the server")
+    r = self.app.get_events()
+    self.assertEquals(200, r.status_code)
+    stored_events = r.json()
+    self.assertEqual(2, len(stored_events))
+
+    self.log.info("Importing many events")
+    new_events = read_events(self.training_data_path)
+    for ev in new_events:
+      r = self.app.send_event(ev)
+      self.assertEqual(201, r.status_code)
+
+    self.log.info("Checking the number of events stored on the server after the update")
+    r = self.app.get_events(params={'limit': -1})
+    self.assertEquals(200, r.status_code)
+    stored_events = r.json()
+    self.assertEquals(len(new_events) + 2, len(stored_events))
+
+    self.log.info("Building an engine...")
+    self.app.build()
+    self.log.info("Training...")
+    self.app.train()
+    self.log.info("Deploying and waiting 15s for it to start...")
+    self.app.deploy(wait_time=15)
+
+    self.log.info("Sending a single query and checking results")
+    user_query = { "user": 1, "num": 4 }
+    r = self.app.query(user_query)
+    self.assertEqual(200, r.status_code)
+    result = r.json()
+    self.assertEqual(4, len(result['itemScores']))
 
     def tearDown(self):
-        self.log.info("Stopping deployed engine")
-        self.app.stop()
-        self.log.info("Deleting all related data")
-        self.app.delete_data()
-        self.log.info("Removing an app")
-        self.app.delete()
+      self.log.info("Stopping deployed engine")
+      self.app.stop()
+      self.log.info("Deleting all related data")
+      self.app.delete_data()
+      self.log.info("Removing an app")
+      self.app.delete()

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/352eb5e0/tests.py
----------------------------------------------------------------------
diff --git a/tests.py b/tests.py
index 0a95593..c84bb08 100755
--- a/tests.py
+++ b/tests.py
@@ -16,13 +16,13 @@ parser = argparse.ArgumentParser(description='Integration tests for PredictionIO
 parser.add_argument('--eventserver-ip', default='0.0.0.0')
 parser.add_argument('--eventserver-port', type=int, default=7070)
 parser.add_argument('--no-shell-stdout', action='store_true',
-        help='Suppress STDOUT output from shell executed commands')
+    help='Suppress STDOUT output from shell executed commands')
 parser.add_argument('--no-shell-stderr', action='store_true',
-        help='Suppress STDERR output from shell executed commands')
+    help='Suppress STDERR output from shell executed commands')
 parser.add_argument('--logging', action='store', choices=['INFO', 'DEBUG', 'NO_LOGGING'],
-        default='NO_LOGGING', help='Choose the logging level')
+    default='NO_LOGGING', help='Choose the logging level')
 parser.add_argument('--tests', nargs='*', type=str,
-        default=None, help='Names of the tests to execute. By default all tests will be checked')
+    default=None, help='Names of the tests to execute. By default all tests will be checked')
 
 TESTS_DIRECTORY = os.path.abspath(os.path.dirname(__file__))
 ENGINE_DIRECTORY = os.path.join(TESTS_DIRECTORY, "engines")
@@ -32,44 +32,44 @@ LOGGING_FORMAT = '[%(levelname)s] %(module)s %(asctime)-15s: %(message)s'
 logging.basicConfig(format=LOGGING_FORMAT)
 
 def get_tests(test_context):
-    # ========= ADD TESTS HERE!!! ================================
-    return {'QuickStart': QuickStartTest(test_context),
-            'BasicAppUsecases': BasicAppUsecases(test_context),
-            'EventserverTest': EventserverTest(test_context)}
+  # ========= ADD TESTS HERE!!! ================================
+  return {'QuickStart': QuickStartTest(test_context),
+          'BasicAppUsecases': BasicAppUsecases(test_context),
+          'EventserverTest': EventserverTest(test_context)}
 
 if __name__ == "__main__":
-    args = vars(parser.parse_args())
+  args = vars(parser.parse_args())
 
-    if args.get('no_shell_stdout'):
-        globals.SUPPRESS_STDOUT = True
-    if args.get('no_shell_stderr'):
-        globals.SUPPRESS_STDERR = True
+  if args.get('no_shell_stdout'):
+    globals.SUPPRESS_STDOUT = True
+  if args.get('no_shell_stderr'):
+    globals.SUPPRESS_STDERR = True
 
-    # setting up logging
-    log_opt = args['logging']
-    logger = logging.getLogger(globals.LOGGER_NAME)
-    if log_opt == 'INFO':
-        logger.level = logging.INFO
-    elif log_opt == 'DEBUG':
-        logger.level = logging.DEBUG
+  # setting up logging
+  log_opt = args['logging']
+  logger = logging.getLogger(globals.LOGGER_NAME)
+  if log_opt == 'INFO':
+    logger.level = logging.INFO
+  elif log_opt == 'DEBUG':
+    logger.level = logging.DEBUG
 
-    test_context = TestContext(
-            ENGINE_DIRECTORY, DATA_DIRECTORY, args['eventserver_ip'], int(args['eventserver_port']))
+  test_context = TestContext(
+      ENGINE_DIRECTORY, DATA_DIRECTORY, args['eventserver_ip'], int(args['eventserver_port']))
 
-    tests_dict = get_tests(test_context)
-    test_names = args['tests']
-    tests = []
-    if test_names is not None:
-        tests = [t for name, t in tests_dict.items() if name in test_names]
-    else:
-        tests = tests_dict.values()
+  tests_dict = get_tests(test_context)
+  test_names = args['tests']
+  tests = []
+  if test_names is not None:
+    tests = [t for name, t in tests_dict.items() if name in test_names]
+  else:
+    tests = tests_dict.values()
 
-    # Actual tests execution
-    event_server_process = srun_bg('pio eventserver --ip {} --port {}'
-            .format(test_context.es_ip, test_context.es_port))
-    time.sleep(5)
-    result = xmlrunner.XMLTestRunner(verbosity=2, output='test-reports').run(unittest.TestSuite(tests))
-    event_server_process.kill()
+  # Actual tests execution
+  event_server_process = srun_bg('pio eventserver --ip {} --port {}'
+      .format(test_context.es_ip, test_context.es_port))
+  time.sleep(5)
+  result = xmlrunner.XMLTestRunner(verbosity=2, output='test-reports').run(unittest.TestSuite(tests))
+  event_server_process.kill()
 
-    if not result.wasSuccessful():
-        sys.exit(1)
+  if not result.wasSuccessful():
+    sys.exit(1)

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/352eb5e0/utils.py
----------------------------------------------------------------------
diff --git a/utils.py b/utils.py
index d837984..8066c4b 100644
--- a/utils.py
+++ b/utils.py
@@ -9,294 +9,296 @@ from os.path import join as pjoin
 import pio_tests.globals as globals
 
 def srun(command):
-    """ Runs a shell command given as a `str`
-    Raises: `subprocess.CalledProcessError` when exit code != 0
-    """
-    return run(command, shell=True, stdout=globals.std_out(),
-            stderr=globals.std_err(), check=True)
+  """ Runs a shell command given as a `str`
+  Raises: `subprocess.CalledProcessError` when exit code != 0
+  """
+  return run(command, shell=True, stdout=globals.std_out(),
+      stderr=globals.std_err(), check=True)
 
 def srun_out(command):
-    """ Runs a shell command given as a `str`
-    Returns: string with command's output
-    Raises: `subprocess.CalledProcessError` when exit code != 0
-    """
-    return check_output(command, shell=True, universal_newlines=True,
-            stderr=globals.std_err())
+  """ Runs a shell command given as a `str`
+  Returns: string with command's output
+  Raises: `subprocess.CalledProcessError` when exit code != 0
+  """
+  return check_output(command, shell=True, universal_newlines=True,
+      stderr=globals.std_err())
 
 def srun_bg(command):
-    """ Runs a shell command given as a `str` in the background
-    Returns: (obj: `subprocess.Popen`) for executed process
-    """
-    return Popen(command, shell=True, stdout=globals.std_out(),
-            stderr=globals.std_err())
+  """ Runs a shell command given as a `str` in the background
+  Returns: (obj: `subprocess.Popen`) for executed process
+  """
+  return Popen(command, shell=True, stdout=globals.std_out(),
+      stderr=globals.std_err())
 
 def repository_dirname(template):
-    """ Utility function getting repository name from the link
-    Example: for "https://github.com/user/SomeRepo" should return "SomeRepo"
-    """
-    return template.split('/')[-1]
+  """ Utility function getting repository name from the link
+  Example: for "https://github.com/user/SomeRepo" should return "SomeRepo"
+  """
+  return template.split('/')[-1]
 
 def obtain_template(engine_dir, template):
-    """Given a directory with engines and a template downloads an engine
-    if neccessary
-    Args:
-        engine_dir (str): directory where engines are stored
-        template (str): either the name of an engine from the engines directory
-            or a link to repository with the engine
-    Returns: str: with the engine's path
-    """
-    if re.match('^https?:\/\/', template):
-        dest_dir = pjoin(engine_dir, repository_dirname(template))
-        if not os.path.exists(dest_dir):
-            srun('git clone --depth=1 {0} {1}'.format(template, dest_dir))
-        return dest_dir
-    else:
-        # check if exists
-        dest_dir = pjoin(engine_dir, template)
-        if not os.path.exists(dest_dir):
-            raise ValueError('Engine {0} does not exist in {1}'
-                    .format(template, engine_dir))
-
-        return dest_dir
+  """Given a directory with engines and a template downloads an engine
+  if neccessary
+  Args:
+    engine_dir (str): directory where engines are stored
+    template (str): either the name of an engine from the engines directory
+        or a link to repository with the engine
+  Returns: str with the engine's path
+  """
+  if re.match('^https?:\/\/', template):
+    dest_dir = pjoin(engine_dir, repository_dirname(template))
+    if not os.path.exists(dest_dir):
+      srun('git clone --depth=1 {0} {1}'.format(template, dest_dir))
+    return dest_dir
+  else:
+    # check if exists
+    dest_dir = pjoin(engine_dir, template)
+    if not os.path.exists(dest_dir):
+      raise ValueError('Engine {0} does not exist in {1}'
+          .format(template, engine_dir))
+
+    return dest_dir
 
 def pio_app_list():
-    """Returns: a list of dicts for every application with the following keys:
-        `name`, `id`, `access_key`, `allowed_events`
-    """
-    output = srun_out('pio app list').rstrip()
-    return [ { 'name': line[2], 'id': int(line[4]),
-               'access_key': line[6], 'allowed_events': line[8] }
-            for line in [x.split() for x in output.split('\n')[1:-1]] ]
+  """Returns: a list of dicts for every application with the following keys:
+      `name`, `id`, `access_key`, `allowed_events`
+  """
+  output = srun_out('pio app list').rstrip()
+  return [ { 'name': line[2], 'id': int(line[4]),
+             'access_key': line[6], 'allowed_events': line[8] }
+          for line in [x.split() for x in output.split('\n')[1:-1]] ]
 
 def get_app_eventserver_url_json(test_context):
-    return 'http://{}:{}/events.json'.format(
-            test_context.es_ip, test_context.es_port)
+  return 'http://{}:{}/events.json'.format(
+      test_context.es_ip, test_context.es_port)
 
 def get_engine_url_json(engine_ip, engine_port):
-    return 'http://{}:{}/queries.json'.format(
-            engine_ip, engine_port)
+  return 'http://{}:{}/queries.json'.format(
+      engine_ip, engine_port)
 
 def send_event(event, test_context, access_key):
-    """ Sends an event to the eventserver
-    Args:
-        event: json-like dictionary describing an event
-        test_context (obj: `TestContext`):
-        access_key: applications access key 
-    Returns: `requests.Response`
-    """
-    # TODO: Add channel param
-    url = get_app_eventserver_url_json(test_context)
-    return requests.post(
-            url,
-            params={'accessKey': access_key},
-            json=event)
+  """ Sends an event to the eventserver
+  Args:
+    event: json-like dictionary describing an event
+    test_context (obj: `TestContext`):
+    access_key: application's access key
+  Returns: `requests.Response`
+  """
+  # TODO: Add channel param
+  url = get_app_eventserver_url_json(test_context)
+  return requests.post(
+      url,
+      params={'accessKey': access_key},
+      json=event)
 
 def send_events_batch(events, test_context, access_key):
-    """ Send events in batch to the eventserver
-    Args:
-        events: a list of json-like dictionaries for events
-        test_context (obj: `TestContext`):
-        access_key: applications access key
-    Returns: `requests.Response`
-    """
-    url = 'http://{}:{}/batch/events.json'.format(
-        test_context.es_ip, test_context.es_port)
-    return requests.post(
-            url,
-            params={'accessKey': access_key},
-            json=events)
+  """ Send events in batch via REST to the eventserver
+  Args:
+    events: a list of json-like dictionaries for events
+    test_context (obj: `TestContext`):
+    access_key: application's access key
+  Returns: `requests.Response`
+  Requires: Events length must not exceed length of 50
+    http://docs.prediction.io/datacollection/eventmodel/#3.-batch-events-to-the-eventserver
+  """
+  url = 'http://{}:{}/batch/events.json'.format(
+      test_context.es_ip, test_context.es_port)
+  return requests.post(
+      url,
+      params={'accessKey': access_key},
+      json=events)
 
 
 def import_events_batch(events, test_context, appid, channel=None):
-    """ Imports events in batch from file with `pio import`
-    Args:
-        events: a list of json-like dictionaries for events
-        test_context (obj: `TestContext`)
-        appid (int): application's id
-        channel (str):
-    """
-    # Writing events list to temporary file.
-    # `pio import` requires each line of input file to be a JSON string
-    # representing an event. Also, empty lines are not allowed.
-    contents = ''
-    for ev in events:
-        contents += '{}\n'.format(json.dumps(ev))
-    contents.rstrip('\n')
-
-    file_path = pjoin(test_context.data_directory, 'events.json.tmp')
-    try:
-        with open(file_path, 'w') as f:
-            f.write(contents)
-        srun('pio import --appid {} --input {} {}'.format(
-            appid,
-            file_path,
-            '--channel {}'.format(channel) if channel else ''))
-    finally:
-        os.remove(file_path)
+  """ Imports events in batch from file with `pio import`
+  Args:
+    events: a list of json-like dictionaries for events
+    test_context (obj: `TestContext`)
+    appid (int): application's id
+    channel (str):
+  """
+  # Writing events list to temporary file.
+  # `pio import` requires each line of input file to be a JSON string
+  # representing an event. Empty lines are not allowed.
+  contents = ''
+  for ev in events:
+      contents += '{}\n'.format(json.dumps(ev))
+  contents.rstrip('\n')
+
+  file_path = pjoin(test_context.data_directory, 'events.json.tmp')
+  try:
+      with open(file_path, 'w') as f:
+          f.write(contents)
+      srun('pio import --appid {} --input {} {}'.format(
+          appid,
+          file_path,
+          '--channel {}'.format(channel) if channel else ''))
+  finally:
+      os.remove(file_path)
 
 def get_events(test_context, access_key, params={}):
-    """ Gets events for some application
-    Args:
-         test_context (obj: `TestContext`)
-         access_key (str):
-         params (dict): special parameters for eventserver's GET, e.g:
-            'limit', 'reversed', 'event'. See the docs
-    Returns: `requests.Response`
-    """
-    url = get_app_eventserver_url_json(test_context)
-    return requests.get(url, params=dict({'accessKey': access_key}, **params))
+  """ Gets events for some application
+  Args:
+    test_context (obj: `TestContext`)
+    access_key (str):
+    params (dict): special parameters for eventserver's GET, e.g:
+        'limit', 'reversed', 'event'. See the docs
+  Returns: `requests.Response`
+  """
+  url = get_app_eventserver_url_json(test_context)
+  return requests.get(url, params=dict({'accessKey': access_key}, **params))
 
 def query_engine(data, engine_ip='localhost', engine_port=8000):
-    """ Send a query to deployed engine
-    Args:
-        data (dict): json-like dictionary being an input to an engine
-        access_key (str):
-        engine_ip (str): ip of deployed engine
-        engine_port (int): port of deployed engine
-    Returns: `requests.Response`
-    """
-    url = get_engine_url_json(engine_ip, engine_port)
-    return requests.post(url, json=data)
+  """ Send a query to deployed engine
+  Args:
+    data (dict): json-like dictionary being an input to an engine
+    access_key (str):
+    engine_ip (str): ip of deployed engine
+    engine_port (int): port of deployed engine
+  Returns: `requests.Response`
+  """
+  url = get_engine_url_json(engine_ip, engine_port)
+  return requests.post(url, json=data)
 
 class AppEngine:
-    """ This is a utility class simplifying all app related interactions.
-    Basically it is just a wrapper on other utility functions and shell
-    scripts.
-    """
-
-    def __init__(self, test_context, app_context, already_created=False):
-        """ Args:
-            test_context (obj: `TestContext`)
-            app_context (obj: `AppContext`)
-            already_created (bool): True if the given app has been already added
-        """
-        self.test_context = test_context
-        self.app_context = app_context
-        self.engine_path = obtain_template(
-                self.test_context.engine_directory, app_context.template)
-        self.deployed_process = None
-        if already_created:
-            self.__init_info()
-        else:
-            self.id = None
-            self.access_key = None
-            self.description = None
-
-        if self.app_context.engine_json_path:
-            self.__copy_engine_json()
-
-    def __copy_engine_json(self):
-        to_path = pjoin(self.engine_path, 'engine.json')
-        copyfile(self.app_context.engine_json_path, to_path)
-
-    def __init_info(self):
-        info = self.show()
-        self.id = info['id']
-        self.access_key = info['access_key']
-        self.description = info['description']
-
-    def new(self, id=None, description=None, access_key=None):
-        """ Creates a new application with given parameters """
-        srun('pio app new {} {} {} {}'.format(
-            '--id {}'.format(id) if id else '',
-            '--description \"{}\"'.format(description) if description else '',
-            '--access-key {}'.format(access_key) if access_key else '',
-            self.app_context.name))
-
-        self.__init_info()
-
-
-    def show(self):
-        """ Returns: application info in dictionary with the keys:
-             `name`: str, `id`: int, `description`: str,
-             `access_key`: str, `allowed_events`: str
-        """
-        output = srun_out('pio app show {}'.format(self.app_context.name)).rstrip()
-        lines = [x.split() for x in output.split('\n')]
-        return { 'name': lines[0][3],
-                 'id': int(lines[1][4]),
-                 'description': lines[2][3] if len(lines[2]) >= 4 else '',
-                 'access_key': lines[3][4],
-                 'allowed_events': lines[3][5] }
-
-
-    # deletes this app from pio
-    def delete(self):
-        srun('pio app delete {0} --force'.format(self.app_context.name))
-
-    def build(self, sbt_extra=None, clean=False, no_asm=True):
-        srun('cd {0}; pio build {1} {2} {3}'.format(
-            self.engine_path,
-            '--sbt-extra {}'.format(sbt_extra) if sbt_extra else '',
-            '--clean' if clean else '',
-            '--no-asm' if no_asm else ''))
+  """ This is a utility class simplifying all app related interactions.
+  Basically it is just a wrapper on other utility functions and shell
+  scripts.
+  """
 
-    def train(self, batch=None, skip_sanity_check=False, stop_after_read=False,
-            stop_after_prepare=False, engine_factory=None,
-            engine_params_key=None, scratch_uri=None):
-
-        srun('cd {}; pio train {} {} {} {} {} {} {}'.format(
+  def __init__(self, test_context, app_context, already_created=False):
+    """ Args:
+        test_context (obj: `TestContext`)
+        app_context (obj: `AppContext`)
+        already_created (bool): True if the given app has been already added
+    """
+    self.test_context = test_context
+    self.app_context = app_context
+    self.engine_path = obtain_template(
+        self.test_context.engine_directory, app_context.template)
+    self.deployed_process = None
+    if already_created:
+      self.__init_info()
+    else:
+      self.id = None
+      self.access_key = None
+      self.description = None
+
+    if self.app_context.engine_json_path:
+      self.__copy_engine_json()
+
+  def __copy_engine_json(self):
+    to_path = pjoin(self.engine_path, 'engine.json')
+    copyfile(self.app_context.engine_json_path, to_path)
+
+  def __init_info(self):
+    info = self.show()
+    self.id = info['id']
+    self.access_key = info['access_key']
+    self.description = info['description']
+
+  def new(self, id=None, description=None, access_key=None):
+    """ Creates a new application with given parameters """
+    srun('pio app new {} {} {} {}'.format(
+        '--id {}'.format(id) if id else '',
+        '--description \"{}\"'.format(description) if description else '',
+        '--access-key {}'.format(access_key) if access_key else '',
+        self.app_context.name))
+
+    self.__init_info()
+
+
+  def show(self):
+    """ Returns: application info in dictionary with the keys:
+         `name`: str, `id`: int, `description`: str,
+         `access_key`: str, `allowed_events`: str
+    """
+    output = srun_out('pio app show {}'.format(self.app_context.name)).rstrip()
+    lines = [x.split() for x in output.split('\n')]
+    return { 'name': lines[0][3],
+             'id': int(lines[1][4]),
+             'description': lines[2][3] if len(lines[2]) >= 4 else '',
+             'access_key': lines[3][4],
+             'allowed_events': lines[3][5] }
+
+
+  # deletes this app from pio
+  def delete(self):
+      srun('pio app delete {0} --force'.format(self.app_context.name))
+
+  def build(self, sbt_extra=None, clean=False, no_asm=True):
+    srun('cd {0}; pio build {1} {2} {3}'.format(
+        self.engine_path,
+        '--sbt-extra {}'.format(sbt_extra) if sbt_extra else '',
+        '--clean' if clean else '',
+        '--no-asm' if no_asm else ''))
+
+  def train(self, batch=None, skip_sanity_check=False, stop_after_read=False,
+          stop_after_prepare=False, engine_factory=None,
+          engine_params_key=None, scratch_uri=None):
+
+    srun('cd {}; pio train {} {} {} {} {} {} {}'.format(
+        self.engine_path,
+        '--batch {}'.format(batch) if batch else '',
+        '--skip-sanity-check' if skip_sanity_check else '',
+        '--stop-after-read' if stop_after_read else '',
+        '--stop-after-prepare' if stop_after_prepare else '',
+        '--engine_factory {}'.format(engine_factory) if engine_factory else '',
+        '--engine-params-key {}'.format(engine_params_key) if engine_params_key else '',
+        '--scratch-uri {}'.format(scratch_uri) if scratch_uri else ''))
+
+  def deploy(self, wait_time=0, ip=None, port=None, engine_instance_id=None,
+          feedback=False, accesskey=None, event_server_ip=None, event_server_port=None,
+          batch=None, scratch_uri=None):
+
+    command = 'cd {}; pio deploy {} {} {} {} {} {} {} {} {}'.format(
             self.engine_path,
-            '--batch {}'.format(batch) if batch else '',
-            '--skip-sanity-check' if skip_sanity_check else '',
-            '--stop-after-read' if stop_after_read else '',
-            '--stop-after-prepare' if stop_after_prepare else '',
-            '--engine_factory {}'.format(engine_factory) if engine_factory else '',
-            '--engine-params-key {}'.format(engine_params_key) if engine_params_key else '',
-            '--scratch-uri {}'.format(scratch_uri) if scratch_uri else ''))
-
-    def deploy(self, wait_time=0, ip=None, port=None, engine_instance_id=None,
-            feedback=False, accesskey=None, event_server_ip=None, event_server_port=None,
-            batch=None, scratch_uri=None):
-
-        command = 'cd {}; pio deploy {} {} {} {} {} {} {} {} {}'.format(
-                self.engine_path,
-                '--ip {}'.format(ip) if ip else '',
-                '--port {}'.format(port) if port else '',
-                '--engine-instance-id {}'.format(engine_instance_id) if engine_instance_id else '',
-                '--feedback' if feedback else '',
-                '--accesskey {}'.format(accesskey) if accesskey else '',
-                '--event-server-ip {}'.format(event_server_ip) if event_server_ip else '',
-                '--event-server-port {}'.format(event_server_port) if event_server_port else '',
-                '--batch {}'.format(bach) if batch else '',
-                '--scratch-uri {}'.format(scratch_uri) if scratch_uri else '')
-
-        self.deployed_process = srun_bg(command)
-        time.sleep(wait_time)
-        if self.deployed_process.poll() is not None:
-            raise Exception('Application engine terminated')
-        self.ip = ip if ip else 'localhost'
-        self.port = port if port else 8000
-
-    def stop(self):
-        """ Kills deployed engine """
-        if self.deployed_process:
-            self.deployed_process.kill()
-
-    def new_channel(self, channel):
-        srun('pio app channel-new {0}'.format(channel))
-
-    def delete_channel(self, channel):
-        srun('pio app channel-delete {0} --force'.format(channel))
-
-    def send_event(self, event):
-        return send_event(event, self.test_context, self.access_key)
-
-    def send_events_batch(self, events):
-        return send_events_batch(events, self.test_context, self.access_key)
-
-    def import_events_batch(self, events):
-        return import_events_batch(events, self.test_context, self.id)
-
-    def get_events(self, params={}):
-        return get_events(self.test_context, self.access_key, params)
-
-    def delete_data(self, delete_all=True, channel=None):
-        srun('pio app data-delete {0} {1} {2} --force'
-                .format(
-                    self.app_context.name,
-                    '--all' if delete_all else '',
-                    '--channel ' + channel if channel is not None else ''))
-
-    def query(self, data):
-        return query_engine(data, self.ip, self.port)
+            '--ip {}'.format(ip) if ip else '',
+            '--port {}'.format(port) if port else '',
+            '--engine-instance-id {}'.format(engine_instance_id) if engine_instance_id else '',
+            '--feedback' if feedback else '',
+            '--accesskey {}'.format(accesskey) if accesskey else '',
+            '--event-server-ip {}'.format(event_server_ip) if event_server_ip else '',
+            '--event-server-port {}'.format(event_server_port) if event_server_port else '',
+            '--batch {}'.format(bach) if batch else '',
+            '--scratch-uri {}'.format(scratch_uri) if scratch_uri else '')
+
+    self.deployed_process = srun_bg(command)
+    time.sleep(wait_time)
+    if self.deployed_process.poll() is not None:
+      raise Exception('Application engine terminated')
+    self.ip = ip if ip else 'localhost'
+    self.port = port if port else 8000
+
+  def stop(self):
+    """ Kills deployed engine """
+    if self.deployed_process:
+      self.deployed_process.kill()
+
+  def new_channel(self, channel):
+    srun('pio app channel-new {0}'.format(channel))
+
+  def delete_channel(self, channel):
+    srun('pio app channel-delete {0} --force'.format(channel))
+
+  def send_event(self, event):
+    return send_event(event, self.test_context, self.access_key)
+
+  def send_events_batch(self, events):
+    return send_events_batch(events, self.test_context, self.access_key)
+
+  def import_events_batch(self, events):
+    return import_events_batch(events, self.test_context, self.id)
+
+  def get_events(self, params={}):
+    return get_events(self.test_context, self.access_key, params)
+
+  def delete_data(self, delete_all=True, channel=None):
+    srun('pio app data-delete {0} {1} {2} --force'
+        .format(
+            self.app_context.name,
+            '--all' if delete_all else '',
+            '--channel ' + channel if channel is not None else ''))
+
+  def query(self, data):
+    return query_engine(data, self.ip, self.port)