You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@predictionio.apache.org by do...@apache.org on 2016/08/09 21:43:56 UTC
[48/52] [abbrv] incubator-predictionio git commit: Renamed directory
testing to tests
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/5320724a/tests/pio_tests/engines/recommendation-engine/src/main/scala/ALSAlgorithm.scala
----------------------------------------------------------------------
diff --git a/tests/pio_tests/engines/recommendation-engine/src/main/scala/ALSAlgorithm.scala b/tests/pio_tests/engines/recommendation-engine/src/main/scala/ALSAlgorithm.scala
new file mode 100644
index 0000000..17c2806
--- /dev/null
+++ b/tests/pio_tests/engines/recommendation-engine/src/main/scala/ALSAlgorithm.scala
@@ -0,0 +1,138 @@
+package org.template.recommendation
+
+import org.apache.predictionio.controller.PAlgorithm
+import org.apache.predictionio.controller.Params
+import org.apache.predictionio.data.storage.BiMap
+
+import org.apache.spark.SparkContext
+import org.apache.spark.SparkContext._
+import org.apache.spark.rdd.RDD
+import org.apache.spark.mllib.recommendation.ALS
+import org.apache.spark.mllib.recommendation.{Rating => MLlibRating}
+import org.apache.spark.mllib.recommendation.ALSModel
+
+import grizzled.slf4j.Logger
+
+// Tunable parameters for MLlib ALS training.
+case class ALSAlgorithmParams(
+  rank: Int,              // number of latent features in the factorization
+  numIterations: Int,     // ALS iterations; high values risk stack overflow (see warning in ALSAlgorithm)
+  lambda: Double,         // regularization parameter passed straight to ALS.train
+  seed: Option[Long]) extends Params  // None => System.nanoTime is used as seed
+
+// Parallel (PAlgorithm) wrapper around MLlib ALS: trains a matrix
+// factorization model and serves top-N item recommendations per user.
+class ALSAlgorithm(val ap: ALSAlgorithmParams)
+  extends PAlgorithm[PreparedData, ALSModel, Query, PredictedResult] {
+
+  @transient lazy val logger = Logger[this.type]
+
+  // Long ALS lineages can overflow the JVM stack during RDD serialization,
+  // so warn early when the configured iteration count looks risky.
+  if (ap.numIterations > 30) {
+    logger.warn(
+      s"ALSAlgorithmParams.numIterations > 30, current: ${ap.numIterations}. " +
+      s"There is a chance of running into a StackOverflowError. Lower this number to remedy it")
+  }
+
+  // Trains an ALSModel from the prepared ratings. Fails fast on empty input
+  // because MLlib ALS cannot handle empty training data.
+  def train(sc: SparkContext, data: PreparedData): ALSModel = {
+    // MLLib ALS cannot handle empty training data.
+    require(!data.ratings.take(1).isEmpty,
+      s"RDD[Rating] in PreparedData cannot be empty." +
+      " Please check if DataSource generates TrainingData" +
+      " and Preparator generates PreparedData correctly.")
+    // Convert user and item String IDs to Int index for MLlib
+
+    val userStringIntMap = BiMap.stringInt(data.ratings.map(_.user))
+    val itemStringIntMap = BiMap.stringInt(data.ratings.map(_.item))
+    val mllibRatings = data.ratings.map( r =>
+      // MLlibRating requires integer index for user and item
+      MLlibRating(userStringIntMap(r.user), itemStringIntMap(r.item), r.rating)
+    )
+
+    // seed for MLlib ALS
+    val seed = ap.seed.getOrElse(System.nanoTime)
+
+    // If you only have one type of implicit event (Eg. "view" event only),
+    // replace ALS.train(...) with
+    //val m = ALS.trainImplicit(
+      //ratings = mllibRatings,
+      //rank = ap.rank,
+      //iterations = ap.numIterations,
+      //lambda = ap.lambda,
+      //blocks = -1,
+      //alpha = 1.0,
+      //seed = seed)
+
+    val m = ALS.train(
+      ratings = mllibRatings,
+      rank = ap.rank,
+      iterations = ap.numIterations,
+      lambda = ap.lambda,
+      blocks = -1,
+      seed = seed)
+
+    // Wrap MLlib's model together with the ID maps so predict() can translate
+    // between external String IDs and MLlib's Int indices.
+    new ALSModel(
+      rank = m.rank,
+      userFeatures = m.userFeatures,
+      productFeatures = m.productFeatures,
+      userStringIntMap = userStringIntMap,
+      itemStringIntMap = itemStringIntMap)
+  }
+
+  // Returns the top query.num recommended items for query.user, or an empty
+  // result (with an info log) when the user was not seen during training.
+  def predict(model: ALSModel, query: Query): PredictedResult = {
+    // Convert String ID to Int index for Mllib
+    model.userStringIntMap.get(query.user).map { userInt =>
+      // create inverse view of itemStringIntMap
+      val itemIntStringMap = model.itemStringIntMap.inverse
+      // recommendProducts() returns Array[MLlibRating], which uses item Int
+      // index. Convert it to String ID for returning PredictedResult
+      val itemScores = model.recommendProducts(userInt, query.num)
+        .map (r => ItemScore(itemIntStringMap(r.product), r.rating))
+      new PredictedResult(itemScores)
+    }.getOrElse{
+      logger.info(s"No prediction for unknown user ${query.user}.")
+      new PredictedResult(Array.empty)
+    }
+  }
+
+  // This function is used by the evaluation module, where a batch of queries is sent to this engine
+  // for evaluation purpose.
+  override def batchPredict(model: ALSModel, queries: RDD[(Long, Query)]): RDD[(Long, PredictedResult)] = {
+    val userIxQueries: RDD[(Int, (Long, Query))] = queries
+    .map { case (ix, query) => {
+      // If user not found, then the index is -1
+      val userIx = model.userStringIntMap.get(query.user).getOrElse(-1)
+      (userIx, (ix, query))
+    }}
+
+    // Cross product of all valid users from the queries and products in the model.
+    val usersProducts: RDD[(Int, Int)] = userIxQueries
+      .keys
+      .filter(_ != -1)
+      .cartesian(model.productFeatures.map(_._1))
+
+    // Call mllib ALS's predict function.
+    val ratings: RDD[MLlibRating] = model.predict(usersProducts)
+
+    // The following code construct predicted results from mllib's ratings.
+    // Not optimal implementation. Instead of groupBy, should use combineByKey with a PriorityQueue
+    val userRatings: RDD[(Int, Iterable[MLlibRating])] = ratings.groupBy(_.user)
+
+    userIxQueries.leftOuterJoin(userRatings)
+    .map {
+      // When there are ratings
+      case (userIx, ((ix, query), Some(ratings))) => {
+        val topItemScores: Array[ItemScore] = ratings
+          .toArray
+          .sortBy(_.rating)(Ordering.Double.reverse) // note: from large to small ordering
+          .take(query.num)
+          .map { rating => ItemScore(
+            model.itemStringIntMap.inverse(rating.product),
+            rating.rating) }
+
+        (ix, PredictedResult(itemScores = topItemScores))
+      }
+      // When user doesn't exist in training data
+      case (userIx, ((ix, query), None)) => {
+        require(userIx == -1)
+        (ix, PredictedResult(itemScores = Array.empty))
+      }
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/5320724a/tests/pio_tests/engines/recommendation-engine/src/main/scala/ALSModel.scala
----------------------------------------------------------------------
diff --git a/tests/pio_tests/engines/recommendation-engine/src/main/scala/ALSModel.scala b/tests/pio_tests/engines/recommendation-engine/src/main/scala/ALSModel.scala
new file mode 100644
index 0000000..243c1d1
--- /dev/null
+++ b/tests/pio_tests/engines/recommendation-engine/src/main/scala/ALSModel.scala
@@ -0,0 +1,63 @@
+package org.apache.spark.mllib.recommendation
+// This must be the same package as Spark's MatrixFactorizationModel because
+// MatrixFactorizationModel's constructor is private and we are using
+// its constructor in order to save and load the model
+
+import org.template.recommendation.ALSAlgorithmParams
+
+import org.apache.predictionio.controller.IPersistentModel
+import org.apache.predictionio.controller.IPersistentModelLoader
+import org.apache.predictionio.data.storage.BiMap
+
+import org.apache.spark.SparkContext
+import org.apache.spark.SparkContext._
+import org.apache.spark.rdd.RDD
+
+// Extends Spark's MatrixFactorizationModel with the String<->Int ID maps
+// needed to translate engine queries, and with PredictionIO persistence.
+class ALSModel(
+    override val rank: Int,
+    override val userFeatures: RDD[(Int, Array[Double])],
+    override val productFeatures: RDD[(Int, Array[Double])],
+    val userStringIntMap: BiMap[String, Int],
+    val itemStringIntMap: BiMap[String, Int])
+  extends MatrixFactorizationModel(rank, userFeatures, productFeatures)
+  with IPersistentModel[ALSAlgorithmParams] {
+
+  // Persists every model component as Spark object files under /tmp/<id>/.
+  // NOTE(review): hard-coded /tmp paths are node-local and not durable —
+  // acceptable for integration tests only; confirm before reuse elsewhere.
+  def save(id: String, params: ALSAlgorithmParams,
+    sc: SparkContext): Boolean = {
+
+    sc.parallelize(Seq(rank)).saveAsObjectFile(s"/tmp/${id}/rank")
+    userFeatures.saveAsObjectFile(s"/tmp/${id}/userFeatures")
+    productFeatures.saveAsObjectFile(s"/tmp/${id}/productFeatures")
+    sc.parallelize(Seq(userStringIntMap))
+      .saveAsObjectFile(s"/tmp/${id}/userStringIntMap")
+    sc.parallelize(Seq(itemStringIntMap))
+      .saveAsObjectFile(s"/tmp/${id}/itemStringIntMap")
+    true
+  }
+
+  // Debug-friendly summary; triggers Spark actions (count/take), so it is
+  // not free to call on large models.
+  override def toString = {
+    s"userFeatures: [${userFeatures.count()}]" +
+    s"(${userFeatures.take(2).toList}...)" +
+    s" productFeatures: [${productFeatures.count()}]" +
+    s"(${productFeatures.take(2).toList}...)" +
+    s" userStringIntMap: [${userStringIntMap.size}]" +
+    s"(${userStringIntMap.take(2)}...)" +
+    s" itemStringIntMap: [${itemStringIntMap.size}]" +
+    s"(${itemStringIntMap.take(2)}...)"
+  }
+}
+
+// Loader counterpart of ALSModel.save: re-reads each component from the
+// /tmp/<id>/ object files written at save time.
+object ALSModel
+  extends IPersistentModelLoader[ALSAlgorithmParams, ALSModel] {
+  // NOTE(review): sc.get assumes a SparkContext is always supplied; a None
+  // here would throw NoSuchElementException — presumably guaranteed by the
+  // framework for PAlgorithm models, but worth confirming.
+  def apply(id: String, params: ALSAlgorithmParams,
+    sc: Option[SparkContext]) = {
+    new ALSModel(
+      rank = sc.get.objectFile[Int](s"/tmp/${id}/rank").first,
+      userFeatures = sc.get.objectFile(s"/tmp/${id}/userFeatures"),
+      productFeatures = sc.get.objectFile(s"/tmp/${id}/productFeatures"),
+      userStringIntMap = sc.get
+        .objectFile[BiMap[String, Int]](s"/tmp/${id}/userStringIntMap").first,
+      itemStringIntMap = sc.get
+        .objectFile[BiMap[String, Int]](s"/tmp/${id}/itemStringIntMap").first)
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/5320724a/tests/pio_tests/engines/recommendation-engine/src/main/scala/DataSource.scala
----------------------------------------------------------------------
diff --git a/tests/pio_tests/engines/recommendation-engine/src/main/scala/DataSource.scala b/tests/pio_tests/engines/recommendation-engine/src/main/scala/DataSource.scala
new file mode 100644
index 0000000..eea3ae6
--- /dev/null
+++ b/tests/pio_tests/engines/recommendation-engine/src/main/scala/DataSource.scala
@@ -0,0 +1,103 @@
+package org.template.recommendation
+
+import org.apache.predictionio.controller.PDataSource
+import org.apache.predictionio.controller.EmptyEvaluationInfo
+import org.apache.predictionio.controller.EmptyActualResult
+import org.apache.predictionio.controller.Params
+import org.apache.predictionio.data.storage.Event
+import org.apache.predictionio.data.store.PEventStore
+
+import org.apache.spark.SparkContext
+import org.apache.spark.SparkContext._
+import org.apache.spark.rdd.RDD
+
+import grizzled.slf4j.Logger
+
+// Evaluation settings: number of cross-validation folds and how many items
+// each evaluation Query asks for.
+case class DataSourceEvalParams(kFold: Int, queryNum: Int)
+
+// appName selects which PredictionIO app's events to read; evalParams is
+// required only when running `pio eval` (see readEval).
+case class DataSourceParams(
+  appName: String,
+  evalParams: Option[DataSourceEvalParams]) extends Params
+
+// Reads "rate" and "buy" events from the event store and converts them into
+// Rating objects for training and (optionally) k-fold evaluation.
+class DataSource(val dsp: DataSourceParams)
+  extends PDataSource[TrainingData,
+      EmptyEvaluationInfo, Query, ActualResult] {
+
+  @transient lazy val logger = Logger[this.type]
+
+  // Loads user->item events and maps them to Ratings. "buy" events get an
+  // implicit rating of 4.0; malformed events are logged and rethrown so bad
+  // data fails the job loudly rather than being silently dropped.
+  def getRatings(sc: SparkContext): RDD[Rating] = {
+
+    val eventsRDD: RDD[Event] = PEventStore.find(
+      appName = dsp.appName,
+      entityType = Some("user"),
+      eventNames = Some(List("rate", "buy")), // read "rate" and "buy" event
+      // targetEntityType is optional field of an event.
+      targetEntityType = Some(Some("item")))(sc)
+
+    val ratingsRDD: RDD[Rating] = eventsRDD.map { event =>
+      val rating = try {
+        val ratingValue: Double = event.event match {
+          case "rate" => event.properties.get[Double]("rating")
+          case "buy" => 4.0 // map buy event to rating value of 4
+          case _ => throw new Exception(s"Unexpected event ${event} is read.")
+        }
+        // entityId and targetEntityId is String
+        Rating(event.entityId,
+          event.targetEntityId.get,
+          ratingValue)
+      } catch {
+        case e: Exception => {
+          logger.error(s"Cannot convert ${event} to Rating. Exception: ${e}.")
+          throw e
+        }
+      }
+      rating
+    }.cache()
+
+    ratingsRDD
+  }
+
+  override
+  def readTraining(sc: SparkContext): TrainingData = {
+    new TrainingData(getRatings(sc))
+  }
+
+  // Splits the ratings into kFold train/test partitions by unique ID modulo
+  // kFold; each fold's test users become (Query, ActualResult) pairs.
+  override
+  def readEval(sc: SparkContext)
+  : Seq[(TrainingData, EmptyEvaluationInfo, RDD[(Query, ActualResult)])] = {
+    require(!dsp.evalParams.isEmpty, "Must specify evalParams")
+    val evalParams = dsp.evalParams.get
+
+    val kFold = evalParams.kFold
+    val ratings: RDD[(Rating, Long)] = getRatings(sc).zipWithUniqueId
+    ratings.cache
+
+    (0 until kFold).map { idx => {
+      // rows with id % kFold == idx form the held-out test set for this fold
+      val trainingRatings = ratings.filter(_._2 % kFold != idx).map(_._1)
+      val testingRatings = ratings.filter(_._2 % kFold == idx).map(_._1)
+
+      val testingUsers: RDD[(String, Iterable[Rating])] = testingRatings.groupBy(_.user)
+
+      (new TrainingData(trainingRatings),
+        new EmptyEvaluationInfo(),
+        testingUsers.map {
+          case (user, ratings) => (Query(user, evalParams.queryNum), ActualResult(ratings.toArray))
+        }
+      )
+    }}
+  }
+}
+
+// One observed (user, item, rating) triple; IDs are external String IDs.
+case class Rating(
+  user: String,
+  item: String,
+  rating: Double
+)
+
+// Container for the raw ratings handed from DataSource to Preparator.
+class TrainingData(
+  val ratings: RDD[Rating]
+) extends Serializable {
+  // Debug summary; count()/take() are Spark actions, so this is not free.
+  override def toString = {
+    s"ratings: [${ratings.count()}] (${ratings.take(2).toList}...)"
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/5320724a/tests/pio_tests/engines/recommendation-engine/src/main/scala/Engine.scala
----------------------------------------------------------------------
diff --git a/tests/pio_tests/engines/recommendation-engine/src/main/scala/Engine.scala b/tests/pio_tests/engines/recommendation-engine/src/main/scala/Engine.scala
new file mode 100644
index 0000000..79840dc
--- /dev/null
+++ b/tests/pio_tests/engines/recommendation-engine/src/main/scala/Engine.scala
@@ -0,0 +1,32 @@
+package org.template.recommendation
+
+import org.apache.predictionio.controller.IEngineFactory
+import org.apache.predictionio.controller.Engine
+
+// Engine query: recommend `num` items for `user`.
+case class Query(
+  user: String,
+  num: Int
+) extends Serializable
+
+// Engine response: recommended items in descending score order.
+case class PredictedResult(
+  itemScores: Array[ItemScore]
+) extends Serializable
+
+// Ground-truth ratings for one user, used only during evaluation.
+case class ActualResult(
+  ratings: Array[Rating]
+) extends Serializable
+
+// One recommended item with its predicted score.
+case class ItemScore(
+  item: String,
+  score: Double
+) extends Serializable
+
+// Engine factory wiring DataSource -> Preparator -> ALSAlgorithm ("als") ->
+// Serving; referenced from engine.json.
+object RecommendationEngine extends IEngineFactory {
+  def apply() = {
+    new Engine(
+      classOf[DataSource],
+      classOf[Preparator],
+      Map("als" -> classOf[ALSAlgorithm]),
+      classOf[Serving])
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/5320724a/tests/pio_tests/engines/recommendation-engine/src/main/scala/Evaluation.scala
----------------------------------------------------------------------
diff --git a/tests/pio_tests/engines/recommendation-engine/src/main/scala/Evaluation.scala b/tests/pio_tests/engines/recommendation-engine/src/main/scala/Evaluation.scala
new file mode 100644
index 0000000..34e5689
--- /dev/null
+++ b/tests/pio_tests/engines/recommendation-engine/src/main/scala/Evaluation.scala
@@ -0,0 +1,89 @@
+package org.template.recommendation
+
+import org.apache.predictionio.controller.Evaluation
+import org.apache.predictionio.controller.OptionAverageMetric
+import org.apache.predictionio.controller.AverageMetric
+import org.apache.predictionio.controller.EmptyEvaluationInfo
+import org.apache.predictionio.controller.EngineParamsGenerator
+import org.apache.predictionio.controller.EngineParams
+import org.apache.predictionio.controller.MetricEvaluator
+
+// Usage:
+// $ pio eval org.template.recommendation.RecommendationEvaluation \
+// org.template.recommendation.EngineParamsList
+
+// Precision@K metric: fraction of the top-k predicted items that the user
+// actually rated at or above ratingThreshold. Queries with no positive
+// items are excluded from the average (hence OptionAverageMetric).
+case class PrecisionAtK(k: Int, ratingThreshold: Double = 2.0)
+  extends OptionAverageMetric[EmptyEvaluationInfo, Query, PredictedResult, ActualResult] {
+  require(k > 0, "k must be greater than 0")
+
+  override def header = s"Precision@K (k=$k, threshold=$ratingThreshold)"
+
+  // Returns None when precision is undefined for this query (no positives).
+  def calculate(q: Query, p: PredictedResult, a: ActualResult): Option[Double] = {
+    val positives: Set[String] = a.ratings.filter(_.rating >= ratingThreshold).map(_.item).toSet
+
+    // If there is no positive results, Precision is undefined. We don't consider this case in the
+    // metrics, hence we return None.
+    if (positives.isEmpty) {
+      None
+    } else {
+      // True positives among the first k recommendations.
+      val tpCount: Int = p.itemScores.take(k).count(is => positives(is.item))
+
+      // Normalise by min(k, |positives|) so a perfect top-k scores 1.0 even
+      // when fewer than k positives exist.
+      Some(tpCount.toDouble / math.min(k, positives.size))
+    }
+  }
+}
+
+// Diagnostic metric: average number of positively-rated items per query,
+// useful for sanity-checking Precision@K denominators.
+case class PositiveCount(ratingThreshold: Double = 2.0)
+  extends AverageMetric[EmptyEvaluationInfo, Query, PredictedResult, ActualResult] {
+  override def header = s"PositiveCount (threshold=$ratingThreshold)"
+
+  def calculate(q: Query, p: PredictedResult, a: ActualResult): Double = {
+    // count(...) avoids materializing the intermediate filtered array.
+    a.ratings.count(_.rating >= ratingThreshold)
+  }
+}
+
+// Evaluation entry point: optimizes Precision@10 at threshold 4.0 while
+// reporting the same metrics at lower thresholds for context.
+object RecommendationEvaluation extends Evaluation {
+  engineEvaluator = (
+    RecommendationEngine(),
+    MetricEvaluator(
+      metric = PrecisionAtK(k = 10, ratingThreshold = 4.0),
+      otherMetrics = Seq(
+        PositiveCount(ratingThreshold = 4.0),
+        PrecisionAtK(k = 10, ratingThreshold = 2.0),
+        PositiveCount(ratingThreshold = 2.0),
+        PrecisionAtK(k = 10, ratingThreshold = 1.0),
+        PositiveCount(ratingThreshold = 1.0)
+      )))
+}
+
+
+// Broader sweep: reports PositiveCount for each threshold and Precision@K
+// over the full threshold x k grid, optimizing Precision@3 at threshold 2.0.
+object ComprehensiveRecommendationEvaluation extends Evaluation {
+  val ratingThresholds = Seq(0.0, 2.0, 4.0)
+  val ks = Seq(1, 3, 10)
+
+  engineEvaluator = (
+    RecommendationEngine(),
+    MetricEvaluator(
+      metric = PrecisionAtK(k = 3, ratingThreshold = 2.0),
+      otherMetrics = (
+        (for (r <- ratingThresholds) yield PositiveCount(ratingThreshold = r)) ++
+        (for (r <- ratingThresholds; k <- ks) yield PrecisionAtK(k = k, ratingThreshold = r))
+      )))
+}
+
+
+// Shared base engine params for parameter sweeps; appName is a placeholder
+// that callers are expected to override with a real application name.
+trait BaseEngineParamsList extends EngineParamsGenerator {
+  protected val baseEP = EngineParams(
+    dataSourceParams = DataSourceParams(
+      appName = "INVALID_APP_NAME",
+      evalParams = Some(DataSourceEvalParams(kFold = 5, queryNum = 10))))
+}
+
+// 3x3 grid search over ALS rank and iteration count with fixed lambda and
+// a fixed seed (3) so evaluation runs are reproducible.
+object EngineParamsList extends BaseEngineParamsList {
+  engineParamsList = for(
+    rank <- Seq(5, 10, 20);
+    numIterations <- Seq(1, 5, 10))
+    yield baseEP.copy(
+      algorithmParamsList = Seq(
+        ("als", ALSAlgorithmParams(rank, numIterations, 0.01, Some(3)))))
+}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/5320724a/tests/pio_tests/engines/recommendation-engine/src/main/scala/Preparator.scala
----------------------------------------------------------------------
diff --git a/tests/pio_tests/engines/recommendation-engine/src/main/scala/Preparator.scala b/tests/pio_tests/engines/recommendation-engine/src/main/scala/Preparator.scala
new file mode 100644
index 0000000..8f2f7e4
--- /dev/null
+++ b/tests/pio_tests/engines/recommendation-engine/src/main/scala/Preparator.scala
@@ -0,0 +1,19 @@
+package org.template.recommendation
+
+import org.apache.predictionio.controller.PPreparator
+
+import org.apache.spark.SparkContext
+import org.apache.spark.SparkContext._
+import org.apache.spark.rdd.RDD
+
+// Pass-through preparator: this template needs no feature engineering, so
+// it simply repackages TrainingData's ratings as PreparedData.
+class Preparator
+  extends PPreparator[TrainingData, PreparedData] {
+
+  def prepare(sc: SparkContext, trainingData: TrainingData): PreparedData = {
+    new PreparedData(ratings = trainingData.ratings)
+  }
+}
+
+// Ratings ready for ALSAlgorithm.train; currently identical to TrainingData.
+class PreparedData(
+  val ratings: RDD[Rating]
+) extends Serializable
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/5320724a/tests/pio_tests/engines/recommendation-engine/src/main/scala/Serving.scala
----------------------------------------------------------------------
diff --git a/tests/pio_tests/engines/recommendation-engine/src/main/scala/Serving.scala b/tests/pio_tests/engines/recommendation-engine/src/main/scala/Serving.scala
new file mode 100644
index 0000000..38ba8b9
--- /dev/null
+++ b/tests/pio_tests/engines/recommendation-engine/src/main/scala/Serving.scala
@@ -0,0 +1,13 @@
+package org.template.recommendation
+
+import org.apache.predictionio.controller.LServing
+
+// Serving layer: with a single algorithm configured there is exactly one
+// PredictedResult to pass through.
+class Serving
+  extends LServing[Query, PredictedResult] {
+
+  override
+  def serve(query: Query,
+    predictedResults: Seq[PredictedResult]): PredictedResult = {
+    // NOTE(review): .head assumes at least one algorithm produced a result —
+    // presumably guaranteed by the engine wiring, but worth confirming.
+    predictedResults.head
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/5320724a/tests/pio_tests/engines/recommendation-engine/template.json
----------------------------------------------------------------------
diff --git a/tests/pio_tests/engines/recommendation-engine/template.json b/tests/pio_tests/engines/recommendation-engine/template.json
new file mode 100644
index 0000000..fb4a50b
--- /dev/null
+++ b/tests/pio_tests/engines/recommendation-engine/template.json
@@ -0,0 +1 @@
+{"pio": {"version": { "min": "0.9.2" }}}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/5320724a/tests/pio_tests/globals.py
----------------------------------------------------------------------
diff --git a/tests/pio_tests/globals.py b/tests/pio_tests/globals.py
new file mode 100644
index 0000000..1134501
--- /dev/null
+++ b/tests/pio_tests/globals.py
@@ -0,0 +1,17 @@
+import subprocess
+
+# Global knobs for the integration-test harness: toggle these to silence
+# child-process output; LOGGER_NAME is shared by all test cases.
+SUPPRESS_STDOUT=False
+SUPPRESS_STDERR=False
+LOGGER_NAME='INT_TESTS'
+
+def std_out():
+    # Returns the `stdout=` argument for subprocess calls:
+    # DEVNULL when suppressed, None (inherit parent's stdout) otherwise.
+    if SUPPRESS_STDOUT:
+        return subprocess.DEVNULL
+    else:
+        return None
+
+def std_err():
+    # Returns the `stderr=` argument for subprocess calls:
+    # DEVNULL when suppressed, None (inherit parent's stderr) otherwise.
+    if SUPPRESS_STDERR:
+        return subprocess.DEVNULL
+    else:
+        return None
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/5320724a/tests/pio_tests/integration.py
----------------------------------------------------------------------
diff --git a/tests/pio_tests/integration.py b/tests/pio_tests/integration.py
new file mode 100644
index 0000000..441365e
--- /dev/null
+++ b/tests/pio_tests/integration.py
@@ -0,0 +1,46 @@
+import unittest
+import logging
+import pio_tests.globals as globals
+
+class TestContext:
+  """Class representing the settings provided for every test"""
+
+  def __init__(self, engine_directory, data_directory, es_ip='0.0.0.0', es_port=7070):
+    """
+    Args:
+      engine_directory (str): path to the directory where the engines are stored
+      data_directory (str): path to the directory where tests can keep their data
+      es_ip (str): ip of the eventserver
+      es_port (int): port of the eventserver
+    """
+    # Plain value object: attributes are read directly by test cases.
+    self.engine_directory = engine_directory
+    self.data_directory = data_directory
+    self.es_ip = es_ip
+    self.es_port = es_port
+
+class BaseTestCase(unittest.TestCase):
+  """This is the base class for all integration tests
+
+  This class sets up a `TestContext` object and a logger for every test case
+  """
+  def __init__(self, test_context, methodName='runTest'):
+    # test_context (TestContext): shared harness settings for this run.
+    # methodName: forwarded to unittest.TestCase (defaults to runTest).
+    super(BaseTestCase, self).__init__(methodName)
+    self.test_context = test_context
+    self.log = logging.getLogger(globals.LOGGER_NAME)
+
+class AppContext:
+  """ This class is a description of an instance of the engine"""
+
+  def __init__(self, name, template, engine_json_path=None):
+    """
+    Args:
+      name (str): application name
+      template (str): either the name of an engine from the engines directory
+        or a link to repository with the engine
+      engine_json_path (str): path to json file describing an engine (a custom engine.json)
+        to be used for the application. If `None`, engine.json from the engine's directory
+        will be used
+    """
+    # Immutable description consumed by AppEngine (see utils module).
+    self.name = name
+    self.template = template
+    self.engine_json_path = engine_json_path
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/5320724a/tests/pio_tests/scenarios/__init__.py
----------------------------------------------------------------------
diff --git a/tests/pio_tests/scenarios/__init__.py b/tests/pio_tests/scenarios/__init__.py
new file mode 100644
index 0000000..e69de29
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/5320724a/tests/pio_tests/scenarios/basic_app_usecases.py
----------------------------------------------------------------------
diff --git a/tests/pio_tests/scenarios/basic_app_usecases.py b/tests/pio_tests/scenarios/basic_app_usecases.py
new file mode 100644
index 0000000..d8b3a1e
--- /dev/null
+++ b/tests/pio_tests/scenarios/basic_app_usecases.py
@@ -0,0 +1,154 @@
+import os
+import unittest
+import random
+import logging
+import time
+from subprocess import CalledProcessError
+from pio_tests.integration import BaseTestCase, AppContext
+from utils import *
+
+ITEMS_COUNT = 12
+
+def get_buy_events(users, per_user=2):
+  # Builds synthetic "buy" events: for each of `users` user ids, picks up to
+  # `per_user` random item ids (the set() dedupes, so a user may get fewer).
+  # Item ids span 0..ITEMS_COUNT inclusive (randint is inclusive on both ends).
+  events = []
+  for u in range(users):
+    items = set([random.randint(0, ITEMS_COUNT) for i in range(per_user)])
+    for item in items:
+      events.append({
+        "event": "buy",
+        "entityType": "user",
+        "entityId": u,
+        "targetEntityType": "item",
+        "targetEntityId": item })
+
+  return events
+
+def get_rate_events(users, per_user=2):
+  # Builds synthetic "rate" events mirroring get_buy_events, each with a
+  # random integer rating in [1, 5] stored as a float property.
+  events = []
+  for u in range(users):
+    items = set([random.randint(0, ITEMS_COUNT) for i in range(per_user)])
+    for item in items:
+      events.append( {
+        "event": "rate",
+        "entityType": "user",
+        "entityId": u,
+        "targetEntityType": "item",
+        "targetEntityId": item,
+        "properties": { "rating" : float(random.randint(1,5)) } })
+
+  return events
+
+
+class BasicAppUsecases(BaseTestCase):
+
+ def setUp(self):
+ random.seed(3)
+ self.log.info("Setting up the engine")
+
+ template_path = pjoin(
+ self.test_context.engine_directory, "recommendation-engine")
+ engine_json_path = pjoin(
+ self.test_context.data_directory, "quickstart_test/engine.json")
+
+ app_context = AppContext(
+ name="MyRecommender",
+ template=template_path,
+ engine_json_path=engine_json_path)
+
+ self.app = AppEngine(self.test_context, app_context)
+
+ def runTest(self):
+ self.app_creation()
+ self.check_app_list()
+ self.check_data()
+ self.check_build()
+ self.check_train_and_deploy()
+
+ def app_creation(self):
+ self.log.info("Adding a new application")
+ description = "SomeDescription"
+ self.app.new(description=description)
+ self.assertEqual(description, self.app.description)
+
+ self.log.info("Creating an app again - should fail")
+ self.assertRaises(CalledProcessError, lambda : self.app.new())
+
+ def check_app_list(self):
+ self.log.info("Checking if app is on the list")
+ apps = pio_app_list()
+ self.assertEqual(1,
+ len([a for a in apps if a['name'] == self.app.app_context.name]))
+
+ def check_data(self):
+ self.log.info("Importing events")
+ buy_events = get_buy_events(20, 1)
+ rate_events = get_rate_events(20, 1)
+
+ for ev in buy_events + rate_events:
+ self.assertEquals(201, self.app.send_event(ev).status_code)
+
+ self.log.info("Checking imported events")
+ r = self.app.get_events(params={'limit': -1})
+ self.assertEqual(200, r.status_code)
+ self.assertEqual(len(buy_events) + len(rate_events), len(r.json()))
+
+ self.log.info("Deleting entire data")
+ self.app.delete_data()
+ self.log.info("Checking if there are no events at all")
+ r = self.app.get_events(params={'limit': -1})
+ self.assertEqual(404, r.status_code)
+
+ def check_build(self):
+ self.log.info("Clean build")
+ self.app.build(clean=True)
+ self.log.info("Second build")
+ self.app.build()
+
+ def check_train_and_deploy(self):
+ self.log.info("import some data first")
+ buy_events = get_buy_events(20, 5)
+ rate_events = get_rate_events(20, 5)
+ for ev in buy_events + rate_events:
+ self.assertEquals(201, self.app.send_event(ev).status_code)
+
+ self.log.info("Training")
+ self.app.train()
+ self.log.info("Deploying")
+ self.app.deploy()
+ self.assertFalse(self.app.deployed_process.poll())
+
+ self.log.info("Importing more events")
+ buy_events = get_buy_events(60, 5)
+ rate_events = get_rate_events(60, 5)
+ for ev in buy_events + rate_events:
+ self.assertEquals(201, self.app.send_event(ev).status_code)
+
+ self.log.info("Training again")
+ self.app.train()
+
+ time.sleep(7)
+
+ self.log.info("Check serving")
+ r = self.app.query({"user": 1, "num": 5})
+ self.assertEqual(200, r.status_code)
+ result = r.json()
+ self.assertEqual(5, len(result['itemScores']))
+ r = self.app.query({"user": 5, "num": 3})
+ self.assertEqual(200, r.status_code)
+ result = r.json()
+ self.assertEqual(3, len(result['itemScores']))
+
+ self.log.info("Remove data")
+ self.app.delete_data()
+ self.log.info("Retraining should fail")
+ self.assertRaises(CalledProcessError, lambda: self.app.train())
+
+
+ def tearDown(self):
+ self.log.info("Stopping deployed engine")
+ self.app.stop()
+ self.log.info("Deleting all related data")
+ self.app.delete_data()
+ self.log.info("Removing an app")
+ self.app.delete()
+
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/5320724a/tests/pio_tests/scenarios/eventserver_test.py
----------------------------------------------------------------------
diff --git a/tests/pio_tests/scenarios/eventserver_test.py b/tests/pio_tests/scenarios/eventserver_test.py
new file mode 100644
index 0000000..8c243d2
--- /dev/null
+++ b/tests/pio_tests/scenarios/eventserver_test.py
@@ -0,0 +1,155 @@
+import unittest
+import requests
+import json
+import argparse
+from subprocess import Popen
+from utils import AppEngine, pjoin
+from pio_tests.integration import BaseTestCase, AppContext
+
+class EventserverTest(BaseTestCase):
+  """ Integration test for PredictionIO Eventserver API
+  Refer to below for further information:
+    http://docs.prediction.io/datacollection/eventmodel/
+    http://docs.prediction.io/datacollection/eventapi/
+  """
+  # Helper methods
+  def eventserver_url(self, path=None):
+    # Builds http://<es_ip>:<es_port>[/<path>] from the test context.
+    url = 'http://{}:{}'.format(
+        self.test_context.es_ip, self.test_context.es_port)
+    if path: url += '/{}'.format(path)
+    return url
+
+  def load_events(self, json_file):
+    # Loads a JSON fixture from data_directory/eventserver_test/.
+    file_path = pjoin(self.test_context.data_directory,
+        'eventserver_test/{}'.format(json_file))
+    return json.loads(open(file_path).read())
+
+
+  def setUp(self):
+    template_path = pjoin(
+        self.test_context.engine_directory, "recommendation-engine")
+    app_context = AppContext(
+        name="MyRecommender",
+        template=template_path)
+    self.app = AppEngine(self.test_context, app_context)
+
+  def runTest(self):
+    # Exercises the eventserver REST API end to end: health check, access-key
+    # validation, single/batch POST, batch size limits, and GET filters.
+    # The expected counts below are tied to the JSON fixture contents.
+    self.log.info("Check if Eventserver is alive and running")
+    r = requests.get(self.eventserver_url())
+    self.assertDictEqual(r.json(), {"status": "alive"})
+
+    self.log.info("Cannot view events with empty accessKey")
+    r = requests.get(self.eventserver_url(path='events.json'))
+    self.assertDictEqual(r.json(), {"message": "Missing accessKey."})
+
+    self.log.info("Cannot view events with invalid accessKey")
+    r = requests.get(self.eventserver_url(path='events.json'),
+        params={'accessKey': ''})
+    self.assertDictEqual(r.json(), {"message": "Invalid accessKey."})
+
+    self.log.info("Adding new pio application")
+    self.app.new()
+
+    self.log.info("No events have been sent yet")
+    r = self.app.get_events()
+    self.assertDictEqual(r.json(), {"message": "Not Found"})
+
+    # Testing POST
+    self.log.info("Sending single event")
+    event1 = {
+      'event' : 'test',
+      'entityType' : 'test',
+      'entityId' : 't1'
+    }
+    r = self.app.send_event(event1)
+    self.assertEqual(201, r.status_code)
+
+    self.log.info("Sending batch of events")
+    r = self.app.send_events_batch(
+        self.load_events("rate_events_25.json"))
+    self.assertEqual(200, r.status_code)
+
+    self.log.info("Cannot send more than 50 events per batch")
+    r = self.app.send_events_batch(
+        self.load_events("signup_events_51.json"))
+    self.assertEqual(400, r.status_code)
+
+    self.log.info("Importing events from file does not have batch size limit")
+    self.app.import_events_batch(
+        self.load_events("signup_events_51.json"))
+
+    self.log.info("Individual events may fail when sending events as batch")
+    r = self.app.send_events_batch(
+        self.load_events("partially_malformed_events.json"))
+    self.assertEqual(200, r.status_code)
+    # The batch endpoint returns per-event statuses inside a 200 response.
+    self.assertEqual(201, r.json()[0]['status'])
+    self.assertEqual(400, r.json()[1]['status'])
+
+    # Testing GET for different parameters
+    params = {'event': 'rate'}
+    r = self.app.get_events(params=params)
+    # default limit returns 20 of the 25 imported rate events
+    self.assertEqual(20, len(r.json()))
+    self.assertEqual('rate', r.json()[0]['event'])
+
+    params = {
+        'event': 'rate',
+        'limit': -1 }
+    r = self.app.get_events(params=params)
+    self.assertEqual(25, len(r.json()))
+    self.assertEqual('rate', r.json()[0]['event'])
+
+    params = {
+        'event': 'rate',
+        'limit': 10 }
+    r = self.app.get_events(params=params)
+    self.assertEqual(10, len(r.json()))
+    self.assertEqual('rate', r.json()[0]['event'])
+
+    params = {
+        'event': 'rate',
+        'entityType': 'user',
+        'entityId': '1' }
+    r = self.app.get_events(params=params)
+    self.assertEqual(5, len(r.json()))
+    self.assertEqual('1', r.json()[0]['entityId'])
+
+    params = {
+        'event': 'rate',
+        'targetEntityType': 'item',
+        'targetEntityId': '1' }
+    r = self.app.get_events(params=params)
+    self.assertEqual(5, len(r.json()))
+    self.assertEqual('1', r.json()[0]['targetEntityId'])
+
+    params = {
+        'event': 'rate',
+        'entityType': 'user',
+        'entityId': '1',
+        'startTime': '2014-11-01T09:39:45.618-08:00',
+        'untilTime': '2014-11-04T09:39:45.618-08:00' }
+    r = self.app.get_events(params=params)
+    self.assertEqual(3, len(r.json()))
+    self.assertEqual('1', r.json()[0]['entityId'])
+
+    params = {
+        'event': 'rate',
+        'entityType': 'user',
+        'entityId': '1',
+        'reversed': 'true' }
+    r = self.app.get_events(params=params)
+    self.assertEqual(5, len(r.json()))
+    self.assertEqual('2014-11-05T09:39:45.618-08:00', r.json()[0]['eventTime'])
+
+  def tearDown(self):
+    self.log.info("Deleting all app data")
+    self.app.delete_data()
+    self.log.info("Deleting app")
+    self.app.delete()
+
+
+if __name__ == '__main__':
+ suite = unittest.TestSuite([BasicEventserverTest])
+ result = unittest.TextTestRunner(verbosity=2).run(suite)
+ if not result.wasSuccessful():
+ sys.exit(1)
+
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/5320724a/tests/pio_tests/scenarios/quickstart_test.py
----------------------------------------------------------------------
diff --git a/tests/pio_tests/scenarios/quickstart_test.py b/tests/pio_tests/scenarios/quickstart_test.py
new file mode 100644
index 0000000..a083c2b
--- /dev/null
+++ b/tests/pio_tests/scenarios/quickstart_test.py
@@ -0,0 +1,125 @@
+import os
+import unittest
+import random
+import logging
+from pio_tests.integration import BaseTestCase, AppContext
+from utils import AppEngine, srun, pjoin
+
def read_events(file_path):
    """Parse a MovieLens-style ratings file into eventserver event dicts.

    Each line is `user::item::rating`; every line is randomly turned into
    either a "rate" event (carrying the rating as a property) or a plain
    "buy" event, with a 50/50 split.

    Args:
        file_path (str): path to the delimiter-separated ratings file
    Returns: list of json-like event dictionaries
    """
    RATE_ACTIONS_DELIMITER = "::"
    events = []
    with open(file_path, 'r') as f:
        for line in f:
            fields = line.rstrip('\r\n').split(RATE_ACTIONS_DELIMITER)
            event = {
                "event": "buy",
                "entityType": "user",
                "entityId": fields[0],
                "targetEntityType": "item",
                "targetEntityId": fields[1],
            }
            # One coin flip per line, exactly as in the original.
            if random.randint(0, 1) == 1:
                event["event"] = "rate"
                event["properties"] = {"rating": float(fields[2])}
            events.append(event)
    return events
+
+
class QuickStartTest(BaseTestCase):
    """End-to-end quickstart scenario for the recommendation engine:
    create an app, send events, import training data, then build, train,
    deploy and query the engine.

    Fix: the deprecated `assertEquals` alias (removed in Python 3.12) is
    replaced with `assertEqual` throughout.
    """

    def setUp(self):
        # Resolve the engine template and download the sample training data.
        self.log.info("Setting up the engine")

        template_path = pjoin(
            self.test_context.engine_directory, "recommendation-engine")
        engine_json_path = pjoin(
            self.test_context.data_directory, "quickstart_test/engine.json")

        self.training_data_path = pjoin(
            self.test_context.data_directory,
            "quickstart_test/training_data.txt")

        # downloading training data
        srun('curl https://raw.githubusercontent.com/apache/spark/master/' \
                'data/mllib/sample_movielens_data.txt --create-dirs -o {}'
                .format(self.training_data_path))

        app_context = AppContext(
            name="MyRecommender",
            template=template_path,
            engine_json_path=engine_json_path)

        self.app = AppEngine(self.test_context, app_context)

    def runTest(self):
        self.log.info("Adding a new application")
        self.app.new()

        event1 = {
            "event": "rate",
            "entityType": "user",
            "entityId": "u0",
            "targetEntityType": "item",
            "targetEntityId": "i0",
            "properties": {
                "rating": 5
            },
            "eventTime": "2014-11-02T09:39:45.618-08:00"}

        event2 = {
            "event": "buy",
            "entityType": "user",
            "entityId": "u1",
            "targetEntityType": "item",
            "targetEntityId": "i2",
            "eventTime": "2014-11-10T12:34:56.123-08:00"}

        self.log.info("Sending two test events")
        self.assertListEqual(
            [201, 201],
            [self.app.send_event(e).status_code for e in [event1, event2]])

        self.log.info("Checking the number of events stored on the server")
        r = self.app.get_events()
        self.assertEqual(200, r.status_code)
        stored_events = r.json()
        self.assertEqual(2, len(stored_events))

        self.log.info("Importing many events")
        new_events = read_events(self.training_data_path)
        for ev in new_events:
            r = self.app.send_event(ev)
            self.assertEqual(201, r.status_code)

        self.log.info("Checking the number of events stored on the server after the update")
        # limit=-1 asks the eventserver for all events, not a default page.
        r = self.app.get_events(params={'limit': -1})
        self.assertEqual(200, r.status_code)
        stored_events = r.json()
        self.assertEqual(len(new_events) + 2, len(stored_events))

        self.log.info("Building an engine...")
        self.app.build()
        self.log.info("Training...")
        self.app.train()
        self.log.info("Deploying and waiting 15s for it to start...")
        self.app.deploy(wait_time=15)

        self.log.info("Sending a single query and checking results")
        user_query = {"user": 1, "num": 4}
        r = self.app.query(user_query)
        self.assertEqual(200, r.status_code)
        result = r.json()
        self.assertEqual(4, len(result['itemScores']))

    def tearDown(self):
        # Undo everything runTest created, including the deployed process.
        self.log.info("Stopping deployed engine")
        self.app.stop()
        self.log.info("Deleting all related data")
        self.app.delete_data()
        self.log.info("Removing an app")
        self.app.delete()
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/5320724a/tests/pio_tests/tests.py
----------------------------------------------------------------------
diff --git a/tests/pio_tests/tests.py b/tests/pio_tests/tests.py
new file mode 100755
index 0000000..33d9940
--- /dev/null
+++ b/tests/pio_tests/tests.py
@@ -0,0 +1,80 @@
+import os
+import sys
+import unittest
+import argparse
+import logging
+import time
+from xmlrunner import XMLTestRunner
+import pio_tests.globals as globals
+from utils import srun_bg
+from pio_tests.integration import TestContext
+from pio_tests.scenarios.quickstart_test import QuickStartTest
+from pio_tests.scenarios.basic_app_usecases import BasicAppUsecases
+from pio_tests.scenarios.eventserver_test import EventserverTest
+
# Command-line interface of the integration test runner.
parser = argparse.ArgumentParser(description='Integration tests for PredictionIO')
parser.add_argument('--eventserver-ip', default='0.0.0.0')
parser.add_argument('--eventserver-port', type=int, default=7070)
parser.add_argument('--no-shell-stdout', action='store_true',
        help='Suppress STDOUT output from shell executed commands')
parser.add_argument('--no-shell-stderr', action='store_true',
        help='Suppress STDERR output from shell executed commands')
parser.add_argument('--logging', action='store', choices=['INFO', 'DEBUG', 'NO_LOGGING'],
        default='INFO', help='Choose the logging level')
parser.add_argument('--tests', nargs='*', type=str,
        default=None, help='Names of the tests to execute. By default all tests will be checked')

# Directories resolved relative to this file: engine templates and test data.
TESTS_DIRECTORY = os.path.abspath(os.path.dirname(__file__))
ENGINE_DIRECTORY = os.path.join(TESTS_DIRECTORY, "engines")
DATA_DIRECTORY = os.path.join(TESTS_DIRECTORY, "data")

# Root logging configuration; per-test loggers inherit this format.
LOGGING_FORMAT = '[%(levelname)s] %(module)s %(asctime)-15s: %(message)s'
logging.basicConfig(format=LOGGING_FORMAT)
+
def get_tests(test_context):
    """Registry of all integration tests, keyed by the name usable with
    the --tests command-line option. ADD NEW TESTS HERE!
    """
    test_classes = {
        'QuickStart': QuickStartTest,
        'BasicAppUsecases': BasicAppUsecases,
        'EventserverTest': EventserverTest,
    }
    return {name: cls(test_context) for name, cls in test_classes.items()}
+
if __name__ == "__main__":
    args = vars(parser.parse_args())

    if args.get('no_shell_stdout'):
        globals.SUPPRESS_STDOUT = True
    if args.get('no_shell_stderr'):
        globals.SUPPRESS_STDERR = True

    # setting up logging ('NO_LOGGING' keeps the default WARNING level)
    log_opt = args['logging']
    logger = logging.getLogger(globals.LOGGER_NAME)
    if log_opt == 'INFO':
        logger.level = logging.INFO
    elif log_opt == 'DEBUG':
        logger.level = logging.DEBUG

    test_context = TestContext(
        ENGINE_DIRECTORY, DATA_DIRECTORY,
        args['eventserver_ip'], int(args['eventserver_port']))

    # Select the requested subset of tests (all of them by default).
    tests_dict = get_tests(test_context)
    test_names = args['tests']
    if test_names is not None:
        tests = [t for name, t in tests_dict.items() if name in test_names]
    else:
        tests = list(tests_dict.values())

    # Actual tests execution
    es_wait_time = 25
    logger.info("Starting eventserver and waiting {}s for it to initialize".format(
        es_wait_time))
    event_server_process = srun_bg('pio eventserver --ip {} --port {}'
            .format(test_context.es_ip, test_context.es_port))
    time.sleep(es_wait_time)
    # Fix: always kill the background eventserver, even if the test runner
    # raises — previously a crash would leave the process running.
    try:
        result = XMLTestRunner(verbosity=2, output='test-reports').run(
            unittest.TestSuite(tests))
    finally:
        event_server_process.kill()

    if not result.wasSuccessful():
        sys.exit(1)
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/5320724a/tests/pio_tests/utils.py
----------------------------------------------------------------------
diff --git a/tests/pio_tests/utils.py b/tests/pio_tests/utils.py
new file mode 100644
index 0000000..629729e
--- /dev/null
+++ b/tests/pio_tests/utils.py
@@ -0,0 +1,309 @@
+import re
+import time
+import os
+import requests
+import json
+from shutil import copyfile
+from subprocess import run, Popen, check_output
+from os.path import join as pjoin
+import pio_tests.globals as globals
+
def srun(command):
    """ Runs a shell command given as a `str`.
    Raises: `subprocess.CalledProcessError` when exit code != 0
    """
    # check=True turns non-zero exit codes into exceptions instead of
    # silently returning; output streams honor the global suppression flags.
    return run(command, check=True, shell=True,
               stdout=globals.std_out(), stderr=globals.std_err())
+
def srun_out(command):
    """ Runs a shell command given as a `str`.
    Returns: string with command's output
    Raises: `subprocess.CalledProcessError` when exit code != 0
    """
    # universal_newlines=True makes check_output return text, not bytes.
    return check_output(command, stderr=globals.std_err(),
                        shell=True, universal_newlines=True)
+
def srun_bg(command):
    """ Runs a shell command given as a `str` in the background.
    Returns: (obj: `subprocess.Popen`) handle for the spawned process
    """
    out_stream = globals.std_out()
    err_stream = globals.std_err()
    return Popen(command, shell=True, stdout=out_stream, stderr=err_stream)
+
def repository_dirname(template):
    """ Extract the repository name from its URL.
    Example: for "https://github.com/user/SomeRepo" returns "SomeRepo"
    """
    # Everything after the final slash; a name without slashes is
    # returned unchanged.
    return template.rsplit('/', 1)[-1]
+
def obtain_template(engine_dir, template):
    """Given a directory with engines and a template, downloads an engine
    if necessary.

    Args:
        engine_dir (str): directory where engines are stored
        template (str): either the name of an engine from the engines directory
            or a link to a repository with the engine
    Returns: str with the engine's path
    Raises: ValueError when a named (non-URL) engine is not present locally
    """
    # Raw string fixes the original '^https?:\/\/' literal: '\/' is an
    # invalid escape sequence in a normal Python string (DeprecationWarning,
    # scheduled to become a SyntaxError). The matched pattern is unchanged.
    if re.match(r'^https?://', template):
        dest_dir = pjoin(engine_dir, repository_dirname(template))
        # Clone only once; subsequent calls reuse the downloaded engine.
        if not os.path.exists(dest_dir):
            srun('git clone --depth=1 {0} {1}'.format(template, dest_dir))
        return dest_dir
    else:
        # Local template: it must already exist in the engines directory.
        dest_dir = pjoin(engine_dir, template)
        if not os.path.exists(dest_dir):
            raise ValueError('Engine {0} does not exist in {1}'
                    .format(template, engine_dir))

        return dest_dir
+
def pio_app_list():
    """Returns: a list of dicts for every application with the following keys:
    `name`, `id`, `access_key`, `allowed_events`
    """
    # The first and last lines of `pio app list` output are headers/footers;
    # every remaining row is whitespace-separated columns.
    rows = srun_out('pio app list').rstrip().split('\n')[1:-1]
    apps = []
    for columns in (row.split() for row in rows):
        apps.append({'name': columns[2],
                     'id': int(columns[4]),
                     'access_key': columns[6],
                     'allowed_events': columns[8]})
    return apps
+
def get_app_eventserver_url_json(test_context):
    """ Build the eventserver's single-event REST endpoint URL. """
    host = test_context.es_ip
    port = test_context.es_port
    return 'http://{0}:{1}/events.json'.format(host, port)
+
def get_engine_url_json(engine_ip, engine_port):
    """ Build the queries.json endpoint URL of a deployed engine. """
    return 'http://' + str(engine_ip) + ':' + str(engine_port) + '/queries.json'
+
def send_event(event, test_context, access_key, channel=None):
    """ Sends a single event to the eventserver via REST.
    Args:
        event: json-like dictionary describing an event
        test_context (obj: `TestContext`):
        access_key: application's access key
        channel (str): custom channel for storing event
    Returns: `requests.Response`
    """
    query_params = {'accessKey': access_key}
    # A falsy channel (None/'') means the default channel.
    if channel:
        query_params['channel'] = channel
    endpoint = get_app_eventserver_url_json(test_context)
    return requests.post(endpoint, params=query_params, json=event)
+
def send_events_batch(events, test_context, access_key, channel=None):
    """ Send events in batch via REST to the eventserver.
    Args:
        events: a list of json-like dictionaries for events
        test_context (obj: `TestContext`):
        access_key: application's access key
        channel (str): custom channel for storing event
    Returns: `requests.Response`
    Requires: Events length must not exceed length of 50
    http://docs.prediction.io/datacollection/eventmodel/#3.-batch-events-to-the-eventserver
    """
    endpoint = 'http://{}:{}/batch/events.json'.format(
        test_context.es_ip, test_context.es_port)
    query_params = {'accessKey': access_key}
    # A falsy channel (None/'') means the default channel.
    if channel:
        query_params['channel'] = channel
    return requests.post(endpoint, params=query_params, json=events)
+
+
def import_events_batch(events, test_context, appid, channel=None):
    """ Imports events in batch from a temporary file with `pio import`.
    Args:
        events: a list of json-like dictionaries for events
        test_context (obj: `TestContext`)
        appid (int): application's id
        channel (str): custom channel for storing event
    """
    # `pio import` requires each line of the input file to be a JSON string
    # representing a single event; empty lines are not allowed.
    # Fix: the original built the payload with a trailing newline and then
    # called `contents.rstrip('\n')` *discarding the result* (strings are
    # immutable), so the strip was a no-op. join() produces the intended
    # newline-separated payload with no trailing newline.
    contents = '\n'.join(json.dumps(ev) for ev in events)

    file_path = pjoin(test_context.data_directory, 'events.json.tmp')
    try:
        with open(file_path, 'w') as f:
            f.write(contents)
        srun('pio import --appid {} --input {} {}'.format(
            appid,
            file_path,
            '--channel {}'.format(channel) if channel else ''))
    finally:
        # Always remove the temp file, even if the import fails.
        os.remove(file_path)
+
def get_events(test_context, access_key, params=None):
    """ Gets events for some application.
    Args:
        test_context (obj: `TestContext`)
        access_key (str):
        params (dict): special parameters for eventserver's GET, e.g:
            'limit', 'reversed', 'event'. See the docs. Defaults to no
            extra parameters.
    Returns: `requests.Response`
    """
    # Fix: `params={}` was a mutable default argument; use None and build
    # the query dict per call. Passing an explicit dict behaves as before.
    url = get_app_eventserver_url_json(test_context)
    query = {'accessKey': access_key}
    if params:
        query.update(params)
    return requests.get(url, params=query)
+
def query_engine(data, engine_ip='localhost', engine_port=8000):
    """ Send a query to a deployed engine.
    Args:
        data (dict): json-like dictionary being an input to an engine
        engine_ip (str): ip of deployed engine
        engine_port (int): port of deployed engine
    Returns: `requests.Response`
    """
    # (The original docstring documented a nonexistent `access_key`
    # parameter; queries do not require one.)
    endpoint = get_engine_url_json(engine_ip, engine_port)
    return requests.post(endpoint, json=data)
+
class AppEngine:
    """ This is a utility class simplifying all app related interactions.
    Basically it is just a wrapper on other utility functions and shell
    scripts (`pio app ...`, `pio build/train/deploy`).
    """

    def __init__(self, test_context, app_context, already_created=False):
        """ Args:
            test_context (obj: `TestContext`)
            app_context (obj: `AppContext`)
            already_created (bool): True if the given app has been already added
        """
        self.test_context = test_context
        self.app_context = app_context
        self.engine_path = obtain_template(
            self.test_context.engine_directory, app_context.template)
        # Handle of the backgrounded `pio deploy` process; set by deploy().
        self.deployed_process = None
        if already_created:
            self.__init_info()
        else:
            self.id = None
            self.access_key = None
            self.description = None

        if self.app_context.engine_json_path:
            self.__copy_engine_json()

    def __copy_engine_json(self):
        # Overwrite the template's engine.json with the test-specific one.
        to_path = pjoin(self.engine_path, 'engine.json')
        copyfile(self.app_context.engine_json_path, to_path)

    def __init_info(self):
        # Cache id/access_key/description from `pio app show` output.
        info = self.show()
        self.id = info['id']
        self.access_key = info['access_key']
        self.description = info['description']

    def new(self, id=None, description=None, access_key=None):
        """ Creates a new application with given parameters """
        srun('pio app new {} {} {} {}'.format(
            '--id {}'.format(id) if id else '',
            '--description \"{}\"'.format(description) if description else '',
            '--access-key {}'.format(access_key) if access_key else '',
            self.app_context.name))

        self.__init_info()

    def show(self):
        """ Returns: application info in dictionary with the keys:
        `name`: str, `id`: int, `description`: str,
        `access_key`: str, `allowed_events`: str
        """
        output = srun_out('pio app show {}'.format(self.app_context.name)).rstrip()
        # Parse the fixed column layout of `pio app show`.
        lines = [x.split() for x in output.split('\n')]
        return { 'name': lines[0][3],
                 'id': int(lines[1][4]),
                 'description': lines[2][3] if len(lines[2]) >= 4 else '',
                 'access_key': lines[3][4],
                 'allowed_events': lines[3][5] }

    def delete(self):
        """ Deletes this app from pio (without confirmation prompt). """
        srun('pio app delete {0} --force'.format(self.app_context.name))

    def build(self, sbt_extra=None, clean=False, no_asm=True):
        """ Runs `pio build` in the engine directory. """
        srun('cd {0}; pio build {1} {2} {3}'.format(
            self.engine_path,
            '--sbt-extra {}'.format(sbt_extra) if sbt_extra else '',
            '--clean' if clean else '',
            '--no-asm' if no_asm else ''))

    def train(self, batch=None, skip_sanity_check=False, stop_after_read=False,
            stop_after_prepare=False, engine_factory=None,
            engine_params_key=None, scratch_uri=None):
        """ Runs `pio train` in the engine directory; each keyword maps to
        the corresponding CLI flag and is omitted when falsy. """
        srun('cd {}; pio train {} {} {} {} {} {} {}'.format(
            self.engine_path,
            '--batch {}'.format(batch) if batch else '',
            '--skip-sanity-check' if skip_sanity_check else '',
            '--stop-after-read' if stop_after_read else '',
            '--stop-after-prepare' if stop_after_prepare else '',
            '--engine_factory {}'.format(engine_factory) if engine_factory else '',
            '--engine-params-key {}'.format(engine_params_key) if engine_params_key else '',
            '--scratch-uri {}'.format(scratch_uri) if scratch_uri else ''))

    def deploy(self, wait_time=0, ip=None, port=None, engine_instance_id=None,
            feedback=False, accesskey=None, event_server_ip=None, event_server_port=None,
            batch=None, scratch_uri=None):
        """ Runs `pio deploy` in the background and waits `wait_time` seconds
        for the engine to come up.
        Raises: Exception when the deployed process exits within the wait time
        """
        command = 'cd {}; pio deploy {} {} {} {} {} {} {} {} {}'.format(
            self.engine_path,
            '--ip {}'.format(ip) if ip else '',
            '--port {}'.format(port) if port else '',
            '--engine-instance-id {}'.format(engine_instance_id) if engine_instance_id else '',
            '--feedback' if feedback else '',
            '--accesskey {}'.format(accesskey) if accesskey else '',
            '--event-server-ip {}'.format(event_server_ip) if event_server_ip else '',
            '--event-server-port {}'.format(event_server_port) if event_server_port else '',
            # Fix: was `format(bach)` — a NameError whenever batch was given.
            '--batch {}'.format(batch) if batch else '',
            '--scratch-uri {}'.format(scratch_uri) if scratch_uri else '')

        self.deployed_process = srun_bg(command)
        time.sleep(wait_time)
        # poll() returning non-None means the process already exited.
        if self.deployed_process.poll() is not None:
            raise Exception('Application engine terminated')
        self.ip = ip if ip else 'localhost'
        self.port = port if port else 8000

    def stop(self):
        """ Kills deployed engine """
        if self.deployed_process:
            self.deployed_process.kill()

    def new_channel(self, channel):
        # NOTE(review): `pio app channel-new` may also require the app name —
        # confirm against the pio CLI before relying on this.
        srun('pio app channel-new {0}'.format(channel))

    def delete_channel(self, channel):
        # NOTE(review): see new_channel — confirm the CLI argument list.
        srun('pio app channel-delete {0} --force'.format(channel))

    def send_event(self, event):
        """ Sends a single event using this app's access key. """
        return send_event(event, self.test_context, self.access_key)

    def send_events_batch(self, events):
        """ Sends a batch of events (max 50) using this app's access key. """
        return send_events_batch(events, self.test_context, self.access_key)

    def import_events_batch(self, events):
        """ Imports events via `pio import` for this app's id. """
        return import_events_batch(events, self.test_context, self.id)

    def get_events(self, params=None):
        """ Fetches stored events; `params` are eventserver GET options.
        Fix: `params={}` was a mutable default argument. """
        return get_events(self.test_context, self.access_key,
                          params if params is not None else {})

    def delete_data(self, delete_all=True, channel=None):
        """ Deletes this app's stored event data (optionally one channel). """
        srun('pio app data-delete {0} {1} {2} --force'
                .format(
                    self.app_context.name,
                    '--all' if delete_all else '',
                    '--channel ' + channel if channel is not None else ''))

    def query(self, data):
        """ Queries the engine deployed by deploy(); requires deploy() first
        since it uses the ip/port recorded there. """
        return query_engine(data, self.ip, self.port)
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/5320724a/tests/run_docker.sh
----------------------------------------------------------------------
diff --git a/tests/run_docker.sh b/tests/run_docker.sh
new file mode 100755
index 0000000..d5925ef
--- /dev/null
+++ b/tests/run_docker.sh
@@ -0,0 +1,39 @@
#!/bin/bash -

# Runs a command inside the pio-testing docker container with the chosen
# storage backends, mounting the given PredictionIO checkout at /pio_host.
# Fix: usage string said "run_docer"; the mounted path "$4" is now quoted
# so paths with spaces work.
USAGE=$"Usage: run_docker <meta> <event> <model> <pio> <command>
  Where:
    meta = [PGSQL,ELASTICSEARCH]
    event = [PGSQL,HBASE]
    model = [PGSQL,LOCALFS,HDFS]
    pio = path to PredictionIO directory
    command = command to run in the container"

if ! [[ "$1" =~ ^(PGSQL|ELASTICSEARCH)$ ]]; then
  echo "$USAGE"
  exit 1
fi

if ! [[ "$2" =~ ^(PGSQL|HBASE)$ ]]; then
  echo "$USAGE"
  exit 1
fi

if ! [[ "$3" =~ ^(PGSQL|LOCALFS|HDFS)$ ]]; then
  echo "$USAGE"
  exit 1
fi

if [ ! -d "$4" ]; then
  echo "Directory $4 does not exist"
  echo "$USAGE"
  exit 1
fi

docker run -it -h localhost \
  -v "$4":/pio_host \
  -v ~/.ivy2:/root/.ivy2 \
  -e PIO_STORAGE_REPOSITORIES_METADATA_SOURCE=$1 \
  -e PIO_STORAGE_REPOSITORIES_EVENTDATA_SOURCE=$2 \
  -e PIO_STORAGE_REPOSITORIES_MODELDATA_SOURCE=$3 \
  -p 8000:8000 -p 7070:7070 -p 8080:8080 -p 8081:8081 -p 4040:4040 \
  -p 60000:60000 -p 60010:60010 -p 60020:60020 -p 60030:60030 ziemin/pio-testing $5
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/5320724a/tests/script.travis.sh
----------------------------------------------------------------------
diff --git a/tests/script.travis.sh b/tests/script.travis.sh
new file mode 100755
index 0000000..8596078
--- /dev/null
+++ b/tests/script.travis.sh
@@ -0,0 +1,21 @@
#!/bin/bash -

# Travis CI entry point: either runs unit tests or the dockerized
# integration suite, depending on $BUILD_TYPE.
set -e

if [[ $BUILD_TYPE == Unit ]]; then
  # Prepare pio environment variables; `set -a` exports everything the
  # sourced file defines.
  set -a
  source conf/pio-env.sh.travis
  set +a

  # Run stylecheck
  sbt scalastyle
  # Run all unit tests
  sbt test

else
  # Modernized: $(pwd) instead of legacy backticks (same behavior).
  REPO=$(pwd)

  ./tests/run_docker.sh $METADATA_REP $EVENTDATA_REP $MODELDATA_REP \
    $REPO 'python3 /tests/pio_tests/tests.py'
fi