You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@predictionio.apache.org by do...@apache.org on 2016/07/18 20:17:50 UTC
[19/34] incubator-predictionio git commit: rename all except examples
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/main/scala/org/apache/predictionio/workflow/WorkflowUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/predictionio/workflow/WorkflowUtils.scala b/core/src/main/scala/org/apache/predictionio/workflow/WorkflowUtils.scala
new file mode 100644
index 0000000..e26b754
--- /dev/null
+++ b/core/src/main/scala/org/apache/predictionio/workflow/WorkflowUtils.scala
@@ -0,0 +1,419 @@
+/** Copyright 2015 TappingStone, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.predictionio.workflow
+
+import java.io.File
+import java.io.FileNotFoundException
+
+import org.apache.predictionio.controller.EmptyParams
+import org.apache.predictionio.controller.EngineFactory
+import org.apache.predictionio.controller.EngineParamsGenerator
+import org.apache.predictionio.controller.Evaluation
+import org.apache.predictionio.controller.Params
+import org.apache.predictionio.controller.PersistentModelLoader
+import org.apache.predictionio.controller.Utils
+import org.apache.predictionio.core.BuildInfo
+
+import com.google.gson.Gson
+import com.google.gson.JsonSyntaxException
+import grizzled.slf4j.Logging
+import org.apache.predictionio.workflow.JsonExtractorOption.JsonExtractorOption
+import org.apache.log4j.Level
+import org.apache.log4j.LogManager
+import org.apache.spark.SparkContext
+import org.apache.spark.api.java.JavaRDDLike
+import org.apache.spark.rdd.RDD
+import org.json4s.JsonAST.JValue
+import org.json4s.MappingException
+import org.json4s._
+import org.json4s.native.JsonMethods._
+
+import scala.io.Source
+import scala.language.existentials
+import scala.reflect.runtime.universe
+
+/** Collection of reusable workflow related utilities. */
+object WorkflowUtils extends Logging {
+ @transient private lazy val gson = new Gson
+
+  /** Obtains an Engine object in Scala, or instantiates an Engine in Java.
+    *
+    * The name is first resolved as a Scala singleton object through runtime
+    * reflection. When that object cannot be instantiated, the name is treated
+    * as a Java class and instantiated through its no-argument constructor.
+    *
+    * @param engine Engine factory name.
+    * @param cl A Java ClassLoader to look for engine-related classes. It is
+    *           used for both the Scala and the Java lookup.
+    * @return The detected engine language paired with the engine factory.
+    *
+    * @throws ClassNotFoundException
+    *         Thrown when engine factory class does not exist.
+    * @throws NoSuchMethodException
+    *         Thrown when engine factory's apply() method is not implemented.
+    */
+  def getEngine(engine: String, cl: ClassLoader): (EngineLanguage.Value, EngineFactory) = {
+    val runtimeMirror = universe.runtimeMirror(cl)
+    val engineModule = runtimeMirror.staticModule(engine)
+    val engineObject = runtimeMirror.reflectModule(engineModule)
+    try {
+      (EngineLanguage.Scala, engineObject.instance.asInstanceOf[EngineFactory])
+    } catch {
+      // No Scala singleton could be instantiated for this name: fall back to a
+      // plain Java class, resolved through `cl` so both lookups share a loader.
+      case _: NoSuchFieldException | _: ClassNotFoundException =>
+        (
+          EngineLanguage.Java,
+          Class.forName(engine, true, cl).newInstance.asInstanceOf[EngineFactory]
+        )
+    }
+  }
+
+  /** Obtains an EngineParamsGenerator object in Scala, or instantiates one
+    * in Java.
+    *
+    * The name is first resolved as a Scala singleton object through runtime
+    * reflection. When that object cannot be instantiated, the name is treated
+    * as a Java class and instantiated through its no-argument constructor.
+    *
+    * @param epg Engine params generator name.
+    * @param cl A Java ClassLoader used for both the Scala and the Java lookup.
+    * @return The detected language paired with the generator instance.
+    */
+  def getEngineParamsGenerator(epg: String, cl: ClassLoader):
+    (EngineLanguage.Value, EngineParamsGenerator) = {
+    val runtimeMirror = universe.runtimeMirror(cl)
+    val epgModule = runtimeMirror.staticModule(epg)
+    val epgObject = runtimeMirror.reflectModule(epgModule)
+    try {
+      (EngineLanguage.Scala, epgObject.instance.asInstanceOf[EngineParamsGenerator])
+    } catch {
+      // No Scala singleton could be instantiated for this name: fall back to a
+      // plain Java class, resolved through `cl` so both lookups share a loader.
+      case _: NoSuchFieldException | _: ClassNotFoundException =>
+        (
+          EngineLanguage.Java,
+          Class.forName(epg, true, cl).newInstance.asInstanceOf[EngineParamsGenerator]
+        )
+    }
+  }
+
+  /** Obtains an Evaluation object in Scala, or instantiates one in Java.
+    *
+    * The name is first resolved as a Scala singleton object through runtime
+    * reflection. When that object cannot be instantiated, the name is treated
+    * as a Java class and instantiated through its no-argument constructor.
+    *
+    * @param evaluation Evaluation name.
+    * @param cl A Java ClassLoader used for both the Scala and the Java lookup.
+    * @return The detected language paired with the evaluation instance.
+    */
+  def getEvaluation(evaluation: String, cl: ClassLoader): (EngineLanguage.Value, Evaluation) = {
+    val runtimeMirror = universe.runtimeMirror(cl)
+    val evaluationModule = runtimeMirror.staticModule(evaluation)
+    val evaluationObject = runtimeMirror.reflectModule(evaluationModule)
+    try {
+      (EngineLanguage.Scala, evaluationObject.instance.asInstanceOf[Evaluation])
+    } catch {
+      // No Scala singleton could be instantiated for this name: fall back to a
+      // plain Java class, resolved through `cl` so both lookups share a loader.
+      case _: NoSuchFieldException | _: ClassNotFoundException =>
+        (
+          EngineLanguage.Java,
+          Class.forName(evaluation, true, cl).newInstance.asInstanceOf[Evaluation]
+        )
+    }
+  }
+
+  /** Builds the Params instance for a component out of a JSON document.
+    *
+    * The target Params type is the first parameter type of the first
+    * constructor reported for `clazz`. A constructor without parameters
+    * yields EmptyParams, with a warning when `json` is not empty.
+    *
+    * @param language Engine's programming language (not consulted by this
+    *                 method's body).
+    * @param json JSON document.
+    * @param clazz Class of the component that is going to receive the
+    *              resulting Params instance as a constructor argument.
+    * @param jsonExtractor JSON extractor option.
+    * @param formats JSON4S serializers for deserialization.
+    * @return The extracted Params, or EmptyParams for a no-argument constructor.
+    *
+    * @throws MappingException Thrown when JSON4S fails to perform conversion.
+    * @throws JsonSyntaxException Thrown when GSON fails to perform conversion.
+    */
+  def extractParams(
+      language: EngineLanguage.Value = EngineLanguage.Scala,
+      json: String,
+      clazz: Class[_],
+      jsonExtractor: JsonExtractorOption,
+      formats: Formats = Utils.json4sDefaultFormats): Params = {
+    implicit val f = formats
+    val ctorParamTypes = clazz.getConstructors.head.getParameterTypes
+    ctorParamTypes.headOption match {
+      case None =>
+        // Nothing to feed the constructor with; supplied JSON is ignored.
+        if (json != "") {
+          warn(s"Non-empty parameters supplied to ${clazz.getName}, but its " +
+            "constructor does not accept any arguments. Stubbing with empty " +
+            "parameters.")
+        }
+        EmptyParams()
+      case Some(apClass) =>
+        try {
+          JsonExtractor.extract(jsonExtractor, json, apClass, f).asInstanceOf[Params]
+        } catch {
+          // Log with context, then let the caller see the original failure.
+          case e @ (_: MappingException | _: JsonSyntaxException) =>
+            error(
+              s"Unable to extract parameters for ${apClass.getName} from " +
+                s"JSON string: $json. Aborting workflow.",
+              e)
+            throw e
+        }
+    }
+  }
+
+ /** Looks up `field` in the engine variant JSON and converts it into a
+ * (name, Params) pair.
+ *
+ * The field value is read as a NameParams. When it carries params, the
+ * class registered under its name in `classMap` decides the Params type
+ * via extractParams; a name missing from `classMap` logs an error and
+ * calls sys.exit(1). Absent params become EmptyParams, and an absent
+ * field yields ("", EmptyParams()).
+ *
+ * @param variantJson Parsed engine variant JSON.
+ * @param field Name of the field to search for.
+ * @param classMap Component classes keyed by component name.
+ * @param engineLanguage Engine's programming language, forwarded to
+ * extractParams.
+ * @param jsonExtractor JSON extractor option, forwarded to extractParams.
+ */
+ def getParamsFromJsonByFieldAndClass(
+ variantJson: JValue,
+ field: String,
+ classMap: Map[String, Class[_]],
+ engineLanguage: EngineLanguage.Value,
+ jsonExtractor: JsonExtractorOption): (String, Params) = {
+ variantJson findField {
+ case JField(f, _) => f == field
+ case _ => false
+ } map { jv =>
+ // NameParamsSerializer lets extract[NameParams] go through extractNameParams.
+ implicit lazy val formats = Utils.json4sDefaultFormats + new NameParamsSerializer
+ val np: NameParams = try {
+ jv._2.extract[NameParams]
+ } catch {
+ case e: Exception =>
+ error(s"Unable to extract $field name and params $jv")
+ throw e
+ }
+ val extractedParams = np.params.map { p =>
+ try {
+ // An unknown component name cannot be resolved to a class: abort.
+ if (!classMap.contains(np.name)) {
+ error(s"Unable to find $field class with name '${np.name}'" +
+ " defined in Engine.")
+ sys.exit(1)
+ }
+ WorkflowUtils.extractParams(
+ engineLanguage,
+ compact(render(p)),
+ classMap(np.name),
+ jsonExtractor,
+ formats)
+ } catch {
+ case e: Exception =>
+ error(s"Unable to extract $field params $p")
+ throw e
+ }
+ }.getOrElse(EmptyParams())
+
+ (np.name, extractedParams)
+ } getOrElse("", EmptyParams())
+ }
+
+  /** Collects every environment variable whose name begins with 'PIO_'. */
+  def pioEnvVars: Map[String, String] =
+    sys.env.filter { case (name, _) => name.startsWith("PIO_") }
+
+  /** Turns a Java (non-Scala) object into a JSON4S JValue by serializing it
+    * with Gson and parsing the resulting JSON text.
+    *
+    * @param params The Java object to be converted.
+    * @return The parsed JSON representation of `params`.
+    */
+  def javaObjectToJValue(params: AnyRef): JValue = {
+    val serialized = gson.toJson(params)
+    parse(serialized)
+  }
+
+  /** Starts an UpgradeCheckRunner on a new thread and returns immediately.
+    *
+    * The access qualifier names the `predictionio` package: this file lives
+    * in org.apache.predictionio.workflow, so the former `prediction`
+    * qualifier no longer refers to an enclosing package.
+    *
+    * @param component Component name handed to the runner.
+    * @param engine Engine name handed to the runner; empty by default.
+    */
+  private[predictionio] def checkUpgrade(
+      component: String = "core",
+      engine: String = ""): Unit = {
+    val runner = new Thread(new UpgradeCheckRunner(component, engine))
+    runner.start()
+  }
+
+  /** Renders `data` as a debug string, descending into nested containers.
+    *
+    * RDDs and Java RDDs are collected first and the collected value is
+    * rendered again; arrays are rendered element by element inside square
+    * brackets; any other non-null value uses its own toString.
+    *
+    * @param data The value to render; may be null.
+    * @return The debug representation, "null" for a null input.
+    */
+  def debugString[D](data: D): String = data match {
+    case null => "null"
+    case rdd: RDD[_] => debugString(rdd.collect())
+    case javaRdd: JavaRDDLike[_, _] => debugString(javaRdd.collect())
+    case array: Array[_] => array.map(debugString).mkString("[", ",", "]")
+    case other: AnyRef => other.toString
+  }
+
+  /** Detects third party software configuration files to be submitted as
+    * extras to Apache Spark. This makes sure all executors receive the same
+    * configuration.
+    *
+    * For each known environment variable that is set, the matching file name
+    * is appended to the variable's value; only paths that exist are kept.
+    *
+    * @return Paths of the configuration files found on this machine.
+    */
+  def thirdPartyConfFiles: Seq[String] = {
+    val confFileByEnvVar = Map(
+      "PIO_CONF_DIR" -> "log4j.properties",
+      "ES_CONF_DIR" -> "elasticsearch.yml",
+      "HADOOP_CONF_DIR" -> "core-site.xml",
+      "HBASE_CONF_DIR" -> "hbase-site.xml")
+
+    confFileByEnvVar.toSeq.flatMap { case (envVar, fileName) =>
+      sys.env.get(envVar)
+        .map(dir => Seq(dir, fileName).mkString(File.separator))
+        .filter(path => new File(path).exists)
+    }
+  }
+
+  /** Lists the values of the third party path environment variables that
+    * are currently set, in the fixed order declared below.
+    *
+    * @return One entry per variable that is present in the environment.
+    */
+  def thirdPartyClasspaths: Seq[String] = {
+    val pathEnvVars = Seq(
+      "PIO_CONF_DIR",
+      "ES_CONF_DIR",
+      "POSTGRES_JDBC_DRIVER",
+      "MYSQL_JDBC_DRIVER",
+      "HADOOP_CONF_DIR",
+      "HBASE_CONF_DIR")
+    for {
+      envVar <- pathEnvVars
+      path <- sys.env.get(envVar)
+    } yield path
+  }
+
+  /** Adjusts log4j levels for the root logger and a fixed set of loggers.
+    *
+    * The root logger is set to TRACE when verbose and INFO otherwise; the
+    * loggers named below are set to INFO when verbose and WARN otherwise.
+    *
+    * @param verbose Whether the more detailed levels should be used.
+    */
+  def modifyLogging(verbose: Boolean): Unit = {
+    val (rootLoggerLevel, chattyLoggerLevel) =
+      if (verbose) (Level.TRACE, Level.INFO) else (Level.INFO, Level.WARN)
+
+    LogManager.getRootLogger.setLevel(rootLoggerLevel)
+
+    val chattyLoggers = Seq(
+      "org.elasticsearch",
+      "org.apache.hadoop",
+      "org.apache.spark",
+      "org.eclipse.jetty",
+      "akka")
+    chattyLoggers.foreach { loggerName =>
+      LogManager.getLogger(loggerName).setLevel(chattyLoggerLevel)
+    }
+  }
+
+ /** Reads the optional 'name' and 'params' fields of a JSON value into a
+ * NameParams.
+ *
+ * When both fields are missing, an error is logged and the process exits
+ * with status 1. When only one is missing, an info message is logged and
+ * the default is used: an empty String for the name, None for the params.
+ *
+ * @param jv JSON value expected to carry 'name' and/or 'params'.
+ */
+ def extractNameParams(jv: JValue): NameParams = {
+ implicit val formats = Utils.json4sDefaultFormats
+ val nameOpt = (jv \ "name").extract[Option[String]]
+ val paramsOpt = (jv \ "params").extract[Option[JValue]]
+
+ // Neither field present: the JSON cannot describe a component at all.
+ if (nameOpt.isEmpty && paramsOpt.isEmpty) {
+ error("Unable to find 'name' or 'params' fields in" +
+ s" ${compact(render(jv))}.\n" +
+ "Since 0.8.4, the 'params' field is required in engine.json" +
+ " in order to specify parameters for DataSource, Preparator or" +
+ " Serving.\n" +
+ "Please go to https://docs.prediction.io/resources/upgrade/" +
+ " for detailed instruction of how to change engine.json.")
+ sys.exit(1)
+ }
+
+ if (nameOpt.isEmpty) {
+ info(s"No 'name' is found. Default empty String will be used.")
+ }
+
+ if (paramsOpt.isEmpty) {
+ info(s"No 'params' is found. Default EmptyParams will be used.")
+ }
+
+ NameParams(
+ name = nameOpt.getOrElse(""),
+ params = paramsOpt
+ )
+ }
+
+  /** Flattens the 'sparkConf' section of the engine JSON into dotted
+    * key/value pairs, e.g. {"spark": {"executor": {"memory": "4g"}}}
+    * becomes ("spark.executor.memory", "4g").
+    *
+    * A missing 'sparkConf' section yields an empty list. Arrays anywhere in
+    * the section, or a section that is a bare value rather than an object,
+    * are reported as errors and end the process with status 1.
+    *
+    * @param root Parsed engine JSON.
+    * @return The flattened configuration entries.
+    */
+  def extractSparkConf(root: JValue): List[(String, String)] = {
+    // Walks the tree, accumulating the chain of field names leading to each leaf.
+    def flatten(jv: JValue): List[(List[String], String)] = {
+      jv match {
+        case JObject(fields) =>
+          for ((namePrefix, childJV) <- fields;
+               (name, value) <- flatten(childJV))
+            yield (namePrefix :: name) -> value
+        case JArray(_) => {
+          error("Arrays are not allowed in the sparkConf section of engine.json.")
+          sys.exit(1)
+        }
+        case JNothing => List()
+        case _ => List(List() -> jv.values.toString)
+      }
+    }
+
+    flatten(root \ "sparkConf").map {
+      // An empty path means sparkConf itself is a bare value, which has no
+      // key to report; fail with a clear message instead of an opaque
+      // exception from reducing an empty list.
+      case (Nil, _) =>
+        error("The sparkConf section of engine.json must be a JSON object.")
+        sys.exit(1)
+      case (path, value) => path.mkString(".") -> value
+    }
+  }
+}
+
+/** A component name paired with its optional, still unparsed JSON params. */
+case class NameParams(name: String, params: Option[JValue])
+
+/** JSON4S custom serializer for NameParams.
+ *
+ * Deserialization hands any JValue to WorkflowUtils.extractNameParams.
+ * Serialization writes an object with 'name' and 'params' fields, using
+ * JNothing for absent params.
+ */
+class NameParamsSerializer extends CustomSerializer[NameParams](format => ( {
+ case jv: JValue => WorkflowUtils.extractNameParams(jv)
+}, {
+ case x: NameParams =>
+ JObject(JField("name", JString(x.name)) ::
+ JField("params", x.params.getOrElse(JNothing)) :: Nil)
+}
+ ))
+
+/** Collection of reusable workflow related utilities that touch on Apache
+  * Spark. They are separated to avoid compilation problems with certain code.
+  */
+object SparkWorkflowUtils extends Logging {
+  /** Loads a persisted model through the loader named in the manifest.
+    *
+    * The class name is first resolved as a Scala singleton object and applied
+    * as a PersistentModelLoader. When that object cannot be instantiated, a
+    * static Java method load(String, Params, SparkContext) on the same class
+    * is invoked instead, with a null SparkContext when `sc` is empty.
+    *
+    * @param pmm Manifest naming the model class.
+    * @param runId Identifier passed to the loader.
+    * @param params Parameters passed to the loader.
+    * @param sc Optional SparkContext passed to the loader.
+    * @param cl A Java ClassLoader used for both the Scala and the Java lookup.
+    * @return The loaded model.
+    *
+    * @throws ClassNotFoundException Thrown when the model class does not exist.
+    * @throws NoSuchMethodException Thrown when the Java load method is missing.
+    */
+  def getPersistentModel[AP <: Params, M](
+      pmm: PersistentModelManifest,
+      runId: String,
+      params: AP,
+      sc: Option[SparkContext],
+      cl: ClassLoader): M = {
+    val runtimeMirror = universe.runtimeMirror(cl)
+    val pmmModule = runtimeMirror.staticModule(pmm.className)
+    val pmmObject = runtimeMirror.reflectModule(pmmModule)
+    try {
+      pmmObject.instance.asInstanceOf[PersistentModelLoader[AP, M]](
+        runId,
+        params,
+        sc)
+    } catch {
+      // No Scala singleton could be instantiated: try a static Java load
+      // method, resolving the class through `cl` like the Scala lookup above.
+      case _: NoSuchFieldException | _: ClassNotFoundException =>
+        try {
+          val loadMethod = Class.forName(pmm.className, true, cl).getMethod(
+            "load",
+            classOf[String],
+            classOf[Params],
+            classOf[SparkContext])
+          loadMethod.invoke(null, runId, params, sc.orNull).asInstanceOf[M]
+        } catch {
+          case e: ClassNotFoundException =>
+            error(s"Model class ${pmm.className} cannot be found.")
+            throw e
+          case e: NoSuchMethodException =>
+            error(
+              "The load(String, Params, SparkContext) method cannot be found.")
+            throw e
+        }
+    }
+  }
+}
+
+/** Runnable that requests the upgrade metadata document for a component
+  * (and optionally an engine) of the current build version.
+  *
+  * The response is not interpreted yet; a missing document or an unknown
+  * host is only logged at debug level.
+  *
+  * @param component Component name used to build the metadata URL.
+  * @param engine Engine name used to build the metadata URL; an empty
+  *               string selects the component-level document.
+  */
+class UpgradeCheckRunner(
+    val component: String,
+    val engine: String) extends Runnable with Logging {
+  val version = BuildInfo.version
+  val versionsHost = "https://direct.prediction.io/"
+
+  def run(): Unit = {
+    val url = if (engine == "") {
+      s"$versionsHost$version/$component.json"
+    } else {
+      s"$versionsHost$version/$component/$engine.json"
+    }
+    try {
+      // fromURL opens the stream right away; close it so it is not leaked
+      // while the payload remains unused.
+      Source.fromURL(url).close()
+    } catch {
+      case e: FileNotFoundException =>
+        debug(s"Update metainfo not found. $url")
+      case e: java.net.UnknownHostException =>
+        debug(s"${e.getClass.getName}: ${e.getMessage}")
+    }
+    // TODO: Implement upgrade logic
+  }
+}
+
+/** Base exception type for the workflow interruptions declared below. */
+class WorkflowInterruption() extends Exception
+
+// NOTE(review): presumably thrown to stop a workflow once the read stage is
+// done - confirm against the callers, which are outside this file.
+case class StopAfterReadInterruption() extends WorkflowInterruption
+
+// NOTE(review): presumably thrown to stop a workflow once the prepare stage
+// is done - confirm against the callers, which are outside this file.
+case class StopAfterPrepareInterruption() extends WorkflowInterruption
+
+/** Languages an engine component can be written in. WorkflowUtils tags a
+ * component as Scala when it is a Scala singleton object and as Java when
+ * it is instantiated from a plain class.
+ */
+object EngineLanguage extends Enumeration {
+ val Scala, Java = Value
+}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/main/twirl/io/prediction/controller/metric_evaluator.scala.html
----------------------------------------------------------------------
diff --git a/core/src/main/twirl/io/prediction/controller/metric_evaluator.scala.html b/core/src/main/twirl/io/prediction/controller/metric_evaluator.scala.html
deleted file mode 100644
index 2e679a5..0000000
--- a/core/src/main/twirl/io/prediction/controller/metric_evaluator.scala.html
+++ /dev/null
@@ -1,95 +0,0 @@
-<html>
- <head>
- <script type='text/javascript' src='https://www.google.com/jsapi'></script>
- <script src="http://ajax.googleapis.com/ajax/libs/jquery/2.1.1/jquery.min.js"></script>
- <script>
- google.load('visualization', '1', {packages:['table', 'corechart',]});
- </script>
- </head>
- <body>
- <h1>Metric Evaluator</h1>
- <div id='debug'></div>
- <div id='table'>
- <h3>Engine Params Evaluation Results</h3>
- <div>Click on table to view the engine params</div>
- </div>
- <pre id='engineParams'></div>
- <script type='text/javascript'>
- google.setOnLoadCallback(load);
-
- //var url = 'http://localhost:9000/engine_instances/ky01Q-glQheNE_s885JTSg/local_evaluator_results.json';
- var url = 'evaluator_results.json';
- var rawData;
- var metricHeader;
- var otherMetricHeaders;
- var engineParamsScores;
- var table;
- var dataTable;
-
- function load() {
- rawData = JSON.parse(
- jQuery.ajax({
- url: url,
- dataType: 'json',
- async: false,
- }).responseText);
-
- metricHeader = rawData['metricHeader'];
- otherMetricHeaders = rawData['otherMetricHeaders'];
- engineParamsScores = rawData['engineParamsScores'];
-
- drawTable();
- }
-
- function tableSelectHandler() {
- var selection = table.getSelection();
- if (selection.length > 0) {
- var row = selection[0].row;
- var idx = dataTable.getValue(row, 0);
- var engineParams = engineParamsScores[idx]._1;
-
- document.getElementById('engineParams').innerHTML = JSON.stringify(
- engineParams, null, 2);
- } else {
- document.getElementById('engineParams').innerHTML = "";
- }
- }
-
- function drawTable() {
- var tableDiv = document.createElement('div');
- document.getElementById('table').appendChild(tableDiv);
-
-
- var dataArray = [];
-
- var headers = ['Index', 'Best', metricHeader].concat(otherMetricHeaders);
- dataArray.push(headers);
-
- for (epIdx = 0; epIdx < engineParamsScores.length; epIdx++) {
- var epScore = engineParamsScores[epIdx];
- var isBest = (epIdx == rawData.bestIdx ? "*" : "");
- dataArray.push([epIdx, isBest, epScore._2.score].concat(epScore._2.otherScores));
- }
-
- dataTable = google.visualization.arrayToDataTable(dataArray, false);
-
- // formatter
- var numberFormatter = new google.visualization.NumberFormat({fractionDigits: 4});
-
- for (colIdx = 1; colIdx < dataTable.getNumberOfColumns(); colIdx++) {
- if (dataTable.getColumnType(colIdx) == "number") {
- numberFormatter.format(dataTable, colIdx);
- }
- }
-
- table = new google.visualization.Table(tableDiv);
-
- // select handler
- google.visualization.events.addListener(table, 'select', tableSelectHandler);
-
- table.draw(dataTable);
- }
-
- </script>
- </body>
-</html>
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/main/twirl/io/prediction/workflow/index.scala.html
----------------------------------------------------------------------
diff --git a/core/src/main/twirl/io/prediction/workflow/index.scala.html b/core/src/main/twirl/io/prediction/workflow/index.scala.html
deleted file mode 100644
index 4e0707b..0000000
--- a/core/src/main/twirl/io/prediction/workflow/index.scala.html
+++ /dev/null
@@ -1,92 +0,0 @@
-@import io.prediction.data.storage.EngineInstance
-@import io.prediction.data.storage.EngineManifest
-@import io.prediction.workflow.ServerConfig
-@import org.joda.time.DateTime
-@import org.joda.time.format.DateTimeFormat
-@(args: ServerConfig,
- manifest: EngineManifest,
- engineInstance: EngineInstance,
- algorithms: Seq[String],
- algorithmsParams: Seq[String],
- models: Seq[String],
- dataSourceParams: String,
- preparatorParams: String,
- servingParams: String,
- serverStartTime: DateTime,
- feedback: Boolean,
- eventServerIp: String,
- eventServerPort: Int,
- requestCount: Int,
- avgServingSec: Double,
- lastServingSec: Double
- )
-<!DOCTYPE html>
-<html lang="en">
- <head>
- <title>@{engineInstance.engineFactory} (@{engineInstance.engineVariant}) - PredictionIO Engine Server at @{args.ip}:@{args.port}</title>
- <link href="/assets/bootstrap-3.2.0-dist/css/bootstrap.min.css" rel="stylesheet">
- <style type="text/css">
- td { font-family: Menlo, Monaco, Consolas, "Courier New", monospace; }
- </style>
- </head>
- <body>
- <div class="container">
- <div class="page-header">
- <h1>PredictionIO Engine Server at @{args.ip}:@{args.port}</h1>
- <p class="lead">@{engineInstance.engineFactory} (@{engineInstance.engineVariant})</p>
- </div>
- <h2>Engine Information</h2>
- <table class="table table-bordered table-striped">
- <tr><th>Training Start Time</th><td>@{DateTimeFormat.forStyle("FF").print(engineInstance.startTime)}</td></tr>
- <tr><th>Training End Time</th><td>@{DateTimeFormat.forStyle("FF").print(engineInstance.endTime)}</td></tr>
- <tr><th>Variant ID</th><td>@{engineInstance.engineVariant}</td></tr>
- <tr><th>Instance ID</th><td>@{engineInstance.id}</td></tr>
- </table>
- <h2>Server Information</h2>
- <table class="table table-bordered table-striped">
- <tr><th>Start Time</th><td>@{DateTimeFormat.forStyle("FF").print(serverStartTime)}</td></tr>
- <tr><th>Request Count</th><td>@{requestCount}</td></tr>
- <tr><th>Average Serving Time</th><td>@{f"${avgServingSec}%.4f"} seconds</td></tr>
- <tr><th>Last Serving Time</th><td>@{f"${lastServingSec}%.4f"} seconds</td></tr>
- <tr><th>Engine Factory Class (Scala/Java)</th><td>@{engineInstance.engineFactory}</td></tr>
- <tr>
- <th rowspan="@(manifest.files.size)">Library Files</th>
- <td>@{manifest.files.head}</td>
- </tr>
- @for(f <- manifest.files.drop(1)) {
- <tr><td>@f</td></tr>
- }
- </table>
- <h2>Data Source</h2>
- <table class="table table-bordered table-striped">
- <tr><th>Parameters</th><td>@{dataSourceParams}</td></tr>
- </table>
- <h2>Data Preparator</h2>
- <table class="table table-bordered table-striped">
- <tr><th>Parameters</th><td>@{preparatorParams}</td></tr>
- </table>
- <h2>Algorithms and Models</h2>
- <table class="table table-bordered table-striped">
- <tr><th>#</th><th colspan="2">Information</th></tr>
- @for(a <- algorithms.zip(algorithmsParams).zip(models).zipWithIndex) {
- <tr>
- <th rowspan="3">@{a._2 + 1}</th>
- <th>Class</th><td>@{a._1._1._1}</td>
- </tr>
- <tr><th>Parameters</th><td>@{a._1._1._2}</td></tr>
- <tr><th>Model</th><td>@{a._1._2}</td></tr>
- }
- </table>
- <h2>Serving</h2>
- <table class="table table-bordered table-striped">
- <tr><th>Parameters</th><td>@{servingParams}</td></tr>
- </table>
- <h2>Feedback Loop Information</h2>
- <table class="table table-bordered table-striped">
- <tr><th>Feedback Loop Enabled?</th><td>@{feedback}</td></tr>
- <tr><th>Event Server IP</th><td>@{eventServerIp}</td></tr>
- <tr><th>Event Server Port</th><td>@{eventServerPort}</td></tr>
- </table>
- </div>
- </body>
-</html>
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/main/twirl/org/apache/predictionio/controller/metric_evaluator.scala.html
----------------------------------------------------------------------
diff --git a/core/src/main/twirl/org/apache/predictionio/controller/metric_evaluator.scala.html b/core/src/main/twirl/org/apache/predictionio/controller/metric_evaluator.scala.html
new file mode 100644
index 0000000..2e679a5
--- /dev/null
+++ b/core/src/main/twirl/org/apache/predictionio/controller/metric_evaluator.scala.html
@@ -0,0 +1,95 @@
+<html>
+ <head>
+ <script type='text/javascript' src='https://www.google.com/jsapi'></script>
+ <script src="http://ajax.googleapis.com/ajax/libs/jquery/2.1.1/jquery.min.js"></script>
+ <script>
+ google.load('visualization', '1', {packages:['table', 'corechart',]});
+ </script>
+ </head>
+ <body>
+ <h1>Metric Evaluator</h1>
+ <div id='debug'></div>
+ <div id='table'>
+ <h3>Engine Params Evaluation Results</h3>
+ <div>Click on table to view the engine params</div>
+ </div>
+ <pre id='engineParams'></pre>
+ <script type='text/javascript'>
+ google.setOnLoadCallback(load);
+
+ //var url = 'http://localhost:9000/engine_instances/ky01Q-glQheNE_s885JTSg/local_evaluator_results.json';
+ var url = 'evaluator_results.json';
+ var rawData;
+ var metricHeader;
+ var otherMetricHeaders;
+ var engineParamsScores;
+ var table;
+ var dataTable;
+
+ // Fetches the evaluator results with a synchronous request, parses the
+ // response text as JSON, caches the parts used by the page in the
+ // script-level variables, then draws the table.
+ function load() {
+ rawData = JSON.parse(
+ jQuery.ajax({
+ url: url,
+ dataType: 'json',
+ async: false,
+ }).responseText);
+
+ metricHeader = rawData['metricHeader'];
+ otherMetricHeaders = rawData['otherMetricHeaders'];
+ engineParamsScores = rawData['engineParamsScores'];
+
+ drawTable();
+ }
+
+ function tableSelectHandler() {
+ var selection = table.getSelection();
+ if (selection.length > 0) {
+ var row = selection[0].row;
+ var idx = dataTable.getValue(row, 0);
+ var engineParams = engineParamsScores[idx]._1;
+
+ document.getElementById('engineParams').innerHTML = JSON.stringify(
+ engineParams, null, 2);
+ } else {
+ document.getElementById('engineParams').innerHTML = "";
+ }
+ }
+
+ function drawTable() {
+ var tableDiv = document.createElement('div');
+ document.getElementById('table').appendChild(tableDiv);
+
+
+ var dataArray = [];
+
+ var headers = ['Index', 'Best', metricHeader].concat(otherMetricHeaders);
+ dataArray.push(headers);
+
+ for (epIdx = 0; epIdx < engineParamsScores.length; epIdx++) {
+ var epScore = engineParamsScores[epIdx];
+ var isBest = (epIdx == rawData.bestIdx ? "*" : "");
+ dataArray.push([epIdx, isBest, epScore._2.score].concat(epScore._2.otherScores));
+ }
+
+ dataTable = google.visualization.arrayToDataTable(dataArray, false);
+
+ // formatter
+ var numberFormatter = new google.visualization.NumberFormat({fractionDigits: 4});
+
+ for (colIdx = 1; colIdx < dataTable.getNumberOfColumns(); colIdx++) {
+ if (dataTable.getColumnType(colIdx) == "number") {
+ numberFormatter.format(dataTable, colIdx);
+ }
+ }
+
+ table = new google.visualization.Table(tableDiv);
+
+ // select handler
+ google.visualization.events.addListener(table, 'select', tableSelectHandler);
+
+ table.draw(dataTable);
+ }
+
+ </script>
+ </body>
+</html>
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/main/twirl/org/apache/predictionio/workflow/index.scala.html
----------------------------------------------------------------------
diff --git a/core/src/main/twirl/org/apache/predictionio/workflow/index.scala.html b/core/src/main/twirl/org/apache/predictionio/workflow/index.scala.html
new file mode 100644
index 0000000..4e0707b
--- /dev/null
+++ b/core/src/main/twirl/org/apache/predictionio/workflow/index.scala.html
@@ -0,0 +1,92 @@
+@* Engine server status page: engine, server, component and feedback details. *@
+@import org.apache.predictionio.data.storage.EngineInstance
+@import org.apache.predictionio.data.storage.EngineManifest
+@import org.apache.predictionio.workflow.ServerConfig
+@import org.joda.time.DateTime
+@import org.joda.time.format.DateTimeFormat
+@(args: ServerConfig,
+  manifest: EngineManifest,
+  engineInstance: EngineInstance,
+  algorithms: Seq[String],
+  algorithmsParams: Seq[String],
+  models: Seq[String],
+  dataSourceParams: String,
+  preparatorParams: String,
+  servingParams: String,
+  serverStartTime: DateTime,
+  feedback: Boolean,
+  eventServerIp: String,
+  eventServerPort: Int,
+  requestCount: Int,
+  avgServingSec: Double,
+  lastServingSec: Double
+  )
+<!DOCTYPE html>
+<html lang="en">
+ <head>
+ <title>@{engineInstance.engineFactory} (@{engineInstance.engineVariant}) - PredictionIO Engine Server at @{args.ip}:@{args.port}</title>
+ <link href="/assets/bootstrap-3.2.0-dist/css/bootstrap.min.css" rel="stylesheet">
+ <style type="text/css">
+ td { font-family: Menlo, Monaco, Consolas, "Courier New", monospace; }
+ </style>
+ </head>
+ <body>
+ <div class="container">
+ <div class="page-header">
+ <h1>PredictionIO Engine Server at @{args.ip}:@{args.port}</h1>
+ <p class="lead">@{engineInstance.engineFactory} (@{engineInstance.engineVariant})</p>
+ </div>
+ <h2>Engine Information</h2>
+ <table class="table table-bordered table-striped">
+ <tr><th>Training Start Time</th><td>@{DateTimeFormat.forStyle("FF").print(engineInstance.startTime)}</td></tr>
+ <tr><th>Training End Time</th><td>@{DateTimeFormat.forStyle("FF").print(engineInstance.endTime)}</td></tr>
+ <tr><th>Variant ID</th><td>@{engineInstance.engineVariant}</td></tr>
+ <tr><th>Instance ID</th><td>@{engineInstance.id}</td></tr>
+ </table>
+ <h2>Server Information</h2>
+ <table class="table table-bordered table-striped">
+ <tr><th>Start Time</th><td>@{DateTimeFormat.forStyle("FF").print(serverStartTime)}</td></tr>
+ <tr><th>Request Count</th><td>@{requestCount}</td></tr>
+ <tr><th>Average Serving Time</th><td>@{f"${avgServingSec}%.4f"} seconds</td></tr>
+ <tr><th>Last Serving Time</th><td>@{f"${lastServingSec}%.4f"} seconds</td></tr>
+ <tr><th>Engine Factory Class (Scala/Java)</th><td>@{engineInstance.engineFactory}</td></tr>
+ <tr>
+ <th rowspan="@(manifest.files.size)">Library Files</th>
+ <td>@{manifest.files.head}</td>
+ </tr>
+ @for(f <- manifest.files.drop(1)) {
+ <tr><td>@f</td></tr>
+ }
+ </table>
+ <h2>Data Source</h2>
+ <table class="table table-bordered table-striped">
+ <tr><th>Parameters</th><td>@{dataSourceParams}</td></tr>
+ </table>
+ <h2>Data Preparator</h2>
+ <table class="table table-bordered table-striped">
+ <tr><th>Parameters</th><td>@{preparatorParams}</td></tr>
+ </table>
+ <h2>Algorithms and Models</h2>
+ <table class="table table-bordered table-striped">
+ <tr><th>#</th><th colspan="2">Information</th></tr>
+ @for(a <- algorithms.zip(algorithmsParams).zip(models).zipWithIndex) {
+ <tr>
+ <th rowspan="3">@{a._2 + 1}</th>
+ <th>Class</th><td>@{a._1._1._1}</td>
+ </tr>
+ <tr><th>Parameters</th><td>@{a._1._1._2}</td></tr>
+ <tr><th>Model</th><td>@{a._1._2}</td></tr>
+ }
+ </table>
+ <h2>Serving</h2>
+ <table class="table table-bordered table-striped">
+ <tr><th>Parameters</th><td>@{servingParams}</td></tr>
+ </table>
+ <h2>Feedback Loop Information</h2>
+ <table class="table table-bordered table-striped">
+ <tr><th>Feedback Loop Enabled?</th><td>@{feedback}</td></tr>
+ <tr><th>Event Server IP</th><td>@{eventServerIp}</td></tr>
+ <tr><th>Event Server Port</th><td>@{eventServerPort}</td></tr>
+ </table>
+ </div>
+ </body>
+</html>
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/test/java/io/prediction/workflow/JavaParams.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/io/prediction/workflow/JavaParams.java b/core/src/test/java/io/prediction/workflow/JavaParams.java
deleted file mode 100644
index 65108b5..0000000
--- a/core/src/test/java/io/prediction/workflow/JavaParams.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/** Copyright 2015 TappingStone, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.prediction.workflow;
-
-import io.prediction.controller.Params;
-
-public class JavaParams implements Params {
- private final String p;
-
- public JavaParams(String p) {
- this.p = p;
- }
-
- public String getP() {
- return p;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/test/java/io/prediction/workflow/JavaQuery.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/io/prediction/workflow/JavaQuery.java b/core/src/test/java/io/prediction/workflow/JavaQuery.java
deleted file mode 100644
index 1630a2d..0000000
--- a/core/src/test/java/io/prediction/workflow/JavaQuery.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/** Copyright 2015 TappingStone, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.prediction.workflow;
-
-import java.io.Serializable;
-
-public class JavaQuery implements Serializable{
- private final String q;
-
- public JavaQuery(String q) {
- this.q = q;
- }
-
- public String getQ() {
- return q;
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) return true;
- if (o == null || getClass() != o.getClass()) return false;
-
- JavaQuery javaQuery = (JavaQuery) o;
-
- return !(q != null ? !q.equals(javaQuery.q) : javaQuery.q != null);
-
- }
-
- @Override
- public int hashCode() {
- return q != null ? q.hashCode() : 0;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/test/java/io/prediction/workflow/JavaQueryTypeAdapterFactory.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/io/prediction/workflow/JavaQueryTypeAdapterFactory.java b/core/src/test/java/io/prediction/workflow/JavaQueryTypeAdapterFactory.java
deleted file mode 100644
index 409859d..0000000
--- a/core/src/test/java/io/prediction/workflow/JavaQueryTypeAdapterFactory.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/** Copyright 2015 TappingStone, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.prediction.workflow;
-
-import com.google.gson.Gson;
-import com.google.gson.TypeAdapter;
-import com.google.gson.TypeAdapterFactory;
-import com.google.gson.reflect.TypeToken;
-import com.google.gson.stream.JsonReader;
-import com.google.gson.stream.JsonToken;
-import com.google.gson.stream.JsonWriter;
-
-import java.io.IOException;
-
-public class JavaQueryTypeAdapterFactory implements TypeAdapterFactory {
- @Override
- public <T> TypeAdapter<T> create(Gson gson, TypeToken<T> type) {
- if (type.getRawType().equals(JavaQuery.class)) {
- return (TypeAdapter<T>) new TypeAdapter<JavaQuery>() {
- public void write(JsonWriter out, JavaQuery value) throws IOException {
- if (value == null) {
- out.nullValue();
- } else {
- out.beginObject();
- out.name("q").value(value.getQ().toUpperCase());
- out.endObject();
- }
- }
-
- public JavaQuery read(JsonReader reader) throws IOException {
- if (reader.peek() == JsonToken.NULL) {
- reader.nextNull();
- return null;
- } else {
- reader.beginObject();
- reader.nextName();
- String q = reader.nextString();
- reader.endObject();
- return new JavaQuery(q.toUpperCase());
- }
- }
- };
- } else {
- return null;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/test/java/org/apache/predictionio/workflow/JavaParams.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/predictionio/workflow/JavaParams.java b/core/src/test/java/org/apache/predictionio/workflow/JavaParams.java
new file mode 100644
index 0000000..982ecbf
--- /dev/null
+++ b/core/src/test/java/org/apache/predictionio/workflow/JavaParams.java
@@ -0,0 +1,30 @@
+/** Copyright 2015 TappingStone, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.predictionio.workflow;
+
+import org.apache.predictionio.controller.Params;
+
+public class JavaParams implements Params {
+ private final String p;
+
+ public JavaParams(String p) {
+ this.p = p;
+ }
+
+ public String getP() {
+ return p;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/test/java/org/apache/predictionio/workflow/JavaQuery.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/predictionio/workflow/JavaQuery.java b/core/src/test/java/org/apache/predictionio/workflow/JavaQuery.java
new file mode 100644
index 0000000..f4a6359
--- /dev/null
+++ b/core/src/test/java/org/apache/predictionio/workflow/JavaQuery.java
@@ -0,0 +1,46 @@
+/** Copyright 2015 TappingStone, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.predictionio.workflow;
+
+import java.io.Serializable;
+
+public class JavaQuery implements Serializable{
+ private final String q;
+
+ public JavaQuery(String q) {
+ this.q = q;
+ }
+
+ public String getQ() {
+ return q;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+
+ JavaQuery javaQuery = (JavaQuery) o;
+
+ return !(q != null ? !q.equals(javaQuery.q) : javaQuery.q != null);
+
+ }
+
+ @Override
+ public int hashCode() {
+ return q != null ? q.hashCode() : 0;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/test/java/org/apache/predictionio/workflow/JavaQueryTypeAdapterFactory.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/predictionio/workflow/JavaQueryTypeAdapterFactory.java b/core/src/test/java/org/apache/predictionio/workflow/JavaQueryTypeAdapterFactory.java
new file mode 100644
index 0000000..46854d6
--- /dev/null
+++ b/core/src/test/java/org/apache/predictionio/workflow/JavaQueryTypeAdapterFactory.java
@@ -0,0 +1,60 @@
+/** Copyright 2015 TappingStone, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.predictionio.workflow;
+
+import com.google.gson.Gson;
+import com.google.gson.TypeAdapter;
+import com.google.gson.TypeAdapterFactory;
+import com.google.gson.reflect.TypeToken;
+import com.google.gson.stream.JsonReader;
+import com.google.gson.stream.JsonToken;
+import com.google.gson.stream.JsonWriter;
+
+import java.io.IOException;
+
+public class JavaQueryTypeAdapterFactory implements TypeAdapterFactory {
+ @Override
+ public <T> TypeAdapter<T> create(Gson gson, TypeToken<T> type) {
+ if (type.getRawType().equals(JavaQuery.class)) {
+ return (TypeAdapter<T>) new TypeAdapter<JavaQuery>() {
+ public void write(JsonWriter out, JavaQuery value) throws IOException {
+ if (value == null) {
+ out.nullValue();
+ } else {
+ out.beginObject();
+ out.name("q").value(value.getQ().toUpperCase());
+ out.endObject();
+ }
+ }
+
+ public JavaQuery read(JsonReader reader) throws IOException {
+ if (reader.peek() == JsonToken.NULL) {
+ reader.nextNull();
+ return null;
+ } else {
+ reader.beginObject();
+ reader.nextName();
+ String q = reader.nextString();
+ reader.endObject();
+ return new JavaQuery(q.toUpperCase());
+ }
+ }
+ };
+ } else {
+ return null;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/test/scala/io/prediction/controller/EngineTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/io/prediction/controller/EngineTest.scala b/core/src/test/scala/io/prediction/controller/EngineTest.scala
deleted file mode 100644
index cc84249..0000000
--- a/core/src/test/scala/io/prediction/controller/EngineTest.scala
+++ /dev/null
@@ -1,615 +0,0 @@
-package io.prediction.controller
-
-import io.prediction.workflow.PersistentModelManifest
-import io.prediction.workflow.SharedSparkContext
-import io.prediction.workflow.StopAfterPrepareInterruption
-import io.prediction.workflow.StopAfterReadInterruption
-
-import grizzled.slf4j.Logger
-import io.prediction.workflow.WorkflowParams
-import org.apache.spark.rdd.RDD
-import org.scalatest.Inspectors._
-import org.scalatest.Matchers._
-import org.scalatest.FunSuite
-import org.scalatest.Inside
-
-import scala.util.Random
-
-class EngineSuite
-extends FunSuite with Inside with SharedSparkContext {
- import io.prediction.controller.Engine0._
- @transient lazy val logger = Logger[this.type]
-
- test("Engine.train") {
- val engine = new Engine(
- classOf[PDataSource2],
- classOf[PPreparator1],
- Map("" -> classOf[PAlgo2]),
- classOf[LServing1])
-
- val engineParams = EngineParams(
- dataSourceParams = PDataSource2.Params(0),
- preparatorParams = PPreparator1.Params(1),
- algorithmParamsList = Seq(("", PAlgo2.Params(2))),
- servingParams = LServing1.Params(3))
-
- val models = engine.train(
- sc,
- engineParams,
- engineInstanceId = "",
- params = WorkflowParams())
-
- val pd = ProcessedData(1, TrainingData(0))
-
- // PAlgo2.Model doesn't have IPersistentModel trait implemented. Hence the
- // model extract after train is Unit.
- models should contain theSameElementsAs Seq(Unit)
- }
-
- test("Engine.train persisting PAlgo.Model") {
- val engine = new Engine(
- classOf[PDataSource2],
- classOf[PPreparator1],
- Map(
- "PAlgo2" -> classOf[PAlgo2],
- "PAlgo3" -> classOf[PAlgo3]
- ),
- classOf[LServing1])
-
- val engineParams = EngineParams(
- dataSourceParams = PDataSource2.Params(0),
- preparatorParams = PPreparator1.Params(1),
- algorithmParamsList = Seq(
- ("PAlgo2", PAlgo2.Params(2)),
- ("PAlgo3", PAlgo3.Params(21)),
- ("PAlgo3", PAlgo3.Params(22))
- ),
- servingParams = LServing1.Params(3))
-
- val pd = ProcessedData(1, TrainingData(0))
- val model21 = PAlgo3.Model(21, pd)
- val model22 = PAlgo3.Model(22, pd)
-
- val models = engine.train(
- sc,
- engineParams,
- engineInstanceId = "",
- params = WorkflowParams())
-
- val pModel21 = PersistentModelManifest(model21.getClass.getName)
- val pModel22 = PersistentModelManifest(model22.getClass.getName)
-
- models should contain theSameElementsAs Seq(Unit, pModel21, pModel22)
- }
-
- test("Engine.train persisting LAlgo.Model") {
- val engine = Engine(
- classOf[LDataSource1],
- classOf[LPreparator1],
- Map(
- "LAlgo1" -> classOf[LAlgo1],
- "LAlgo2" -> classOf[LAlgo2],
- "LAlgo3" -> classOf[LAlgo3]
- ),
- classOf[LServing1])
-
- val engineParams = EngineParams(
- dataSourceParams = LDataSource1.Params(0),
- preparatorParams = LPreparator1.Params(1),
- algorithmParamsList = Seq(
- ("LAlgo2", LAlgo2.Params(20)),
- ("LAlgo2", LAlgo2.Params(21)),
- ("LAlgo3", LAlgo3.Params(22))),
- servingParams = LServing1.Params(3))
-
- val pd = ProcessedData(1, TrainingData(0))
- val model20 = LAlgo2.Model(20, pd)
- val model21 = LAlgo2.Model(21, pd)
- val model22 = LAlgo3.Model(22, pd)
-
- //val models = engine.train(sc, engineParams, WorkflowParams())
- val models = engine.train(
- sc,
- engineParams,
- engineInstanceId = "",
- params = WorkflowParams())
-
- val pModel20 = PersistentModelManifest(model20.getClass.getName)
- val pModel21 = PersistentModelManifest(model21.getClass.getName)
-
- models should contain theSameElementsAs Seq(pModel20, pModel21, model22)
- }
-
- test("Engine.train persisting P&NAlgo.Model") {
- val engine = new Engine(
- classOf[PDataSource2],
- classOf[PPreparator1],
- Map(
- "PAlgo2" -> classOf[PAlgo2],
- "PAlgo3" -> classOf[PAlgo3],
- "NAlgo2" -> classOf[NAlgo2],
- "NAlgo3" -> classOf[NAlgo3]
- ),
- classOf[LServing1])
-
- val engineParams = EngineParams(
- dataSourceParams = PDataSource2.Params(0),
- preparatorParams = PPreparator1.Params(1),
- algorithmParamsList = Seq(
- ("PAlgo2", PAlgo2.Params(20)),
- ("PAlgo3", PAlgo3.Params(21)),
- ("PAlgo3", PAlgo3.Params(22)),
- ("NAlgo2", NAlgo2.Params(23)),
- ("NAlgo3", NAlgo3.Params(24)),
- ("NAlgo3", NAlgo3.Params(25))
- ),
- servingParams = LServing1.Params(3))
-
- val pd = ProcessedData(1, TrainingData(0))
- val model21 = PAlgo3.Model(21, pd)
- val model22 = PAlgo3.Model(22, pd)
- val model23 = NAlgo2.Model(23, pd)
- val model24 = NAlgo3.Model(24, pd)
- val model25 = NAlgo3.Model(25, pd)
-
- //val models = engine.train(sc, engineParams, WorkflowParams())
- val models = engine.train(
- sc,
- engineParams,
- engineInstanceId = "",
- params = WorkflowParams())
-
- val pModel21 = PersistentModelManifest(model21.getClass.getName)
- val pModel22 = PersistentModelManifest(model22.getClass.getName)
- val pModel23 = PersistentModelManifest(model23.getClass.getName)
-
- models should contain theSameElementsAs Seq(
- Unit, pModel21, pModel22, pModel23, model24, model25)
- }
-
- test("Engine.eval") {
- val engine = new Engine(
- classOf[PDataSource2],
- classOf[PPreparator1],
- Map("" -> classOf[PAlgo2]),
- classOf[LServing1])
-
- val qn = 10
- val en = 3
-
- val engineParams = EngineParams(
- dataSourceParams = PDataSource2.Params(id = 0, en = en, qn = qn),
- preparatorParams = PPreparator1.Params(1),
- algorithmParamsList = Seq(("", PAlgo2.Params(2))),
- servingParams = LServing1.Params(3))
-
- val algoCount = engineParams.algorithmParamsList.size
- val pd = ProcessedData(1, TrainingData(0))
- val model0 = PAlgo2.Model(2, pd)
-
- val evalDataSet = engine.eval(sc, engineParams, WorkflowParams())
-
- evalDataSet should have size en
-
- forAll(evalDataSet.zipWithIndex) { case (evalData, ex) => {
- val (evalInfo, qpaRDD) = evalData
- evalInfo shouldBe EvalInfo(0)
-
- val qpaSeq: Seq[(Query, Prediction, Actual)] = qpaRDD.collect
-
- qpaSeq should have size qn
-
- forAll (qpaSeq) { case (q, p, a) =>
- val Query(qId, qEx, qQx, _) = q
- val Actual(aId, aEx, aQx) = a
- qId shouldBe aId
- qEx shouldBe ex
- aEx shouldBe ex
- qQx shouldBe aQx
-
- inside (p) { case Prediction(pId, pQ, pModels, pPs) => {
- pId shouldBe 3
- pQ shouldBe q
- pModels shouldBe None
- pPs should have size algoCount
- pPs shouldBe Seq(
- Prediction(id = 2, q = q, models = Some(model0)))
- }}
- }
- }}
- }
-
- test("Engine.prepareDeploy PAlgo") {
- val engine = new Engine(
- classOf[PDataSource2],
- classOf[PPreparator1],
- Map(
- "PAlgo2" -> classOf[PAlgo2],
- "PAlgo3" -> classOf[PAlgo3],
- "NAlgo2" -> classOf[NAlgo2],
- "NAlgo3" -> classOf[NAlgo3]
- ),
- classOf[LServing1])
-
- val engineParams = EngineParams(
- dataSourceParams = PDataSource2.Params(0),
- preparatorParams = PPreparator1.Params(1),
- algorithmParamsList = Seq(
- ("PAlgo2", PAlgo2.Params(20)),
- ("PAlgo3", PAlgo3.Params(21)),
- ("PAlgo3", PAlgo3.Params(22)),
- ("NAlgo2", NAlgo2.Params(23)),
- ("NAlgo3", NAlgo3.Params(24)),
- ("NAlgo3", NAlgo3.Params(25))
- ),
- servingParams = LServing1.Params(3))
-
- val pd = ProcessedData(1, TrainingData(0))
- val model20 = PAlgo2.Model(20, pd)
- val model21 = PAlgo3.Model(21, pd)
- val model22 = PAlgo3.Model(22, pd)
- val model23 = NAlgo2.Model(23, pd)
- val model24 = NAlgo3.Model(24, pd)
- val model25 = NAlgo3.Model(25, pd)
-
- val rand = new Random()
-
- val fakeEngineInstanceId = s"FakeInstanceId-${rand.nextLong()}"
-
- val persistedModels = engine.train(
- sc,
- engineParams,
- engineInstanceId = fakeEngineInstanceId,
- params = WorkflowParams()
- )
-
- val deployableModels = engine.prepareDeploy(
- sc,
- engineParams,
- fakeEngineInstanceId,
- persistedModels,
- params = WorkflowParams()
- )
-
- deployableModels should contain theSameElementsAs Seq(
- model20, model21, model22, model23, model24, model25)
- }
-}
-
-class EngineTrainSuite extends FunSuite with SharedSparkContext {
- import io.prediction.controller.Engine0._
- val defaultWorkflowParams: WorkflowParams = WorkflowParams()
-
- test("Parallel DS/P/Algos") {
- val models = Engine.train(
- sc,
- new PDataSource0(0),
- new PPreparator0(1),
- Seq(
- new PAlgo0(2),
- new PAlgo1(3),
- new PAlgo0(4)),
- defaultWorkflowParams
- )
-
- val pd = ProcessedData(1, TrainingData(0))
-
- models should contain theSameElementsAs Seq(
- PAlgo0.Model(2, pd), PAlgo1.Model(3, pd), PAlgo0.Model(4, pd))
- }
-
- test("Local DS/P/Algos") {
- val models = Engine.train(
- sc,
- new LDataSource0(0),
- new LPreparator0(1),
- Seq(
- new LAlgo0(2),
- new LAlgo1(3),
- new LAlgo0(4)),
- defaultWorkflowParams
- )
-
- val pd = ProcessedData(1, TrainingData(0))
-
- val expectedResults = Seq(
- LAlgo0.Model(2, pd),
- LAlgo1.Model(3, pd),
- LAlgo0.Model(4, pd))
-
- forAll(models.zip(expectedResults)) { case (model, expected) =>
- model shouldBe a [RDD[_]]
- val localModel = model.asInstanceOf[RDD[_]].collect
- localModel should contain theSameElementsAs Seq(expected)
- }
- }
-
- test("P2L DS/P/Algos") {
- val models = Engine.train(
- sc,
- new PDataSource0(0),
- new PPreparator0(1),
- Seq(
- new NAlgo0(2),
- new NAlgo1(3),
- new NAlgo0(4)),
- defaultWorkflowParams
- )
-
- val pd = ProcessedData(1, TrainingData(0))
-
- models should contain theSameElementsAs Seq(
- NAlgo0.Model(2, pd), NAlgo1.Model(3, pd), NAlgo0.Model(4, pd))
- }
-
- test("Parallel DS/P/Algos Stop-After-Read") {
- val workflowParams = defaultWorkflowParams.copy(
- stopAfterRead = true)
-
- an [StopAfterReadInterruption] should be thrownBy Engine.train(
- sc,
- new PDataSource0(0),
- new PPreparator0(1),
- Seq(
- new PAlgo0(2),
- new PAlgo1(3),
- new PAlgo0(4)),
- workflowParams
- )
- }
-
- test("Parallel DS/P/Algos Stop-After-Prepare") {
- val workflowParams = defaultWorkflowParams.copy(
- stopAfterPrepare = true)
-
- an [StopAfterPrepareInterruption] should be thrownBy Engine.train(
- sc,
- new PDataSource0(0),
- new PPreparator0(1),
- Seq(
- new PAlgo0(2),
- new PAlgo1(3),
- new PAlgo0(4)),
- workflowParams
- )
- }
-
- test("Parallel DS/P/Algos Dirty TrainingData") {
- val workflowParams = defaultWorkflowParams.copy(
- skipSanityCheck = false)
-
- an [AssertionError] should be thrownBy Engine.train(
- sc,
- new PDataSource3(0, error = true),
- new PPreparator0(1),
- Seq(
- new PAlgo0(2),
- new PAlgo1(3),
- new PAlgo0(4)),
- workflowParams
- )
- }
-
- test("Parallel DS/P/Algos Dirty TrainingData But Skip Check") {
- val workflowParams = defaultWorkflowParams.copy(
- skipSanityCheck = true)
-
- val models = Engine.train(
- sc,
- new PDataSource3(0, error = true),
- new PPreparator0(1),
- Seq(
- new PAlgo0(2),
- new PAlgo1(3),
- new PAlgo0(4)),
- workflowParams
- )
-
- val pd = ProcessedData(1, TrainingData(0, error = true))
-
- models should contain theSameElementsAs Seq(
- PAlgo0.Model(2, pd), PAlgo1.Model(3, pd), PAlgo0.Model(4, pd))
- }
-}
-
-
-class EngineEvalSuite
-extends FunSuite with Inside with SharedSparkContext {
- import io.prediction.controller.Engine0._
-
- @transient lazy val logger = Logger[this.type]
-
- test("Simple Parallel DS/P/A/S") {
- val en = 2
- val qn = 5
-
- val evalDataSet: Seq[(EvalInfo, RDD[(Query, Prediction, Actual)])] =
- Engine.eval(
- sc,
- new PDataSource1(id = 1, en = en, qn = qn),
- new PPreparator0(id = 2),
- Seq(new PAlgo0(id = 3)),
- new LServing0(id = 10))
-
- val pd = ProcessedData(2, TrainingData(1))
- val model0 = PAlgo0.Model(3, pd)
-
- forAll(evalDataSet.zipWithIndex) { case (evalData, ex) => {
- val (evalInfo, qpaRDD) = evalData
- evalInfo shouldBe EvalInfo(1)
-
- val qpaSeq: Seq[(Query, Prediction, Actual)] = qpaRDD.collect
- forAll (qpaSeq) { case (q, p, a) =>
- val Query(qId, qEx, qQx, _) = q
- val Actual(aId, aEx, aQx) = a
- qId shouldBe aId
- qEx shouldBe ex
- aEx shouldBe ex
- qQx shouldBe aQx
-
- inside (p) { case Prediction(pId, pQ, pModels, pPs) => {
- pId shouldBe 10
- pQ shouldBe q
- pModels shouldBe None
- pPs should have size 1
- pPs shouldBe Seq(
- Prediction(id = 3, q = q, models = Some(model0)))
- }}
- }
-
- }}
-
- }
-
- test("Parallel DS/P/A/S") {
- val en = 2
- val qn = 5
-
- val evalDataSet: Seq[(EvalInfo, RDD[(Query, Prediction, Actual)])] =
- Engine.eval(
- sc,
- new PDataSource1(id = 1, en = en, qn = qn),
- new PPreparator0(id = 2),
- Seq(
- new PAlgo0(id = 3),
- new PAlgo1(id = 4),
- new NAlgo1(id = 5)),
- new LServing0(id = 10))
-
- val pd = ProcessedData(2, TrainingData(1))
- val model0 = PAlgo0.Model(3, pd)
- val model1 = PAlgo1.Model(4, pd)
- val model2 = NAlgo1.Model(5, pd)
-
- forAll(evalDataSet.zipWithIndex) { case (evalData, ex) => {
- val (evalInfo, qpaRDD) = evalData
- evalInfo shouldBe EvalInfo(1)
-
- val qpaSeq: Seq[(Query, Prediction, Actual)] = qpaRDD.collect
- forAll (qpaSeq) { case (q, p, a) =>
- val Query(qId, qEx, qQx, _) = q
- val Actual(aId, aEx, aQx) = a
- qId shouldBe aId
- qEx shouldBe ex
- aEx shouldBe ex
- qQx shouldBe aQx
-
- inside (p) { case Prediction(pId, pQ, pModels, pPs) => {
- pId shouldBe 10
- pQ shouldBe q
- pModels shouldBe None
- pPs should have size 3
- pPs shouldBe Seq(
- Prediction(id = 3, q = q, models = Some(model0)),
- Prediction(id = 4, q = q, models = Some(model1)),
- Prediction(id = 5, q = q, models = Some(model2))
- )
- }}
- }
- }}
- }
-
- test("Parallel DS/P/A/S with Supplemented Query") {
- val en = 2
- val qn = 5
-
- val evalDataSet: Seq[(EvalInfo, RDD[(Query, Prediction, Actual)])] =
- Engine.eval(
- sc,
- new PDataSource1(id = 1, en = en, qn = qn),
- new PPreparator0(id = 2),
- Seq(
- new PAlgo0(id = 3),
- new PAlgo1(id = 4),
- new NAlgo1(id = 5)),
- new LServing2(id = 10))
-
- val pd = ProcessedData(2, TrainingData(1))
- val model0 = PAlgo0.Model(3, pd)
- val model1 = PAlgo1.Model(4, pd)
- val model2 = NAlgo1.Model(5, pd)
-
- forAll(evalDataSet.zipWithIndex) { case (evalData, ex) => {
- val (evalInfo, qpaRDD) = evalData
- evalInfo shouldBe EvalInfo(1)
-
- val qpaSeq: Seq[(Query, Prediction, Actual)] = qpaRDD.collect
- forAll (qpaSeq) { case (q, p, a) =>
- val Query(qId, qEx, qQx, qSupp) = q
- val Actual(aId, aEx, aQx) = a
- qId shouldBe aId
- qEx shouldBe ex
- aEx shouldBe ex
- qQx shouldBe aQx
- qSupp shouldBe false
-
- inside (p) { case Prediction(pId, pQ, pModels, pPs) => {
- pId shouldBe 10
- pQ shouldBe q
- pModels shouldBe None
- pPs should have size 3
- // queries inside prediction should have supp set to true, since it
- // represents what the algorithms see.
- val qSupp = q.copy(supp = true)
- pPs shouldBe Seq(
- Prediction(id = 3, q = qSupp, models = Some(model0)),
- Prediction(id = 4, q = qSupp, models = Some(model1)),
- Prediction(id = 5, q = qSupp, models = Some(model2))
- )
- }}
- }
- }}
- }
-
- test("Local DS/P/A/S") {
- val en = 2
- val qn = 5
-
- val evalDataSet: Seq[(EvalInfo, RDD[(Query, Prediction, Actual)])] =
- Engine.eval(
- sc,
- new LDataSource0(id = 1, en = en, qn = qn),
- new LPreparator0(id = 2),
- Seq(
- new LAlgo0(id = 3),
- new LAlgo1(id = 4),
- new LAlgo1(id = 5)),
- new LServing0(id = 10))
-
- val pd = ProcessedData(2, TrainingData(1))
- val model0 = LAlgo0.Model(3, pd)
- val model1 = LAlgo1.Model(4, pd)
- val model2 = LAlgo1.Model(5, pd)
-
- forAll(evalDataSet.zipWithIndex) { case (evalData, ex) => {
- val (evalInfo, qpaRDD) = evalData
- evalInfo shouldBe EvalInfo(1)
-
- val qpaSeq: Seq[(Query, Prediction, Actual)] = qpaRDD.collect
- forAll (qpaSeq) { case (q, p, a) =>
- val Query(qId, qEx, qQx, _) = q
- val Actual(aId, aEx, aQx) = a
- qId shouldBe aId
- qEx shouldBe ex
- aEx shouldBe ex
- qQx shouldBe aQx
-
- inside (p) { case Prediction(pId, pQ, pModels, pPs) => {
- pId shouldBe 10
- pQ shouldBe q
- pModels shouldBe None
- pPs should have size 3
- pPs shouldBe Seq(
- Prediction(id = 3, q = q, models = Some(model0)),
- Prediction(id = 4, q = q, models = Some(model1)),
- Prediction(id = 5, q = q, models = Some(model2))
- )
- }}
- }
-
- }}
-
- }
-}
-
-
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/test/scala/io/prediction/controller/EvaluationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/io/prediction/controller/EvaluationTest.scala b/core/src/test/scala/io/prediction/controller/EvaluationTest.scala
deleted file mode 100644
index 5dc4c86..0000000
--- a/core/src/test/scala/io/prediction/controller/EvaluationTest.scala
+++ /dev/null
@@ -1,46 +0,0 @@
-package io.prediction.controller
-
-import io.prediction.workflow.SharedSparkContext
-
-import org.scalatest.FunSuite
-import org.scalatest.Inside
-import org.scalatest.Matchers._
-
-import org.apache.spark.SparkContext
-import org.apache.spark.rdd.RDD
-
-object EvaluationSuite {
- import io.prediction.controller.TestEvaluator._
-
- class Metric0 extends Metric[EvalInfo, Query, Prediction, Actual, Int] {
- def calculate(
- sc: SparkContext,
- evalDataSet: Seq[(EvalInfo, RDD[(Query, Prediction, Actual)])]): Int = 1
- }
-
- object Evaluation0 extends Evaluation {
- engineMetric = (new FakeEngine(1, 1, 1), new Metric0())
- }
-}
-
-
-class EvaluationSuite
-extends FunSuite with Inside with SharedSparkContext {
- import io.prediction.controller.EvaluationSuite._
-
- test("Evaluation makes MetricEvaluator") {
- // MetricEvaluator is typed [EvalInfo, Query, Prediction, Actual, Int],
- // however this information is erased on JVM. scalatest doc recommends to
- // use wildcards.
- Evaluation0.evaluator shouldBe a [MetricEvaluator[_, _, _, _, _]]
- }
-
- test("Load from class path") {
- val r = io.prediction.workflow.WorkflowUtils.getEvaluation(
- "io.prediction.controller.EvaluationSuite.Evaluation0",
- getClass.getClassLoader)
-
- r._2 shouldBe EvaluationSuite.Evaluation0
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/test/scala/io/prediction/controller/EvaluatorTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/io/prediction/controller/EvaluatorTest.scala b/core/src/test/scala/io/prediction/controller/EvaluatorTest.scala
deleted file mode 100644
index c57bd03..0000000
--- a/core/src/test/scala/io/prediction/controller/EvaluatorTest.scala
+++ /dev/null
@@ -1,93 +0,0 @@
-package io.prediction.controller
-
-import io.prediction.core._
-import io.prediction.workflow.WorkflowParams
-
-import org.apache.spark.SparkContext
-import org.apache.spark.rdd.RDD
-
-object TestEvaluator {
- case class EvalInfo(id: Int, ex: Int)
- case class Query(id: Int, ex: Int, qx: Int)
- case class Prediction(id: Int, ex: Int, qx: Int)
- case class Actual(id: Int, ex: Int, qx: Int)
-
- class FakeEngine(val id: Int, val en: Int, val qn: Int)
- extends BaseEngine[EvalInfo, Query, Prediction, Actual] {
- def train(
- sc: SparkContext,
- engineParams: EngineParams,
- instanceId: String = "",
- params: WorkflowParams = WorkflowParams()
- ): Seq[Any] = {
- Seq[Any]()
- }
-
- def eval(
- sc: SparkContext,
- engineParams: EngineParams,
- params: WorkflowParams)
- : Seq[(EvalInfo, RDD[(Query, Prediction, Actual)])] = {
- (0 until en).map { ex => {
- val qpas = (0 until qn).map { qx => {
- (Query(id, ex, qx), Prediction(id, ex, qx), Actual(id, ex, qx))
- }}
-
- (EvalInfo(id = id, ex = ex), sc.parallelize(qpas))
- }}
- }
-
- }
-
- /*
- class Evaluator0 extends Evaluator[EvalInfo, Query, Prediction, Actual,
- (Query, Prediction, Actual),
- (EvalInfo, Seq[(Query, Prediction, Actual)]),
- Seq[(EvalInfo, (EvalInfo, Seq[(Query, Prediction, Actual)]))]
- ] {
-
- def evaluateUnit(q: Query, p: Prediction, a: Actual)
- : (Query, Prediction, Actual) = (q, p, a)
-
- def evaluateSet(
- evalInfo: EvalInfo,
- eus: Seq[(Query, Prediction, Actual)])
- : (EvalInfo, Seq[(Query, Prediction, Actual)]) = (evalInfo, eus)
-
- def evaluateAll(
- input: Seq[(EvalInfo, (EvalInfo, Seq[(Query, Prediction, Actual)]))])
- = input
- }
- */
-
-}
-
-/*
-class EvaluatorSuite
-extends FunSuite with Inside with SharedSparkContext {
- import io.prediction.controller.TestEvaluator._
- @transient lazy val logger = Logger[this.type]
-
- test("Evaluator.evaluate") {
- val engine = new FakeEngine(1, 3, 10)
- val evaluator = new Evaluator0()
-
- val evalDataSet = engine.eval(sc, null.asInstanceOf[EngineParams])
- val er: Seq[(EvalInfo, (EvalInfo, Seq[(Query, Prediction, Actual)]))] =
- evaluator.evaluateBase(sc, evalDataSet)
-
- evalDataSet.zip(er).map { case (input, output) => {
- val (inputEvalInfo, inputQpaRDD) = input
- val (outputEvalInfo, (outputEvalInfo2, outputQpaSeq)) = output
-
- inputEvalInfo shouldBe outputEvalInfo
- inputEvalInfo shouldBe outputEvalInfo2
-
- val inputQpaSeq: Array[(Query, Prediction, Actual)] = inputQpaRDD.collect
-
- inputQpaSeq.size should be (outputQpaSeq.size)
- // TODO. match inputQpa and outputQpa content.
- }}
- }
-}
-*/
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/test/scala/io/prediction/controller/FastEvalEngineTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/io/prediction/controller/FastEvalEngineTest.scala b/core/src/test/scala/io/prediction/controller/FastEvalEngineTest.scala
deleted file mode 100644
index cdf4598..0000000
--- a/core/src/test/scala/io/prediction/controller/FastEvalEngineTest.scala
+++ /dev/null
@@ -1,181 +0,0 @@
-package io.prediction.controller
-
-import io.prediction.workflow.WorkflowParams
-import org.scalatest.FunSuite
-import org.scalatest.Inside
-import org.scalatest.Matchers._
-import org.scalatest.Inspectors._
-
-import io.prediction.workflow.SharedSparkContext
-
-class FastEngineSuite
-extends FunSuite with Inside with SharedSparkContext {
- import io.prediction.controller.Engine0._
-
- test("Single Evaluation") {
- val engine = new FastEvalEngine(
- Map("" -> classOf[PDataSource2]),
- Map("" -> classOf[PPreparator1]),
- Map(
- "PAlgo2" -> classOf[PAlgo2],
- "PAlgo3" -> classOf[PAlgo3]
- ),
- Map("" -> classOf[LServing1]))
-
- val qn = 10
- val en = 3
-
- val engineParams = EngineParams(
- dataSourceParams = PDataSource2.Params(id = 0, en = en, qn = qn),
- preparatorParams = PPreparator1.Params(1),
- algorithmParamsList = Seq(
- ("PAlgo2", PAlgo2.Params(20)),
- ("PAlgo2", PAlgo2.Params(21)),
- ("PAlgo3", PAlgo3.Params(22))
- ),
- servingParams = LServing1.Params(3))
-
- val algoCount = engineParams.algorithmParamsList.size
- val pd = ProcessedData(1, TrainingData(0))
- val model0 = PAlgo2.Model(20, pd)
- val model1 = PAlgo2.Model(21, pd)
- val model2 = PAlgo3.Model(22, pd)
-
- val evalDataSet = engine.eval(sc, engineParams, WorkflowParams())
-
- evalDataSet should have size en
-
- forAll(evalDataSet.zipWithIndex) { case (evalData, ex) => {
- val (evalInfo, qpaRDD) = evalData
- evalInfo shouldBe EvalInfo(0)
-
- val qpaSeq: Seq[(Query, Prediction, Actual)] = qpaRDD.collect
-
- qpaSeq should have size qn
-
- forAll (qpaSeq) { case (q, p, a) =>
- val Query(qId, qEx, qQx, _) = q
- val Actual(aId, aEx, aQx) = a
- qId shouldBe aId
- qEx shouldBe ex
- aEx shouldBe ex
- qQx shouldBe aQx
-
- inside (p) { case Prediction(pId, pQ, pModels, pPs) => {
- pId shouldBe 3
- pQ shouldBe q
- pModels shouldBe None
- pPs should have size algoCount
- pPs shouldBe Seq(
- Prediction(id = 20, q = q, models = Some(model0)),
- Prediction(id = 21, q = q, models = Some(model1)),
- Prediction(id = 22, q = q, models = Some(model2))
- )
- }}
- }
- }}
- }
-
- test("Batch Evaluation") {
- val engine = new FastEvalEngine(
- Map("" -> classOf[PDataSource2]),
- Map("" -> classOf[PPreparator1]),
- Map("" -> classOf[PAlgo2]),
- Map("" -> classOf[LServing1]))
-
- val qn = 10
- val en = 3
-
- val baseEngineParams = EngineParams(
- dataSourceParams = PDataSource2.Params(id = 0, en = en, qn = qn),
- preparatorParams = PPreparator1.Params(1),
- algorithmParamsList = Seq(("", PAlgo2.Params(2))),
- servingParams = LServing1.Params(3))
-
- val ep0 = baseEngineParams
- val ep1 = baseEngineParams.copy(
- algorithmParamsList = Seq(("", PAlgo2.Params(2))))
- val ep2 = baseEngineParams.copy(
- algorithmParamsList = Seq(("", PAlgo2.Params(20))))
-
- val engineEvalDataSet = engine.batchEval(
- sc,
- Seq(ep0, ep1, ep2),
- WorkflowParams())
-
- val evalDataSet0 = engineEvalDataSet(0)._2
- val evalDataSet1 = engineEvalDataSet(1)._2
- val evalDataSet2 = engineEvalDataSet(2)._2
-
- evalDataSet0 shouldBe evalDataSet1
- evalDataSet0 should not be evalDataSet2
- evalDataSet1 should not be evalDataSet2
-
- // evalDataSet0._1 should be theSameInstanceAs evalDataSet1._1
- // When things are cached correctly, evalDataSet0 and 1 should share the
- // same EI
- evalDataSet0.zip(evalDataSet1).foreach { case (e0, e1) => {
- e0._1 should be theSameInstanceAs e1._1
- e0._2 should be theSameInstanceAs e1._2
- }}
-
- // So as set1 and set2, however, the QPA-RDD should be different.
- evalDataSet1.zip(evalDataSet2).foreach { case (e1, e2) => {
- e1._1 should be theSameInstanceAs e2._1
- val e1Qpa = e1._2
- val e2Qpa = e2._2
- e1Qpa should not be theSameInstanceAs (e2Qpa)
- }}
- }
-
- test("Not cached when isEqual not implemented") {
- // PDataSource3.Params is a class not case class. Need to implement the
- // isEqual function for hashing.
- val engine = new FastEvalEngine(
- Map("" -> classOf[PDataSource4]),
- Map("" -> classOf[PPreparator1]),
- Map("" -> classOf[PAlgo2]),
- Map("" -> classOf[LServing1]))
-
- val qn = 10
- val en = 3
-
- val baseEngineParams = EngineParams(
- dataSourceParams = new PDataSource4.Params(id = 0, en = en, qn = qn),
- preparatorParams = PPreparator1.Params(1),
- algorithmParamsList = Seq(("", PAlgo2.Params(2))),
- servingParams = LServing1.Params(3))
-
- val ep0 = baseEngineParams
- val ep1 = baseEngineParams.copy(
- algorithmParamsList = Seq(("", PAlgo2.Params(3))))
- // ep2.dataSource is different from ep0.
- val ep2 = baseEngineParams.copy(
- dataSourceParams = ("", new PDataSource4.Params(id = 0, en = en, qn = qn)),
- algorithmParamsList = Seq(("", PAlgo2.Params(3))))
-
- val engineEvalDataSet = engine.batchEval(
- sc,
- Seq(ep0, ep1, ep2),
- WorkflowParams())
-
- val evalDataSet0 = engineEvalDataSet(0)._2
- val evalDataSet1 = engineEvalDataSet(1)._2
- val evalDataSet2 = engineEvalDataSet(2)._2
-
- evalDataSet0 should not be evalDataSet1
- evalDataSet0 should not be evalDataSet2
- evalDataSet1 should not be evalDataSet2
-
- // Set0 should have same EI as Set1, since their dsp are the same instance.
- evalDataSet0.zip(evalDataSet1).foreach { case (e0, e1) => {
- e0._1 should be theSameInstanceAs (e1._1)
- }}
-
- // Set1 should have different EI as Set2, since Set2's dsp is another
- // instance
- evalDataSet1.zip(evalDataSet2).foreach { case (e1, e2) => {
- e1._1 should not be theSameInstanceAs (e2._1)
- }}
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/test/scala/io/prediction/controller/MetricEvaluatorTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/io/prediction/controller/MetricEvaluatorTest.scala b/core/src/test/scala/io/prediction/controller/MetricEvaluatorTest.scala
deleted file mode 100644
index 71fcb88..0000000
--- a/core/src/test/scala/io/prediction/controller/MetricEvaluatorTest.scala
+++ /dev/null
@@ -1,52 +0,0 @@
-/** Copyright 2015 TappingStone, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.prediction.controller
-
-import io.prediction.workflow.SharedSparkContext
-import io.prediction.workflow.WorkflowParams
-import org.scalatest.FunSuite
-
-object MetricEvaluatorSuite {
- case class Metric0() extends SumMetric[EmptyParams, Int, Int, Int, Int] {
- def calculate(q: Int, p: Int, a: Int): Int = q
- }
-
- object Evaluation0 extends Evaluation {}
-}
-
-class MetricEvaluatorDevSuite extends FunSuite with SharedSparkContext {
- import io.prediction.controller.MetricEvaluatorSuite._
-
- test("a") {
- val metricEvaluator = MetricEvaluator(
- Metric0(),
- Seq(Metric0(), Metric0())
- )
-
- val engineEvalDataSet = Seq(
- (EngineParams(), Seq(
- (EmptyParams(), sc.parallelize(Seq((1,0,0), (2,0,0)))))),
- (EngineParams(), Seq(
- (EmptyParams(), sc.parallelize(Seq((1,0,0), (2,0,0)))))))
-
- val r = metricEvaluator.evaluateBase(
- sc,
- Evaluation0,
- engineEvalDataSet,
- WorkflowParams())
-
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/test/scala/io/prediction/controller/MetricTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/io/prediction/controller/MetricTest.scala b/core/src/test/scala/io/prediction/controller/MetricTest.scala
deleted file mode 100644
index b846548..0000000
--- a/core/src/test/scala/io/prediction/controller/MetricTest.scala
+++ /dev/null
@@ -1,143 +0,0 @@
-/** Copyright 2015 TappingStone, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.prediction.controller
-
-import io.prediction.workflow.SharedSparkContext
-
-import grizzled.slf4j.Logger
-import org.scalatest.Matchers._
-import org.scalatest.FunSuite
-import org.scalatest.Inside
-
-object MetricDevSuite {
- class QIntSumMetric extends SumMetric[EmptyParams, Int, Int, Int, Int] {
- def calculate(q: Int, p: Int, a: Int): Int = q
- }
-
- class QDoubleSumMetric extends SumMetric[EmptyParams, Int, Int, Int, Double] {
- def calculate(q: Int, p: Int, a: Int): Double = q.toDouble
- }
-
- class QAverageMetric extends AverageMetric[EmptyParams, Int, Int, Int] {
- def calculate(q: Int, p: Int, a: Int): Double = q.toDouble
- }
-
- class QOptionAverageMetric extends OptionAverageMetric[EmptyParams, Int, Int, Int] {
- def calculate(q: Int, p: Int, a: Int): Option[Double] = {
- if (q < 0) { None } else { Some(q.toDouble) }
- }
- }
-
- class QStdevMetric extends StdevMetric[EmptyParams, Int, Int, Int] {
- def calculate(q: Int, p: Int, a: Int): Double = q.toDouble
- }
-
- class QOptionStdevMetric extends OptionStdevMetric[EmptyParams, Int, Int, Int] {
- def calculate(q: Int, p: Int, a: Int): Option[Double] = {
- if (q < 0) { None } else { Some(q.toDouble) }
- }
- }
-
-}
-
-class MetricDevSuite
-extends FunSuite with Inside with SharedSparkContext {
- @transient lazy val logger = Logger[this.type]
-
- test("Average Metric") {
- val qpaSeq0 = Seq((1, 0, 0), (2, 0, 0), (3, 0, 0))
- val qpaSeq1 = Seq((4, 0, 0), (5, 0, 0), (6, 0, 0))
-
- val evalDataSet = Seq(
- (EmptyParams(), sc.parallelize(qpaSeq0)),
- (EmptyParams(), sc.parallelize(qpaSeq1)))
-
- val m = new MetricDevSuite.QAverageMetric()
- val result = m.calculate(sc, evalDataSet)
-
- result shouldBe (21.0 / 6)
- }
-
- test("Option Average Metric") {
- val qpaSeq0 = Seq((1, 0, 0), (2, 0, 0), (3, 0, 0))
- val qpaSeq1 = Seq((-4, 0, 0), (-5, 0, 0), (6, 0, 0))
-
- val evalDataSet = Seq(
- (EmptyParams(), sc.parallelize(qpaSeq0)),
- (EmptyParams(), sc.parallelize(qpaSeq1)))
-
- val m = new MetricDevSuite.QOptionAverageMetric()
- val result = m.calculate(sc, evalDataSet)
-
- result shouldBe (12.0 / 4)
- }
-
- test("Stdev Metric") {
- val qpaSeq0 = Seq((1, 0, 0), (1, 0, 0), (1, 0, 0), (1, 0, 0))
- val qpaSeq1 = Seq((5, 0, 0), (5, 0, 0), (5, 0, 0), (5, 0, 0))
-
- val evalDataSet = Seq(
- (EmptyParams(), sc.parallelize(qpaSeq0)),
- (EmptyParams(), sc.parallelize(qpaSeq1)))
-
- val m = new MetricDevSuite.QStdevMetric()
- val result = m.calculate(sc, evalDataSet)
-
- result shouldBe 2.0
- }
-
- test("Option Stdev Metric") {
- val qpaSeq0 = Seq((1, 0, 0), (1, 0, 0), (1, 0, 0), (1, 0, 0))
- val qpaSeq1 = Seq((5, 0, 0), (5, 0, 0), (5, 0, 0), (5, 0, 0), (-5, 0, 0))
-
- val evalDataSet = Seq(
- (EmptyParams(), sc.parallelize(qpaSeq0)),
- (EmptyParams(), sc.parallelize(qpaSeq1)))
-
- val m = new MetricDevSuite.QOptionStdevMetric()
- val result = m.calculate(sc, evalDataSet)
-
- result shouldBe 2.0
- }
-
- test("Sum Metric [Int]") {
- val qpaSeq0 = Seq((1, 0, 0), (2, 0, 0), (3, 0, 0))
- val qpaSeq1 = Seq((4, 0, 0), (5, 0, 0), (6, 0, 0))
-
- val evalDataSet = Seq(
- (EmptyParams(), sc.parallelize(qpaSeq0)),
- (EmptyParams(), sc.parallelize(qpaSeq1)))
-
- val m = new MetricDevSuite.QIntSumMetric()
- val result = m.calculate(sc, evalDataSet)
-
- result shouldBe 21
- }
-
- test("Sum Metric [Double]") {
- val qpaSeq0 = Seq((1, 0, 0), (2, 0, 0), (3, 0, 0))
- val qpaSeq1 = Seq((4, 0, 0), (5, 0, 0), (6, 0, 0))
-
- val evalDataSet = Seq(
- (EmptyParams(), sc.parallelize(qpaSeq0)),
- (EmptyParams(), sc.parallelize(qpaSeq1)))
-
- val m = new MetricDevSuite.QDoubleSumMetric()
- val result = m.calculate(sc, evalDataSet)
-
- result shouldBe 21.0
- }
-}