You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@predictionio.apache.org by do...@apache.org on 2016/08/09 21:43:56 UTC

[48/52] [abbrv] incubator-predictionio git commit: Renamed directory testing to tests

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/5320724a/tests/pio_tests/engines/recommendation-engine/src/main/scala/ALSAlgorithm.scala
----------------------------------------------------------------------
diff --git a/tests/pio_tests/engines/recommendation-engine/src/main/scala/ALSAlgorithm.scala b/tests/pio_tests/engines/recommendation-engine/src/main/scala/ALSAlgorithm.scala
new file mode 100644
index 0000000..17c2806
--- /dev/null
+++ b/tests/pio_tests/engines/recommendation-engine/src/main/scala/ALSAlgorithm.scala
@@ -0,0 +1,138 @@
+package org.template.recommendation
+
+import org.apache.predictionio.controller.PAlgorithm
+import org.apache.predictionio.controller.Params
+import org.apache.predictionio.data.storage.BiMap
+
+import org.apache.spark.SparkContext
+import org.apache.spark.SparkContext._
+import org.apache.spark.rdd.RDD
+import org.apache.spark.mllib.recommendation.ALS
+import org.apache.spark.mllib.recommendation.{Rating => MLlibRating}
+import org.apache.spark.mllib.recommendation.ALSModel
+
+import grizzled.slf4j.Logger
+
+/** Hyper-parameters that [[ALSAlgorithm]] forwards to MLlib's `ALS.train`.
+  *
+  * @param rank          passed as `rank`
+  * @param numIterations passed as `iterations`; values above 30 trigger a warning
+  * @param lambda        passed as `lambda`
+  * @param seed          optional seed; `System.nanoTime` is used when it is absent
+  */
+case class ALSAlgorithmParams(
+  rank: Int,
+  numIterations: Int,
+  lambda: Double,
+  seed: Option[Long]) extends Params
+
+/** ALS-based recommender built on Spark MLlib.
+  *
+  * `train` maps String user/item IDs to Int indices, fits an MLlib ALS model and
+  * wraps it in an [[ALSModel]]; `predict` and `batchPredict` map the Int indices
+  * of the recommended products back to String item IDs.
+  *
+  * @param ap ALS hyper-parameters (rank, iterations, lambda, optional seed)
+  */
+class ALSAlgorithm(val ap: ALSAlgorithmParams)
+  extends PAlgorithm[PreparedData, ALSModel, Query, PredictedResult] {
+
+  @transient lazy val logger = Logger[this.type]
+
+  // Only a warning: the parameters are still accepted as given.
+  if (ap.numIterations > 30) {
+    logger.warn(
+      s"ALSAlgorithmParams.numIterations > 30, current: ${ap.numIterations}. " +
+      s"There is a chance of running to StackOverflowError. Lower this number to remedy it")
+  }
+
+  /** Fits the ALS model on the prepared ratings.
+    *
+    * @param sc   Spark context (not used directly by this method)
+    * @param data prepared ratings; must contain at least one rating
+    * @return the factorization model together with the String/Int ID maps
+    * @throws IllegalArgumentException if `data.ratings` is empty
+    */
+  def train(sc: SparkContext, data: PreparedData): ALSModel = {
+    // MLLib ALS cannot handle empty training data.
+    require(data.ratings.take(1).nonEmpty,
+      "RDD[Rating] in PreparedData cannot be empty." +
+      " Please check if DataSource generates TrainingData" +
+      " and Preparator generates PreparedData correctly.")
+
+    // Convert user and item String IDs to Int index for MLlib
+    val userStringIntMap = BiMap.stringInt(data.ratings.map(_.user))
+    val itemStringIntMap = BiMap.stringInt(data.ratings.map(_.item))
+    val mllibRatings = data.ratings.map( r =>
+      // MLlibRating requires integer index for user and item
+      MLlibRating(userStringIntMap(r.user), itemStringIntMap(r.item), r.rating)
+    )
+
+    // seed for MLlib ALS
+    val seed = ap.seed.getOrElse(System.nanoTime)
+
+    // If you only have one type of implicit event (Eg. "view" event only),
+    // replace ALS.train(...) with
+    //val m = ALS.trainImplicit(
+      //ratings = mllibRatings,
+      //rank = ap.rank,
+      //iterations = ap.numIterations,
+      //lambda = ap.lambda,
+      //blocks = -1,
+      //alpha = 1.0,
+      //seed = seed)
+
+    val m = ALS.train(
+      ratings = mllibRatings,
+      rank = ap.rank,
+      iterations = ap.numIterations,
+      lambda = ap.lambda,
+      blocks = -1,
+      seed = seed)
+
+    new ALSModel(
+      rank = m.rank,
+      userFeatures = m.userFeatures,
+      productFeatures = m.productFeatures,
+      userStringIntMap = userStringIntMap,
+      itemStringIntMap = itemStringIntMap)
+  }
+
+  /** Recommends up to `query.num` items for `query.user`.
+    *
+    * @return the recommended items with scores, or an empty result when the
+    *         user is not present in the model's user ID map
+    */
+  def predict(model: ALSModel, query: Query): PredictedResult = {
+    // Convert String ID to Int index for Mllib
+    model.userStringIntMap.get(query.user).map { userInt =>
+      // create inverse view of itemStringIntMap
+      val itemIntStringMap = model.itemStringIntMap.inverse
+      // recommendProducts() returns Array[MLlibRating], which uses item Int
+      // index. Convert it to String ID for returning PredictedResult
+      val itemScores = model.recommendProducts(userInt, query.num)
+        .map (r => ItemScore(itemIntStringMap(r.product), r.rating))
+      new PredictedResult(itemScores)
+    }.getOrElse{
+      logger.info(s"No prediction for unknown user ${query.user}.")
+      new PredictedResult(Array.empty)
+    }
+  }
+
+  // This function is used by the evaluation module, where a batch of queries is sent to this engine
+  // for evaluation purpose.
+  /** Scores every (known user, product) pair and keeps the top `query.num`
+    * items per query; queries for unknown users get an empty result.
+    */
+  override def batchPredict(model: ALSModel, queries: RDD[(Long, Query)]): RDD[(Long, PredictedResult)] = {
+    // Pull the ID maps into local vals so the closures below capture only the
+    // maps and not the whole model (which also holds the feature RDDs).
+    val userStringIntMap = model.userStringIntMap
+    val itemIntStringMap = model.itemStringIntMap.inverse
+
+    val userIxQueries: RDD[(Int, (Long, Query))] = queries
+    .map { case (ix, query) => {
+      // If user not found, then the index is -1
+      val userIx = userStringIntMap.get(query.user).getOrElse(-1)
+      (userIx, (ix, query))
+    }}
+
+    // Cross product of all valid users from the queries and products in the model.
+    // distinct() matters: several queries may target the same user, and without it
+    // every (user, product) pair - and therefore every predicted rating - would be
+    // duplicated, yielding repeated items in the top-N below.
+    val usersProducts: RDD[(Int, Int)] = userIxQueries
+      .keys
+      .filter(_ != -1)
+      .distinct()
+      .cartesian(model.productFeatures.map(_._1))
+
+    // Call mllib ALS's predict function.
+    val ratings: RDD[MLlibRating] = model.predict(usersProducts)
+
+    // The following code construct predicted results from mllib's ratings.
+    // Not optimal implementation. Instead of groupBy, should use combineByKey with a PriorityQueue
+    val userRatings: RDD[(Int, Iterable[MLlibRating])] = ratings.groupBy(_.user)
+
+    userIxQueries.leftOuterJoin(userRatings)
+    .map {
+      // When there are ratings
+      case (userIx, ((ix, query), Some(predicted))) => {
+        val topItemScores: Array[ItemScore] = predicted
+        .toArray
+        .sortBy(_.rating)(Ordering.Double.reverse) // note: from large to small ordering
+        .take(query.num)
+        .map { rating => ItemScore(
+          itemIntStringMap(rating.product),
+          rating.rating) }
+
+        (ix, PredictedResult(itemScores = topItemScores))
+      }
+      // When user doesn't exist in training data
+      case (userIx, ((ix, query), None)) => {
+        require(userIx == -1)
+        (ix, PredictedResult(itemScores = Array.empty))
+      }
+    }
+  }
+}
+}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/5320724a/tests/pio_tests/engines/recommendation-engine/src/main/scala/ALSModel.scala
----------------------------------------------------------------------
diff --git a/tests/pio_tests/engines/recommendation-engine/src/main/scala/ALSModel.scala b/tests/pio_tests/engines/recommendation-engine/src/main/scala/ALSModel.scala
new file mode 100644
index 0000000..243c1d1
--- /dev/null
+++ b/tests/pio_tests/engines/recommendation-engine/src/main/scala/ALSModel.scala
@@ -0,0 +1,63 @@
+package org.apache.spark.mllib.recommendation
+// This must be the same package as Spark's MatrixFactorizationModel because
+// MatrixFactorizationModel's constructor is private and we are using
+// its constructor in order to save and load the model
+
+import org.template.recommendation.ALSAlgorithmParams
+
+import org.apache.predictionio.controller.IPersistentModel
+import org.apache.predictionio.controller.IPersistentModelLoader
+import org.apache.predictionio.data.storage.BiMap
+
+import org.apache.spark.SparkContext
+import org.apache.spark.SparkContext._
+import org.apache.spark.rdd.RDD
+
+/** MLlib matrix-factorization model bundled with the String-to-Int ID maps,
+  * persisted as Spark object files under `/tmp/<id>/`.
+  */
+class ALSModel(
+    override val rank: Int,
+    override val userFeatures: RDD[(Int, Array[Double])],
+    override val productFeatures: RDD[(Int, Array[Double])],
+    val userStringIntMap: BiMap[String, Int],
+    val itemStringIntMap: BiMap[String, Int])
+  extends MatrixFactorizationModel(rank, userFeatures, productFeatures)
+  with IPersistentModel[ALSAlgorithmParams] {
+
+  /** Writes every component of the model to its own object file.
+    *
+    * @param id     model identifier, used as the directory name under `/tmp`
+    * @param params algorithm parameters (not used by this method)
+    * @param sc     Spark context used to parallelize the non-RDD components
+    * @return always `true`
+    */
+  def save(id: String, params: ALSAlgorithmParams,
+    sc: SparkContext): Boolean = {
+    val dir = s"/tmp/${id}"
+
+    // Scalars and maps are wrapped in one-element RDDs so they can be
+    // stored the same way as the feature RDDs.
+    sc.parallelize(Seq(rank)).saveAsObjectFile(s"${dir}/rank")
+    userFeatures.saveAsObjectFile(s"${dir}/userFeatures")
+    productFeatures.saveAsObjectFile(s"${dir}/productFeatures")
+    sc.parallelize(Seq(userStringIntMap)).saveAsObjectFile(s"${dir}/userStringIntMap")
+    sc.parallelize(Seq(itemStringIntMap)).saveAsObjectFile(s"${dir}/itemStringIntMap")
+    true
+  }
+
+  /** Summary with sizes and the first two entries of each component.
+    * Calls `count()` and `take(2)` on both feature RDDs.
+    */
+  override def toString: String = {
+    val users =
+      s"userFeatures: [${userFeatures.count()}](${userFeatures.take(2).toList}...)"
+    val products =
+      s"productFeatures: [${productFeatures.count()}](${productFeatures.take(2).toList}...)"
+    val userIds =
+      s"userStringIntMap: [${userStringIntMap.size}](${userStringIntMap.take(2)}...)"
+    val itemIds =
+      s"itemStringIntMap: [${itemStringIntMap.size}](${itemStringIntMap.take(2)}...)"
+    s"${users} ${products} ${userIds} ${itemIds}"
+  }
+}
+
+/** Loader that rebuilds an [[ALSModel]] from the object files stored under
+  * `/tmp/<id>/`.
+  */
+object ALSModel
+  extends IPersistentModelLoader[ALSAlgorithmParams, ALSModel] {
+
+  /** Reads the persisted model components.
+    *
+    * @param id     model identifier, i.e. the directory name under `/tmp`
+    * @param params algorithm parameters (not used by this method)
+    * @param sc     Spark context; must be defined
+    * @throws NoSuchElementException if `sc` is `None`
+    */
+  def apply(id: String, params: ALSAlgorithmParams,
+    sc: Option[SparkContext]): ALSModel = {
+    // Same exception type as the bare `sc.get` would raise, but with a message
+    // that explains what is missing.
+    val ctx = sc.getOrElse(throw new NoSuchElementException(
+      "ALSModel requires a SparkContext to load the persisted model"))
+    val dir = s"/tmp/${id}"
+
+    new ALSModel(
+      rank = ctx.objectFile[Int](s"${dir}/rank").first,
+      userFeatures = ctx.objectFile(s"${dir}/userFeatures"),
+      productFeatures = ctx.objectFile(s"${dir}/productFeatures"),
+      userStringIntMap = ctx
+        .objectFile[BiMap[String, Int]](s"${dir}/userStringIntMap").first,
+      itemStringIntMap = ctx
+        .objectFile[BiMap[String, Int]](s"${dir}/itemStringIntMap").first)
+  }
+}
+}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/5320724a/tests/pio_tests/engines/recommendation-engine/src/main/scala/DataSource.scala
----------------------------------------------------------------------
diff --git a/tests/pio_tests/engines/recommendation-engine/src/main/scala/DataSource.scala b/tests/pio_tests/engines/recommendation-engine/src/main/scala/DataSource.scala
new file mode 100644
index 0000000..eea3ae6
--- /dev/null
+++ b/tests/pio_tests/engines/recommendation-engine/src/main/scala/DataSource.scala
@@ -0,0 +1,103 @@
+package org.template.recommendation
+
+import org.apache.predictionio.controller.PDataSource
+import org.apache.predictionio.controller.EmptyEvaluationInfo
+import org.apache.predictionio.controller.EmptyActualResult
+import org.apache.predictionio.controller.Params
+import org.apache.predictionio.data.storage.Event
+import org.apache.predictionio.data.store.PEventStore
+
+import org.apache.spark.SparkContext
+import org.apache.spark.SparkContext._
+import org.apache.spark.rdd.RDD
+
+import grizzled.slf4j.Logger
+
+/** Evaluation settings: `kFold` is the number of folds built by `DataSource.readEval`
+  * and `queryNum` is the `num` of every evaluation `Query` it generates.
+  */
+case class DataSourceEvalParams(kFold: Int, queryNum: Int)
+
+/** Parameters of [[DataSource]].
+  *
+  * @param appName    name of the app whose events are read from the event store
+  * @param evalParams evaluation settings; must be defined when `readEval` is called
+  */
+case class DataSourceParams(
+  appName: String,
+  evalParams: Option[DataSourceEvalParams]) extends Params
+
+/** Reads "rate" and "buy" events of an app and turns them into [[Rating]]s
+  * for training and for k-fold evaluation.
+  */
+class DataSource(val dsp: DataSourceParams)
+  extends PDataSource[TrainingData,
+      EmptyEvaluationInfo, Query, ActualResult] {
+
+  @transient lazy val logger = Logger[this.type]
+
+  /** Loads user-to-item "rate"/"buy" events and converts each one to a Rating.
+    * The resulting RDD is cached. A conversion failure is logged and rethrown.
+    */
+  def getRatings(sc: SparkContext): RDD[Rating] = {
+    val events: RDD[Event] = PEventStore.find(
+      appName = dsp.appName,
+      entityType = Some("user"),
+      // only "rate" and "buy" events are read
+      eventNames = Some(List("rate", "buy")),
+      // targetEntityType is an optional field of an event
+      targetEntityType = Some(Some("item")))(sc)
+
+    events.map { event =>
+      try {
+        val score: Double = event.event match {
+          case "rate" => event.properties.get[Double]("rating")
+          case "buy" => 4.0 // a buy event counts as a rating of 4
+          case _ => throw new Exception(s"Unexpected event ${event} is read.")
+        }
+        // entityId and targetEntityId are Strings
+        Rating(event.entityId, event.targetEntityId.get, score)
+      } catch {
+        case e: Exception =>
+          logger.error(s"Cannot convert ${event} to Rating. Exception: ${e}.")
+          throw e
+      }
+    }.cache()
+  }
+
+  override
+  def readTraining(sc: SparkContext): TrainingData = {
+    new TrainingData(getRatings(sc))
+  }
+
+  /** Builds `kFold` (training, info, test) splits. A rating belongs to the test
+    * part of fold `i` when its unique id modulo `kFold` equals `i`; test
+    * ratings are grouped per user into one query each.
+    */
+  override
+  def readEval(sc: SparkContext)
+  : Seq[(TrainingData, EmptyEvaluationInfo, RDD[(Query, ActualResult)])] = {
+    require(dsp.evalParams.isDefined, "Must specify evalParams")
+    val evalParams = dsp.evalParams.get
+    val kFold = evalParams.kFold
+
+    val indexed: RDD[(Rating, Long)] = getRatings(sc).zipWithUniqueId
+    indexed.cache
+
+    (0 until kFold).map { fold =>
+      val training = indexed.filter { case (_, id) => id % kFold != fold }.map(_._1)
+      val testing = indexed.filter { case (_, id) => id % kFold == fold }.map(_._1)
+
+      val byUser: RDD[(String, Iterable[Rating])] = testing.groupBy(_.user)
+      val queries = byUser.map { case (user, userRatings) =>
+        (Query(user, evalParams.queryNum), ActualResult(userRatings.toArray))
+      }
+
+      (new TrainingData(training), new EmptyEvaluationInfo(), queries)
+    }
+  }
+}
+
+/** A single user-to-item rating with String IDs for both user and item. */
+case class Rating(
+  user: String,
+  item: String,
+  rating: Double
+)
+
+/** Training input: the RDD of ratings read by the data source. */
+class TrainingData(
+  val ratings: RDD[Rating]
+) extends Serializable {
+  // Note: calls count() and take(2) on the RDD every time it is invoked.
+  override def toString = {
+    s"ratings: [${ratings.count()}] (${ratings.take(2).toList}...)"
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/5320724a/tests/pio_tests/engines/recommendation-engine/src/main/scala/Engine.scala
----------------------------------------------------------------------
diff --git a/tests/pio_tests/engines/recommendation-engine/src/main/scala/Engine.scala b/tests/pio_tests/engines/recommendation-engine/src/main/scala/Engine.scala
new file mode 100644
index 0000000..79840dc
--- /dev/null
+++ b/tests/pio_tests/engines/recommendation-engine/src/main/scala/Engine.scala
@@ -0,0 +1,32 @@
+package org.template.recommendation
+
+import org.apache.predictionio.controller.IEngineFactory
+import org.apache.predictionio.controller.Engine
+
+/** Engine query: recommend up to `num` items for `user`. */
+case class Query(
+  user: String,
+  num: Int
+) extends Serializable
+
+/** Engine response: the scored items recommended for a query. */
+case class PredictedResult(
+  itemScores: Array[ItemScore]
+) extends Serializable
+
+/** Ground truth used in evaluation: the held-out ratings of one user. */
+case class ActualResult(
+  ratings: Array[Rating]
+) extends Serializable
+
+/** One recommended item (String ID) together with its score. */
+case class ItemScore(
+  item: String,
+  score: Double
+) extends Serializable
+
+/** Engine factory wiring DataSource, Preparator, the "als" algorithm and Serving. */
+object RecommendationEngine extends IEngineFactory {
+  def apply() = {
+    new Engine(
+      classOf[DataSource],
+      classOf[Preparator],
+      // algorithms are looked up by name; only "als" is registered
+      Map("als" -> classOf[ALSAlgorithm]),
+      classOf[Serving])
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/5320724a/tests/pio_tests/engines/recommendation-engine/src/main/scala/Evaluation.scala
----------------------------------------------------------------------
diff --git a/tests/pio_tests/engines/recommendation-engine/src/main/scala/Evaluation.scala b/tests/pio_tests/engines/recommendation-engine/src/main/scala/Evaluation.scala
new file mode 100644
index 0000000..34e5689
--- /dev/null
+++ b/tests/pio_tests/engines/recommendation-engine/src/main/scala/Evaluation.scala
@@ -0,0 +1,89 @@
+package org.template.recommendation
+
+import org.apache.predictionio.controller.Evaluation
+import org.apache.predictionio.controller.OptionAverageMetric
+import org.apache.predictionio.controller.AverageMetric
+import org.apache.predictionio.controller.EmptyEvaluationInfo
+import org.apache.predictionio.controller.EngineParamsGenerator
+import org.apache.predictionio.controller.EngineParams
+import org.apache.predictionio.controller.MetricEvaluator
+
+// Usage:
+// $ pio eval org.template.recommendation.RecommendationEvaluation \
+//   org.template.recommendation.EngineParamsList
+
+/** Precision@K metric.
+  *
+  * An actual rating is "positive" when it is at least `ratingThreshold`. The
+  * score of a query is the number of positives among the first `k` predicted
+  * items, divided by `min(k, number of positives)`.
+  *
+  * @param k               number of top predictions taken into account; must be > 0
+  * @param ratingThreshold minimum rating for an item to count as positive
+  */
+case class PrecisionAtK(k: Int, ratingThreshold: Double = 2.0)
+    extends OptionAverageMetric[EmptyEvaluationInfo, Query, PredictedResult, ActualResult] {
+  require(k > 0, "k must be greater than 0")
+
+  override def header = s"Precision@K (k=$k, threshold=$ratingThreshold)"
+
+  /** @return `None` when the user has no positive item (precision is undefined
+    *         and the query is left out of the average), otherwise the precision
+    */
+  def calculate(q: Query, p: PredictedResult, a: ActualResult): Option[Double] = {
+    val positives: Set[String] = a.ratings.filter(_.rating >= ratingThreshold).map(_.item).toSet
+
+    // If there is no positive results, Precision is undefined. We don't consider this case in the
+    // metrics, hence we return None.
+    if (positives.isEmpty) {
+      None
+    } else {
+      val tpCount: Int = p.itemScores.take(k).count(is => positives(is.item))
+      Some(tpCount.toDouble / math.min(k, positives.size))
+    }
+  }
+}
+
+/** Metric reporting how many actual ratings of a query reach `ratingThreshold`. */
+case class PositiveCount(ratingThreshold: Double = 2.0)
+    extends AverageMetric[EmptyEvaluationInfo, Query, PredictedResult, ActualResult] {
+  override def header = s"PositiveCount (threshold=$ratingThreshold)"
+
+  /** Counts the actual ratings that are at least `ratingThreshold`. */
+  def calculate(q: Query, p: PredictedResult, a: ActualResult): Double =
+    a.ratings.count(_.rating >= ratingThreshold).toDouble
+}
+
+/** Evaluation whose primary metric is Precision@10 at threshold 4.0; the other
+  * metrics report positive counts and Precision@10 at thresholds 4.0, 2.0 and 1.0.
+  */
+object RecommendationEvaluation extends Evaluation {
+  engineEvaluator = (
+    RecommendationEngine(),
+    MetricEvaluator(
+      metric = PrecisionAtK(k = 10, ratingThreshold = 4.0),
+      otherMetrics = Seq(
+        PositiveCount(ratingThreshold = 4.0),
+        PrecisionAtK(k = 10, ratingThreshold = 2.0),
+        PositiveCount(ratingThreshold = 2.0),
+        PrecisionAtK(k = 10, ratingThreshold = 1.0),
+        PositiveCount(ratingThreshold = 1.0)
+      )))
+}
+
+
+/** Evaluation whose primary metric is Precision@3 at threshold 2.0; the other
+  * metrics are PositiveCount for every threshold and PrecisionAtK for every
+  * (threshold, k) combination listed below.
+  */
+object ComprehensiveRecommendationEvaluation extends Evaluation {
+  val ratingThresholds = Seq(0.0, 2.0, 4.0)
+  val ks = Seq(1, 3, 10)
+
+  engineEvaluator = (
+    RecommendationEngine(),
+    MetricEvaluator(
+      metric = PrecisionAtK(k = 3, ratingThreshold = 2.0),
+      otherMetrics = (
+        (for (r <- ratingThresholds) yield PositiveCount(ratingThreshold = r)) ++
+        (for (r <- ratingThresholds; k <- ks) yield PrecisionAtK(k = k, ratingThreshold = r))
+      )))
+}
+
+
+/** Shared base engine parameters: 5-fold evaluation with 10 items per query.
+  * NOTE(review): "INVALID_APP_NAME" looks like a placeholder that has to be
+  * replaced with a real app name before running an evaluation - confirm.
+  */
+trait BaseEngineParamsList extends EngineParamsGenerator {
+  protected val baseEP = EngineParams(
+    dataSourceParams = DataSourceParams(
+      appName = "INVALID_APP_NAME",
+      evalParams = Some(DataSourceEvalParams(kFold = 5, queryNum = 10))))
+}
+
+/** Parameter grid: every combination of rank (5, 10, 20) and numIterations
+  * (1, 5, 10), each with lambda 0.01 and seed 3, applied to the "als" algorithm.
+  */
+object EngineParamsList extends BaseEngineParamsList {
+  engineParamsList = for(
+    rank <- Seq(5, 10, 20);
+    numIterations <- Seq(1, 5, 10))
+    yield baseEP.copy(
+      algorithmParamsList = Seq(
+        ("als", ALSAlgorithmParams(rank, numIterations, 0.01, Some(3)))))
+}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/5320724a/tests/pio_tests/engines/recommendation-engine/src/main/scala/Preparator.scala
----------------------------------------------------------------------
diff --git a/tests/pio_tests/engines/recommendation-engine/src/main/scala/Preparator.scala b/tests/pio_tests/engines/recommendation-engine/src/main/scala/Preparator.scala
new file mode 100644
index 0000000..8f2f7e4
--- /dev/null
+++ b/tests/pio_tests/engines/recommendation-engine/src/main/scala/Preparator.scala
@@ -0,0 +1,19 @@
+package org.template.recommendation
+
+import org.apache.predictionio.controller.PPreparator
+
+import org.apache.spark.SparkContext
+import org.apache.spark.SparkContext._
+import org.apache.spark.rdd.RDD
+
+/** Pass-through preparator: hands the training ratings on unchanged. */
+class Preparator
+  extends PPreparator[TrainingData, PreparedData] {
+
+  /** Wraps `trainingData.ratings` in a PreparedData without transforming it. */
+  def prepare(sc: SparkContext, trainingData: TrainingData): PreparedData = {
+    new PreparedData(ratings = trainingData.ratings)
+  }
+}
+
+/** Algorithm input: the RDD of ratings produced by the preparator. */
+class PreparedData(
+  val ratings: RDD[Rating]
+) extends Serializable

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/5320724a/tests/pio_tests/engines/recommendation-engine/src/main/scala/Serving.scala
----------------------------------------------------------------------
diff --git a/tests/pio_tests/engines/recommendation-engine/src/main/scala/Serving.scala b/tests/pio_tests/engines/recommendation-engine/src/main/scala/Serving.scala
new file mode 100644
index 0000000..38ba8b9
--- /dev/null
+++ b/tests/pio_tests/engines/recommendation-engine/src/main/scala/Serving.scala
@@ -0,0 +1,13 @@
+package org.template.recommendation
+
+import org.apache.predictionio.controller.LServing
+
+/** Serving layer that returns the first algorithm's prediction as-is. */
+class Serving
+  extends LServing[Query, PredictedResult] {
+
+  /** Returns the head of `predictedResults`.
+    * NOTE(review): `head` throws on an empty Seq; presumably there is always
+    * one result per configured algorithm - confirm.
+    */
+  override
+  def serve(query: Query,
+    predictedResults: Seq[PredictedResult]): PredictedResult = {
+    predictedResults.head
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/5320724a/tests/pio_tests/engines/recommendation-engine/template.json
----------------------------------------------------------------------
diff --git a/tests/pio_tests/engines/recommendation-engine/template.json b/tests/pio_tests/engines/recommendation-engine/template.json
new file mode 100644
index 0000000..fb4a50b
--- /dev/null
+++ b/tests/pio_tests/engines/recommendation-engine/template.json
@@ -0,0 +1 @@
+{"pio": {"version": { "min": "0.9.2" }}}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/5320724a/tests/pio_tests/globals.py
----------------------------------------------------------------------
diff --git a/tests/pio_tests/globals.py b/tests/pio_tests/globals.py
new file mode 100644
index 0000000..1134501
--- /dev/null
+++ b/tests/pio_tests/globals.py
@@ -0,0 +1,17 @@
+import subprocess
+
+# When True, std_out() returns subprocess.DEVNULL instead of None.
+SUPPRESS_STDOUT=False
+# When True, std_err() returns subprocess.DEVNULL instead of None.
+SUPPRESS_STDERR=False
+# Name of the logger shared by the integration tests.
+LOGGER_NAME='INT_TESTS'
+
+def std_out():
+  if SUPPRESS_STDOUT:
+    return subprocess.DEVNULL
+  else:
+    return None
+
+def std_err():
+  if SUPPRESS_STDERR:
+    return subprocess.DEVNULL
+  else:
+    return None

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/5320724a/tests/pio_tests/integration.py
----------------------------------------------------------------------
diff --git a/tests/pio_tests/integration.py b/tests/pio_tests/integration.py
new file mode 100644
index 0000000..441365e
--- /dev/null
+++ b/tests/pio_tests/integration.py
@@ -0,0 +1,46 @@
+import unittest
+import logging
+import pio_tests.globals as globals
+
+class TestContext:
+  """Class representing the settings provided for every test"""
+
+  def __init__(self, engine_directory, data_directory, es_ip='0.0.0.0', es_port=7070):
+    """
+    Args:
+      engine_directory (str): path to the directory where the engines are stored
+      data_directory (str):   path to the directory where tests can keep their data
+      es_ip (str):            ip of the eventserver (default '0.0.0.0')
+      es_port (int):          port of the eventserver (default 7070)
+    """
+    # The values are stored unchanged; nothing is validated here.
+    self.engine_directory = engine_directory
+    self.data_directory = data_directory
+    self.es_ip = es_ip
+    self.es_port = es_port
+
+class BaseTestCase(unittest.TestCase):
+  """This is the base class for all integration tests
+
+  This class sets up a `TestContext` object and a logger for every test case
+  """
+  def __init__(self, test_context, methodName='runTest'):
+    """
+    Args:
+      test_context: settings object stored as `self.test_context`
+      methodName (str): name of the test method, forwarded to unittest.TestCase
+    """
+    super(BaseTestCase, self).__init__(methodName)
+    self.test_context = test_context
+    # Every test case logs through the logger named by globals.LOGGER_NAME.
+    self.log = logging.getLogger(globals.LOGGER_NAME)
+
+class AppContext:
+  """This class is a description of an instance of the engine"""
+
+  def __init__(self, name, template, engine_json_path=None):
+    """
+    Args:
+      name (str): application name
+      template (str): either the name of an engine from the engines directory
+          or a link to repository with the engine
+      engine_json_path (str): path to json file describing an engine (a custom engine.json)
+          to be used for the application. If `None`, engine.json from the engine's directory
+          will be used
+    """
+    # The values are stored unchanged; nothing is validated here.
+    self.name = name
+    self.template = template
+    self.engine_json_path = engine_json_path

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/5320724a/tests/pio_tests/scenarios/__init__.py
----------------------------------------------------------------------
diff --git a/tests/pio_tests/scenarios/__init__.py b/tests/pio_tests/scenarios/__init__.py
new file mode 100644
index 0000000..e69de29

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/5320724a/tests/pio_tests/scenarios/basic_app_usecases.py
----------------------------------------------------------------------
diff --git a/tests/pio_tests/scenarios/basic_app_usecases.py b/tests/pio_tests/scenarios/basic_app_usecases.py
new file mode 100644
index 0000000..d8b3a1e
--- /dev/null
+++ b/tests/pio_tests/scenarios/basic_app_usecases.py
@@ -0,0 +1,154 @@
+import os
+import unittest
+import random
+import logging
+import time
+from subprocess import CalledProcessError
+from pio_tests.integration import BaseTestCase, AppContext
+from utils import *
+
+# Upper bound of the generated item ids. It is passed to random.randint, whose
+# bounds are inclusive, so ids range over 0..ITEMS_COUNT.
+ITEMS_COUNT = 12
+
+def get_buy_events(users, per_user=2):
+  events = []
+  for u in range(users):
+    items = set([random.randint(0, ITEMS_COUNT) for i in range(per_user)])
+    for item in items:
+      events.append({
+        "event": "buy",
+        "entityType": "user",
+        "entityId": u,
+        "targetEntityType": "item",
+        "targetEntityId": item })
+
+  return events
+
+def get_rate_events(users, per_user=2):
+  events = []
+  for u in range(users):
+    items = set([random.randint(0, ITEMS_COUNT) for i in range(per_user)])
+    for item in items:
+      events.append( {
+        "event": "rate",
+        "entityType": "user",
+        "entityId": u,
+        "targetEntityType": "item",
+        "targetEntityId": item,
+        "properties": { "rating" : float(random.randint(1,5)) } })
+
+  return events
+
+
+class BasicAppUsecases(BaseTestCase):
+
+  def setUp(self):
+    random.seed(3)
+    self.log.info("Setting up the engine")
+
+    template_path = pjoin(
+        self.test_context.engine_directory, "recommendation-engine")
+    engine_json_path = pjoin(
+        self.test_context.data_directory, "quickstart_test/engine.json")
+
+    app_context = AppContext(
+        name="MyRecommender",
+        template=template_path,
+        engine_json_path=engine_json_path)
+
+    self.app = AppEngine(self.test_context, app_context)
+
+  def runTest(self):
+    self.app_creation()
+    self.check_app_list()
+    self.check_data()
+    self.check_build()
+    self.check_train_and_deploy()
+
+  def app_creation(self):
+    self.log.info("Adding a new application")
+    description = "SomeDescription"
+    self.app.new(description=description)
+    self.assertEqual(description, self.app.description)
+
+    self.log.info("Creating an app again - should fail")
+    self.assertRaises(CalledProcessError, lambda : self.app.new())
+
+  def check_app_list(self):
+    self.log.info("Checking if app is on the list")
+    apps = pio_app_list()
+    self.assertEqual(1,
+        len([a for a in apps if a['name'] == self.app.app_context.name]))
+
+  def check_data(self):
+    self.log.info("Importing events")
+    buy_events = get_buy_events(20, 1)
+    rate_events = get_rate_events(20, 1)
+
+    for ev in buy_events + rate_events:
+      self.assertEquals(201, self.app.send_event(ev).status_code)
+
+    self.log.info("Checking imported events")
+    r = self.app.get_events(params={'limit': -1})
+    self.assertEqual(200, r.status_code)
+    self.assertEqual(len(buy_events) + len(rate_events), len(r.json()))
+
+    self.log.info("Deleting entire data")
+    self.app.delete_data()
+    self.log.info("Checking if there are no events at all")
+    r = self.app.get_events(params={'limit': -1})
+    self.assertEqual(404, r.status_code)
+
+  def check_build(self):
+    self.log.info("Clean build")
+    self.app.build(clean=True)
+    self.log.info("Second build")
+    self.app.build()
+
+  def check_train_and_deploy(self):
+    self.log.info("import some data first")
+    buy_events = get_buy_events(20, 5)
+    rate_events = get_rate_events(20, 5)
+    for ev in buy_events + rate_events:
+      self.assertEquals(201, self.app.send_event(ev).status_code)
+
+    self.log.info("Training")
+    self.app.train()
+    self.log.info("Deploying")
+    self.app.deploy()
+    self.assertFalse(self.app.deployed_process.poll())
+
+    self.log.info("Importing more events")
+    buy_events = get_buy_events(60, 5)
+    rate_events = get_rate_events(60, 5)
+    for ev in buy_events + rate_events:
+      self.assertEquals(201, self.app.send_event(ev).status_code)
+
+    self.log.info("Training again")
+    self.app.train()
+
+    time.sleep(7)
+
+    self.log.info("Check serving")
+    r = self.app.query({"user": 1, "num": 5})
+    self.assertEqual(200, r.status_code)
+    result = r.json()
+    self.assertEqual(5, len(result['itemScores']))
+    r = self.app.query({"user": 5, "num": 3})
+    self.assertEqual(200, r.status_code)
+    result = r.json()
+    self.assertEqual(3, len(result['itemScores']))
+
+    self.log.info("Remove data")
+    self.app.delete_data()
+    self.log.info("Retraining should fail")
+    self.assertRaises(CalledProcessError, lambda: self.app.train())
+
+
+  def tearDown(self):
+    self.log.info("Stopping deployed engine")
+    self.app.stop()
+    self.log.info("Deleting all related data")
+    self.app.delete_data()
+    self.log.info("Removing an app")
+    self.app.delete()
+

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/5320724a/tests/pio_tests/scenarios/eventserver_test.py
----------------------------------------------------------------------
diff --git a/tests/pio_tests/scenarios/eventserver_test.py b/tests/pio_tests/scenarios/eventserver_test.py
new file mode 100644
index 0000000..8c243d2
--- /dev/null
+++ b/tests/pio_tests/scenarios/eventserver_test.py
@@ -0,0 +1,155 @@
+import unittest
+import requests
+import json
+import argparse
+from subprocess import Popen
+from utils import AppEngine, pjoin
+from pio_tests.integration import BaseTestCase, AppContext
+
+class EventserverTest(BaseTestCase):
+  """ Integration test for PredictionIO Eventserver API
+  Refer to below for further information:
+    http://docs.prediction.io/datacollection/eventmodel/
+    http://docs.prediction.io/datacollection/eventapi/
+  """
+  # Helper methods
+  def eventserver_url(self, path=None):
+    url = 'http://{}:{}'.format(
+            self.test_context.es_ip, self.test_context.es_port)
+    if path: url += '/{}'.format(path)
+    return url
+
+  def load_events(self, json_file):
+    file_path = pjoin(self.test_context.data_directory,
+        'eventserver_test/{}'.format(json_file))
+    return json.loads(open(file_path).read())
+
+
+  def setUp(self):
+    template_path = pjoin(
+        self.test_context.engine_directory, "recommendation-engine")
+    app_context = AppContext(
+        name="MyRecommender",
+        template=template_path)
+    self.app = AppEngine(self.test_context, app_context)
+
+  def runTest(self):
+    self.log.info("Check if Eventserver is alive and running")
+    r = requests.get(self.eventserver_url())
+    self.assertDictEqual(r.json(), {"status": "alive"})
+
+    self.log.info("Cannot view events with empty accessKey")
+    r = requests.get(self.eventserver_url(path='events.json'))
+    self.assertDictEqual(r.json(), {"message": "Missing accessKey."})
+
+    self.log.info("Cannot view events with invalid accessKey")
+    r = requests.get(self.eventserver_url(path='events.json'),
+        params={'accessKey': ''})
+    self.assertDictEqual(r.json(), {"message": "Invalid accessKey."})
+
+    self.log.info("Adding new pio application")
+    self.app.new()
+
+    self.log.info("No events have been sent yet")
+    r = self.app.get_events()
+    self.assertDictEqual(r.json(), {"message": "Not Found"})
+
+    # Testing POST
+    self.log.info("Sending single event")
+    event1 = {
+      'event' : 'test',
+      'entityType' : 'test',
+      'entityId' : 't1'
+    }
+    r = self.app.send_event(event1)
+    self.assertEqual(201, r.status_code)
+
+    self.log.info("Sending batch of events")
+    r = self.app.send_events_batch(
+        self.load_events("rate_events_25.json"))
+    self.assertEqual(200, r.status_code)
+
+    self.log.info("Cannot send more than 50 events per batch")
+    r = self.app.send_events_batch(
+        self.load_events("signup_events_51.json"))
+    self.assertEqual(400, r.status_code)
+
+    self.log.info("Importing events from file does not have batch size limit")
+    self.app.import_events_batch(
+        self.load_events("signup_events_51.json"))
+
+    self.log.info("Individual events may fail when sending events as batch")
+    r = self.app.send_events_batch(
+        self.load_events("partially_malformed_events.json"))
+    self.assertEqual(200, r.status_code)
+    self.assertEqual(201, r.json()[0]['status'])
+    self.assertEqual(400, r.json()[1]['status'])
+
+    # Testing GET for different parameters
+    params = {'event': 'rate'}
+    r = self.app.get_events(params=params)
+    self.assertEqual(20, len(r.json()))
+    self.assertEqual('rate', r.json()[0]['event'])
+
+    params = {
+      'event': 'rate',
+      'limit': -1 }
+    r = self.app.get_events(params=params)
+    self.assertEqual(25, len(r.json()))
+    self.assertEqual('rate', r.json()[0]['event'])
+
+    params = {
+      'event': 'rate',
+      'limit': 10 }
+    r = self.app.get_events(params=params)
+    self.assertEqual(10, len(r.json()))
+    self.assertEqual('rate', r.json()[0]['event'])
+
+    params = {
+      'event': 'rate',
+      'entityType': 'user',
+      'entityId': '1' }
+    r = self.app.get_events(params=params)
+    self.assertEqual(5, len(r.json()))
+    self.assertEqual('1', r.json()[0]['entityId'])
+
+    params = {
+      'event': 'rate',
+      'targetEntityType': 'item',
+      'targetEntityId': '1' }
+    r = self.app.get_events(params=params)
+    self.assertEqual(5, len(r.json()))
+    self.assertEqual('1', r.json()[0]['targetEntityId'])
+
+    params = {
+      'event': 'rate',
+      'entityType': 'user',
+      'entityId': '1',
+      'startTime': '2014-11-01T09:39:45.618-08:00',
+      'untilTime': '2014-11-04T09:39:45.618-08:00' }
+    r = self.app.get_events(params=params)
+    self.assertEqual(3, len(r.json()))
+    self.assertEqual('1', r.json()[0]['entityId'])
+
+    params = {
+      'event': 'rate',
+      'entityType': 'user',
+      'entityId': '1',
+      'reversed': 'true' }
+    r = self.app.get_events(params=params)
+    self.assertEqual(5, len(r.json()))
+    self.assertEqual('2014-11-05T09:39:45.618-08:00', r.json()[0]['eventTime'])
+
+  def tearDown(self):
+    self.log.info("Deleting all app data")
+    self.app.delete_data()
+    self.log.info("Deleting app")
+    self.app.delete()
+
+
+if __name__ == '__main__':
+  suite = unittest.TestSuite([BasicEventserverTest])
+  result = unittest.TextTestRunner(verbosity=2).run(suite)
+  if not result.wasSuccessful():
+    sys.exit(1)
+

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/5320724a/tests/pio_tests/scenarios/quickstart_test.py
----------------------------------------------------------------------
diff --git a/tests/pio_tests/scenarios/quickstart_test.py b/tests/pio_tests/scenarios/quickstart_test.py
new file mode 100644
index 0000000..a083c2b
--- /dev/null
+++ b/tests/pio_tests/scenarios/quickstart_test.py
@@ -0,0 +1,125 @@
+import os
+import unittest
+import random
+import logging
+from pio_tests.integration import BaseTestCase, AppContext
+from utils import AppEngine, srun, pjoin
+
+def read_events(file_path):
+  RATE_ACTIONS_DELIMITER = "::"
+  with open(file_path, 'r') as f:
+    events = []
+    for line in f:
+      data = line.rstrip('\r\n').split(RATE_ACTIONS_DELIMITER)
+      if random.randint(0, 1) == 1:
+        events.append( {
+          "event": "rate",
+          "entityType": "user",
+          "entityId": data[0],
+          "targetEntityType": "item",
+          "targetEntityId": data[1],
+          "properties": { "rating" : float(data[2]) } })
+      else:
+        events.append({
+          "event": "buy",
+          "entityType": "user",
+          "entityId": data[0],
+          "targetEntityType": "item",
+          "targetEntityId": data[1] })
+
+    return events
+
+
+class QuickStartTest(BaseTestCase):
+
+  def setUp(self):
+    self.log.info("Setting up the engine")
+
+    template_path = pjoin(
+        self.test_context.engine_directory, "recommendation-engine")
+    engine_json_path = pjoin(
+        self.test_context.data_directory, "quickstart_test/engine.json")
+
+    self.training_data_path = pjoin(
+        self.test_context.data_directory,
+        "quickstart_test/training_data.txt")
+
+    # downloading training data
+    srun('curl https://raw.githubusercontent.com/apache/spark/master/' \
+            'data/mllib/sample_movielens_data.txt --create-dirs -o {}'
+            .format(self.training_data_path))
+
+    app_context = AppContext(
+        name="MyRecommender",
+        template=template_path,
+        engine_json_path=engine_json_path)
+
+    self.app = AppEngine(self.test_context, app_context)
+
+  def runTest(self):
+    self.log.info("Adding a new application")
+    self.app.new()
+
+    event1 = {
+      "event" : "rate",
+      "entityType" : "user",
+      "entityId" : "u0",
+      "targetEntityType" : "item",
+      "targetEntityId" : "i0",
+      "properties" : {
+        "rating" : 5
+      },
+      "eventTime" : "2014-11-02T09:39:45.618-08:00" }
+
+    event2 = {
+      "event" : "buy",
+      "entityType" : "user",
+      "entityId" : "u1",
+      "targetEntityType" : "item",
+      "targetEntityId" : "i2",
+      "eventTime" : "2014-11-10T12:34:56.123-08:00" }
+
+    self.log.info("Sending two test events")
+    self.assertListEqual(
+        [201, 201],
+        [self.app.send_event(e).status_code for e in [event1, event2]])
+
+    self.log.info("Checking the number of events stored on the server")
+    r = self.app.get_events()
+    self.assertEquals(200, r.status_code)
+    stored_events = r.json()
+    self.assertEqual(2, len(stored_events))
+
+    self.log.info("Importing many events")
+    new_events = read_events(self.training_data_path)
+    for ev in new_events:
+      r = self.app.send_event(ev)
+      self.assertEqual(201, r.status_code)
+
+    self.log.info("Checking the number of events stored on the server after the update")
+    r = self.app.get_events(params={'limit': -1})
+    self.assertEquals(200, r.status_code)
+    stored_events = r.json()
+    self.assertEquals(len(new_events) + 2, len(stored_events))
+
+    self.log.info("Building an engine...")
+    self.app.build()
+    self.log.info("Training...")
+    self.app.train()
+    self.log.info("Deploying and waiting 15s for it to start...")
+    self.app.deploy(wait_time=15)
+
+    self.log.info("Sending a single query and checking results")
+    user_query = { "user": 1, "num": 4 }
+    r = self.app.query(user_query)
+    self.assertEqual(200, r.status_code)
+    result = r.json()
+    self.assertEqual(4, len(result['itemScores']))
+
+  def tearDown(self):
+    self.log.info("Stopping deployed engine")
+    self.app.stop()
+    self.log.info("Deleting all related data")
+    self.app.delete_data()
+    self.log.info("Removing an app")
+    self.app.delete()

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/5320724a/tests/pio_tests/tests.py
----------------------------------------------------------------------
diff --git a/tests/pio_tests/tests.py b/tests/pio_tests/tests.py
new file mode 100755
index 0000000..33d9940
--- /dev/null
+++ b/tests/pio_tests/tests.py
@@ -0,0 +1,80 @@
+import os
+import sys
+import unittest
+import argparse
+import logging
+import time
+from xmlrunner import XMLTestRunner
+import pio_tests.globals as globals
+from utils import srun_bg
+from pio_tests.integration import TestContext
+from pio_tests.scenarios.quickstart_test import QuickStartTest
+from pio_tests.scenarios.basic_app_usecases import BasicAppUsecases
+from pio_tests.scenarios.eventserver_test import EventserverTest
+
+parser = argparse.ArgumentParser(description='Integration tests for PredictionIO')
+parser.add_argument('--eventserver-ip', default='0.0.0.0')
+parser.add_argument('--eventserver-port', type=int, default=7070)
+parser.add_argument('--no-shell-stdout', action='store_true',
+    help='Suppress STDOUT output from shell executed commands')
+parser.add_argument('--no-shell-stderr', action='store_true',
+    help='Suppress STDERR output from shell executed commands')
+parser.add_argument('--logging', action='store', choices=['INFO', 'DEBUG', 'NO_LOGGING'],
+    default='INFO', help='Choose the logging level')
+parser.add_argument('--tests', nargs='*', type=str,
+    default=None, help='Names of the tests to execute. By default all tests will be checked')
+
+TESTS_DIRECTORY = os.path.abspath(os.path.dirname(__file__))
+ENGINE_DIRECTORY = os.path.join(TESTS_DIRECTORY, "engines")
+DATA_DIRECTORY = os.path.join(TESTS_DIRECTORY, "data")
+
+LOGGING_FORMAT = '[%(levelname)s] %(module)s %(asctime)-15s: %(message)s'
+logging.basicConfig(format=LOGGING_FORMAT)
+
+def get_tests(test_context):
+  # ========= ADD TESTS HERE!!! ================================
+  return {'QuickStart': QuickStartTest(test_context),
+          'BasicAppUsecases': BasicAppUsecases(test_context),
+          'EventserverTest': EventserverTest(test_context)}
+
+if __name__ == "__main__":
+  args = vars(parser.parse_args())
+
+  if args.get('no_shell_stdout'):
+    globals.SUPPRESS_STDOUT = True
+  if args.get('no_shell_stderr'):
+    globals.SUPPRESS_STDERR = True
+
+  # setting up logging
+  log_opt = args['logging']
+  logger = logging.getLogger(globals.LOGGER_NAME)
+  if log_opt == 'INFO':
+    logger.level = logging.INFO
+  elif log_opt == 'DEBUG':
+    logger.level = logging.DEBUG
+
+  test_context = TestContext(
+      ENGINE_DIRECTORY, DATA_DIRECTORY,
+      args['eventserver_ip'], int(args['eventserver_port']))
+
+  tests_dict = get_tests(test_context)
+  test_names = args['tests']
+  tests = []
+  if test_names is not None:
+    tests = [t for name, t in tests_dict.items() if name in test_names]
+  else:
+    tests = tests_dict.values()
+
+  # Actual tests execution
+  es_wait_time = 25
+  logger.info("Starting eventserver and waiting {}s for it to initialize".format(
+      es_wait_time))
+  event_server_process = srun_bg('pio eventserver --ip {} --port {}'
+      .format(test_context.es_ip, test_context.es_port))
+  time.sleep(es_wait_time)
+  result = XMLTestRunner(verbosity=2, output='test-reports').run(
+                unittest.TestSuite(tests))
+  event_server_process.kill()
+
+  if not result.wasSuccessful():
+    sys.exit(1)

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/5320724a/tests/pio_tests/utils.py
----------------------------------------------------------------------
diff --git a/tests/pio_tests/utils.py b/tests/pio_tests/utils.py
new file mode 100644
index 0000000..629729e
--- /dev/null
+++ b/tests/pio_tests/utils.py
@@ -0,0 +1,309 @@
+import re
+import time
+import os
+import requests
+import json
+from shutil import copyfile
+from subprocess import run, Popen, check_output
+from os.path import join as pjoin
+import pio_tests.globals as globals
+
+def srun(command):
+  """ Runs a shell command given as a `str`
+  Raises: `subprocess.CalledProcessError` when exit code != 0
+  """
+  return run(command, shell=True, stdout=globals.std_out(),
+      stderr=globals.std_err(), check=True)
+
+def srun_out(command):
+  """ Runs a shell command given as a `str`
+  Returns: string with command's output
+  Raises: `subprocess.CalledProcessError` when exit code != 0
+  """
+  return check_output(command, shell=True, universal_newlines=True,
+      stderr=globals.std_err())
+
+def srun_bg(command):
+  """ Runs a shell command given as a `str` in the background
+  Returns: (obj: `subprocess.Popen`) for executed process
+  """
+  return Popen(command, shell=True, stdout=globals.std_out(),
+      stderr=globals.std_err())
+
+def repository_dirname(template):
+  """ Utility function getting repository name from the link
+  Example: for "https://github.com/user/SomeRepo" should return "SomeRepo"
+  """
+  return template.split('/')[-1]
+
+def obtain_template(engine_dir, template):
+  """Given a directory with engines and a template downloads an engine
+  if neccessary
+  Args:
+    engine_dir (str): directory where engines are stored
+    template (str): either the name of an engine from the engines directory
+        or a link to repository with the engine
+  Returns: str with the engine's path
+  """
+  if re.match('^https?:\/\/', template):
+    dest_dir = pjoin(engine_dir, repository_dirname(template))
+    if not os.path.exists(dest_dir):
+      srun('git clone --depth=1 {0} {1}'.format(template, dest_dir))
+    return dest_dir
+  else:
+    # check if exists
+    dest_dir = pjoin(engine_dir, template)
+    if not os.path.exists(dest_dir):
+      raise ValueError('Engine {0} does not exist in {1}'
+          .format(template, engine_dir))
+
+    return dest_dir
+
+def pio_app_list():
+  """Returns: a list of dicts for every application with the following keys:
+      `name`, `id`, `access_key`, `allowed_events`
+  """
+  output = srun_out('pio app list').rstrip()
+  return [ { 'name': line[2], 'id': int(line[4]),
+             'access_key': line[6], 'allowed_events': line[8] }
+          for line in [x.split() for x in output.split('\n')[1:-1]] ]
+
+def get_app_eventserver_url_json(test_context):
+  return 'http://{}:{}/events.json'.format(
+      test_context.es_ip, test_context.es_port)
+
+def get_engine_url_json(engine_ip, engine_port):
+  return 'http://{}:{}/queries.json'.format(
+      engine_ip, engine_port)
+
+def send_event(event, test_context, access_key, channel=None):
+  """ Sends an event to the eventserver
+  Args:
+    event: json-like dictionary describing an event
+    test_context (obj: `TestContext`):
+    access_key: application's access key
+    channel (str): custom channel for storing event
+  Returns: `requests.Response`
+  """
+  url = get_app_eventserver_url_json(test_context)
+  params = { 'accessKey': access_key }
+  if channel: params['channel'] = channel
+  return requests.post(
+      url,
+      params=params,
+      json=event)
+
+def send_events_batch(events, test_context, access_key, channel=None):
+  """ Send events in batch via REST to the eventserver
+  Args:
+    events: a list of json-like dictionaries for events
+    test_context (obj: `TestContext`):
+    access_key: application's access key
+    channel (str): custom channel for storing event
+  Returns: `requests.Response`
+  Requires: Events length must not exceed length of 50
+    http://docs.prediction.io/datacollection/eventmodel/#3.-batch-events-to-the-eventserver
+  """
+  url = 'http://{}:{}/batch/events.json'.format(
+      test_context.es_ip, test_context.es_port)
+  params = { 'accessKey': access_key }
+  if channel: params['channel'] = channel
+  return requests.post(
+      url,
+      params=params,
+      json=events)
+
+
+def import_events_batch(events, test_context, appid, channel=None):
+  """ Imports events in batch from file with `pio import`
+  Args:
+    events: a list of json-like dictionaries for events
+    test_context (obj: `TestContext`)
+    appid (int): application's id
+    channel (str): custom channel for storing event
+  """
+  # Writing events list to temporary file.
+  # `pio import` requires each line of input file to be a JSON string
+  # representing an event. Empty lines are not allowed.
+  contents = ''
+  for ev in events:
+      contents += '{}\n'.format(json.dumps(ev))
+  contents.rstrip('\n')
+
+  file_path = pjoin(test_context.data_directory, 'events.json.tmp')
+  try:
+      with open(file_path, 'w') as f:
+          f.write(contents)
+      srun('pio import --appid {} --input {} {}'.format(
+          appid,
+          file_path,
+          '--channel {}'.format(channel) if channel else ''))
+  finally:
+      os.remove(file_path)
+
+def get_events(test_context, access_key, params={}):
+  """ Gets events for some application
+  Args:
+    test_context (obj: `TestContext`)
+    access_key (str):
+    params (dict): special parameters for eventserver's GET, e.g:
+        'limit', 'reversed', 'event'. See the docs
+  Returns: `requests.Response`
+  """
+  url = get_app_eventserver_url_json(test_context)
+  return requests.get(url, params=dict({'accessKey': access_key}, **params))
+
+def query_engine(data, engine_ip='localhost', engine_port=8000):
+  """ Send a query to deployed engine
+  Args:
+    data (dict): json-like dictionary being an input to an engine
+    access_key (str):
+    engine_ip (str): ip of deployed engine
+    engine_port (int): port of deployed engine
+  Returns: `requests.Response`
+  """
+  url = get_engine_url_json(engine_ip, engine_port)
+  return requests.post(url, json=data)
+
+class AppEngine:
+  """ This is a utility class simplifying all app related interactions.
+  Basically it is just a wrapper on other utility functions and shell
+  scripts.
+  """
+
+  def __init__(self, test_context, app_context, already_created=False):
+    """ Args:
+        test_context (obj: `TestContext`)
+        app_context (obj: `AppContext`)
+        already_created (bool): True if the given app has been already added
+    """
+    self.test_context = test_context
+    self.app_context = app_context
+    self.engine_path = obtain_template(
+        self.test_context.engine_directory, app_context.template)
+    self.deployed_process = None
+    if already_created:
+      self.__init_info()
+    else:
+      self.id = None
+      self.access_key = None
+      self.description = None
+
+    if self.app_context.engine_json_path:
+      self.__copy_engine_json()
+
+  def __copy_engine_json(self):
+    to_path = pjoin(self.engine_path, 'engine.json')
+    copyfile(self.app_context.engine_json_path, to_path)
+
+  def __init_info(self):
+    info = self.show()
+    self.id = info['id']
+    self.access_key = info['access_key']
+    self.description = info['description']
+
+  def new(self, id=None, description=None, access_key=None):
+    """ Creates a new application with given parameters """
+    srun('pio app new {} {} {} {}'.format(
+        '--id {}'.format(id) if id else '',
+        '--description \"{}\"'.format(description) if description else '',
+        '--access-key {}'.format(access_key) if access_key else '',
+        self.app_context.name))
+
+    self.__init_info()
+
+
+  def show(self):
+    """ Returns: application info in dictionary with the keys:
+         `name`: str, `id`: int, `description`: str,
+         `access_key`: str, `allowed_events`: str
+    """
+    output = srun_out('pio app show {}'.format(self.app_context.name)).rstrip()
+    lines = [x.split() for x in output.split('\n')]
+    return { 'name': lines[0][3],
+             'id': int(lines[1][4]),
+             'description': lines[2][3] if len(lines[2]) >= 4 else '',
+             'access_key': lines[3][4],
+             'allowed_events': lines[3][5] }
+
+
+  # deletes this app from pio
+  def delete(self):
+    srun('pio app delete {0} --force'.format(self.app_context.name))
+
+  def build(self, sbt_extra=None, clean=False, no_asm=True):
+    srun('cd {0}; pio build {1} {2} {3}'.format(
+        self.engine_path,
+        '--sbt-extra {}'.format(sbt_extra) if sbt_extra else '',
+        '--clean' if clean else '',
+        '--no-asm' if no_asm else ''))
+
+  def train(self, batch=None, skip_sanity_check=False, stop_after_read=False,
+          stop_after_prepare=False, engine_factory=None,
+          engine_params_key=None, scratch_uri=None):
+
+    srun('cd {}; pio train {} {} {} {} {} {} {}'.format(
+        self.engine_path,
+        '--batch {}'.format(batch) if batch else '',
+        '--skip-sanity-check' if skip_sanity_check else '',
+        '--stop-after-read' if stop_after_read else '',
+        '--stop-after-prepare' if stop_after_prepare else '',
+        '--engine_factory {}'.format(engine_factory) if engine_factory else '',
+        '--engine-params-key {}'.format(engine_params_key) if engine_params_key else '',
+        '--scratch-uri {}'.format(scratch_uri) if scratch_uri else ''))
+
+  def deploy(self, wait_time=0, ip=None, port=None, engine_instance_id=None,
+          feedback=False, accesskey=None, event_server_ip=None, event_server_port=None,
+          batch=None, scratch_uri=None):
+
+    command = 'cd {}; pio deploy {} {} {} {} {} {} {} {} {}'.format(
+            self.engine_path,
+            '--ip {}'.format(ip) if ip else '',
+            '--port {}'.format(port) if port else '',
+            '--engine-instance-id {}'.format(engine_instance_id) if engine_instance_id else '',
+            '--feedback' if feedback else '',
+            '--accesskey {}'.format(accesskey) if accesskey else '',
+            '--event-server-ip {}'.format(event_server_ip) if event_server_ip else '',
+            '--event-server-port {}'.format(event_server_port) if event_server_port else '',
+            '--batch {}'.format(bach) if batch else '',
+            '--scratch-uri {}'.format(scratch_uri) if scratch_uri else '')
+
+    self.deployed_process = srun_bg(command)
+    time.sleep(wait_time)
+    if self.deployed_process.poll() is not None:
+      raise Exception('Application engine terminated')
+    self.ip = ip if ip else 'localhost'
+    self.port = port if port else 8000
+
+  def stop(self):
+    """ Kills deployed engine """
+    if self.deployed_process:
+      self.deployed_process.kill()
+
+  def new_channel(self, channel):
+    srun('pio app channel-new {0}'.format(channel))
+
+  def delete_channel(self, channel):
+    srun('pio app channel-delete {0} --force'.format(channel))
+
+  def send_event(self, event):
+    return send_event(event, self.test_context, self.access_key)
+
+  def send_events_batch(self, events):
+    return send_events_batch(events, self.test_context, self.access_key)
+
+  def import_events_batch(self, events):
+    return import_events_batch(events, self.test_context, self.id)
+
+  def get_events(self, params={}):
+    return get_events(self.test_context, self.access_key, params)
+
+  def delete_data(self, delete_all=True, channel=None):
+    srun('pio app data-delete {0} {1} {2} --force'
+        .format(
+            self.app_context.name,
+            '--all' if delete_all else '',
+            '--channel ' + channel if channel is not None else ''))
+
+  def query(self, data):
+    return query_engine(data, self.ip, self.port)

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/5320724a/tests/run_docker.sh
----------------------------------------------------------------------
diff --git a/tests/run_docker.sh b/tests/run_docker.sh
new file mode 100755
index 0000000..d5925ef
--- /dev/null
+++ b/tests/run_docker.sh
@@ -0,0 +1,39 @@
+#!/bin/bash -
+
+# Validates the arguments and starts the testing container with the
+# PredictionIO directory mounted at /pio_host and the storage repositories
+# selected through environment variables.
+
+USAGE=$"Usage: run_docker <meta> <event> <model> <pio> <command>
+  Where:
+    meta         = [PGSQL,ELASTICSEARCH]
+    event        = [PGSQL,HBASE]
+    model        = [PGSQL,LOCALFS,HDFS]
+    pio          = path to PredictionIO directory
+    command      = command to run in the container"
+
+if ! [[ "$1" =~ ^(PGSQL|ELASTICSEARCH)$ ]]; then
+  echo "$USAGE"
+  exit 1
+fi
+
+if ! [[ "$2" =~ ^(PGSQL|HBASE)$ ]]; then
+  echo "$USAGE"
+  exit 1
+fi
+
+if ! [[ "$3" =~ ^(PGSQL|LOCALFS|HDFS)$ ]]; then
+  echo "$USAGE"
+  exit 1
+fi
+
+if [ ! -d "$4" ]; then
+  echo "Directory $4 does not exist"
+  echo "$USAGE"
+  exit 1
+fi
+
+# The path and the repository names are quoted so that a directory containing
+# spaces is passed as a single argument. $5 is left unquoted on purpose: the
+# command is given as one string and has to be split into words here.
+docker run -it -h localhost \
+  -v "$4":/pio_host \
+  -v ~/.ivy2:/root/.ivy2 \
+  -e PIO_STORAGE_REPOSITORIES_METADATA_SOURCE="$1" \
+  -e PIO_STORAGE_REPOSITORIES_EVENTDATA_SOURCE="$2" \
+  -e PIO_STORAGE_REPOSITORIES_MODELDATA_SOURCE="$3" \
+  -p 8000:8000 -p 7070:7070 -p 8080:8080 -p 8081:8081 -p 4040:4040 \
+  -p 60000:60000 -p 60010:60010 -p 60020:60020 -p 60030:60030 ziemin/pio-testing $5

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/5320724a/tests/script.travis.sh
----------------------------------------------------------------------
diff --git a/tests/script.travis.sh b/tests/script.travis.sh
new file mode 100755
index 0000000..8596078
--- /dev/null
+++ b/tests/script.travis.sh
@@ -0,0 +1,21 @@
+#!/bin/bash -
+
+# Travis build step: runs the style check and the unit tests when
+# BUILD_TYPE is "Unit", otherwise runs the integration tests in docker.
+
+set -e
+
+if [[ $BUILD_TYPE == Unit ]]; then
+  # Prepare pio environment variables
+  # (set -a exports everything the sourced file defines)
+  set -a
+  source conf/pio-env.sh.travis
+  set +a
+
+  # Run stylecheck
+  sbt scalastyle
+  # Run all unit tests
+  sbt test
+
+else
+  REPO=$(pwd)
+
+  # The variables are quoted so that an empty or unset one stays in its
+  # position and is rejected by run_docker.sh instead of shifting the
+  # remaining arguments.
+  ./tests/run_docker.sh "$METADATA_REP" "$EVENTDATA_REP" "$MODELDATA_REP" \
+    "$REPO" 'python3 /tests/pio_tests/tests.py'
+fi