You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@predictionio.apache.org by do...@apache.org on 2016/07/18 20:17:50 UTC

[19/34] incubator-predictionio git commit: rename all except examples

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/main/scala/org/apache/predictionio/workflow/WorkflowUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/predictionio/workflow/WorkflowUtils.scala b/core/src/main/scala/org/apache/predictionio/workflow/WorkflowUtils.scala
new file mode 100644
index 0000000..e26b754
--- /dev/null
+++ b/core/src/main/scala/org/apache/predictionio/workflow/WorkflowUtils.scala
@@ -0,0 +1,419 @@
+/** Copyright 2015 TappingStone, Inc.
+  *
+  * Licensed under the Apache License, Version 2.0 (the "License");
+  * you may not use this file except in compliance with the License.
+  * You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package org.apache.predictionio.workflow
+
+import java.io.File
+import java.io.FileNotFoundException
+
+import org.apache.predictionio.controller.EmptyParams
+import org.apache.predictionio.controller.EngineFactory
+import org.apache.predictionio.controller.EngineParamsGenerator
+import org.apache.predictionio.controller.Evaluation
+import org.apache.predictionio.controller.Params
+import org.apache.predictionio.controller.PersistentModelLoader
+import org.apache.predictionio.controller.Utils
+import org.apache.predictionio.core.BuildInfo
+
+import com.google.gson.Gson
+import com.google.gson.JsonSyntaxException
+import grizzled.slf4j.Logging
+import org.apache.predictionio.workflow.JsonExtractorOption.JsonExtractorOption
+import org.apache.log4j.Level
+import org.apache.log4j.LogManager
+import org.apache.spark.SparkContext
+import org.apache.spark.api.java.JavaRDDLike
+import org.apache.spark.rdd.RDD
+import org.json4s.JsonAST.JValue
+import org.json4s.MappingException
+import org.json4s._
+import org.json4s.native.JsonMethods._
+
+import scala.io.Source
+import scala.language.existentials
+import scala.reflect.runtime.universe
+
+/** Collection of reusable workflow related utilities. */
+object WorkflowUtils extends Logging {
+  @transient private lazy val gson = new Gson
+
+  /** Obtains an Engine object in Scala, or instantiate an Engine in Java.
+    *
+    * @param engine Engine factory name.
+    * @param cl A Java ClassLoader to look for engine-related classes.
+    *
+    * @throws ClassNotFoundException
+    *         Thrown when engine factory class does not exist.
+    * @throws NoSuchMethodException
+    *         Thrown when engine factory's apply() method is not implemented.
+    */
+  def getEngine(engine: String, cl: ClassLoader): (EngineLanguage.Value, EngineFactory) = {
+    val runtimeMirror = universe.runtimeMirror(cl)
+    val engineModule = runtimeMirror.staticModule(engine)
+    val engineObject = runtimeMirror.reflectModule(engineModule)
+    try {
+      (
+        EngineLanguage.Scala,
+        engineObject.instance.asInstanceOf[EngineFactory]
+      )
+    } catch {
+      case e @ (_: NoSuchFieldException | _: ClassNotFoundException) => try {
+        (
+          EngineLanguage.Java,
+          Class.forName(engine).newInstance.asInstanceOf[EngineFactory]
+        )
+      }
+    }
+  }
+
+  def getEngineParamsGenerator(epg: String, cl: ClassLoader):
+    (EngineLanguage.Value, EngineParamsGenerator) = {
+    val runtimeMirror = universe.runtimeMirror(cl)
+    val epgModule = runtimeMirror.staticModule(epg)
+    val epgObject = runtimeMirror.reflectModule(epgModule)
+    try {
+      (
+        EngineLanguage.Scala,
+        epgObject.instance.asInstanceOf[EngineParamsGenerator]
+      )
+    } catch {
+      case e @ (_: NoSuchFieldException | _: ClassNotFoundException) => try {
+        (
+          EngineLanguage.Java,
+          Class.forName(epg).newInstance.asInstanceOf[EngineParamsGenerator]
+        )
+      }
+    }
+  }
+
+  def getEvaluation(evaluation: String, cl: ClassLoader): (EngineLanguage.Value, Evaluation) = {
+    val runtimeMirror = universe.runtimeMirror(cl)
+    val evaluationModule = runtimeMirror.staticModule(evaluation)
+    val evaluationObject = runtimeMirror.reflectModule(evaluationModule)
+    try {
+      (
+        EngineLanguage.Scala,
+        evaluationObject.instance.asInstanceOf[Evaluation]
+      )
+    } catch {
+      case e @ (_: NoSuchFieldException | _: ClassNotFoundException) => try {
+        (
+          EngineLanguage.Java,
+          Class.forName(evaluation).newInstance.asInstanceOf[Evaluation]
+        )
+      }
+    }
+  }
+
+  /** Converts a JSON document to an instance of Params.
+    *
+    * @param language Engine's programming language.
+    * @param json JSON document.
+    * @param clazz Class of the component that is going to receive the resulting
+    *              Params instance as a constructor argument.
+    * @param jsonExtractor JSON extractor option.
+    * @param formats JSON4S serializers for deserialization.
+    *
+    * @throws MappingException Thrown when JSON4S fails to perform conversion.
+    * @throws JsonSyntaxException Thrown when GSON fails to perform conversion.
+    */
+  def extractParams(
+      language: EngineLanguage.Value = EngineLanguage.Scala,
+      json: String,
+      clazz: Class[_],
+      jsonExtractor: JsonExtractorOption,
+      formats: Formats = Utils.json4sDefaultFormats): Params = {
+    implicit val f = formats
+    val pClass = clazz.getConstructors.head.getParameterTypes
+    if (pClass.size == 0) {
+      if (json != "") {
+        warn(s"Non-empty parameters supplied to ${clazz.getName}, but its " +
+          "constructor does not accept any arguments. Stubbing with empty " +
+          "parameters.")
+      }
+      EmptyParams()
+    } else {
+      val apClass = pClass.head
+      try {
+        JsonExtractor.extract(jsonExtractor, json, apClass, f).asInstanceOf[Params]
+      } catch {
+        case e@(_: MappingException | _: JsonSyntaxException) =>
+          error(
+            s"Unable to extract parameters for ${apClass.getName} from " +
+              s"JSON string: $json. Aborting workflow.",
+            e)
+          throw e
+      }
+    }
+  }
+
+  def getParamsFromJsonByFieldAndClass(
+      variantJson: JValue,
+      field: String,
+      classMap: Map[String, Class[_]],
+      engineLanguage: EngineLanguage.Value,
+      jsonExtractor: JsonExtractorOption): (String, Params) = {
+    variantJson findField {
+      case JField(f, _) => f == field
+      case _ => false
+    } map { jv =>
+      implicit lazy val formats = Utils.json4sDefaultFormats + new NameParamsSerializer
+      val np: NameParams = try {
+        jv._2.extract[NameParams]
+      } catch {
+        case e: Exception =>
+          error(s"Unable to extract $field name and params $jv")
+          throw e
+      }
+      val extractedParams = np.params.map { p =>
+        try {
+          if (!classMap.contains(np.name)) {
+            error(s"Unable to find $field class with name '${np.name}'" +
+              " defined in Engine.")
+            sys.exit(1)
+          }
+          WorkflowUtils.extractParams(
+            engineLanguage,
+            compact(render(p)),
+            classMap(np.name),
+            jsonExtractor,
+            formats)
+        } catch {
+          case e: Exception =>
+            error(s"Unable to extract $field params $p")
+            throw e
+        }
+      }.getOrElse(EmptyParams())
+
+      (np.name, extractedParams)
+    } getOrElse("", EmptyParams())
+  }
+
+  /** Grab environmental variables that starts with 'PIO_'. */
+  def pioEnvVars: Map[String, String] =
+    sys.env.filter(kv => kv._1.startsWith("PIO_"))
+
+  /** Converts Java (non-Scala) objects to a JSON4S JValue.
+    *
+    * @param params The Java object to be converted.
+    */
+  def javaObjectToJValue(params: AnyRef): JValue = parse(gson.toJson(params))
+
+  private[prediction] def checkUpgrade(
+      component: String = "core",
+      engine: String = ""): Unit = {
+    val runner = new Thread(new UpgradeCheckRunner(component, engine))
+    runner.start()
+  }
+
+  // Extract debug string by recursively traversing the data.
+  def debugString[D](data: D): String = {
+    val s: String = data match {
+      case rdd: RDD[_] => {
+        debugString(rdd.collect())
+      }
+      case javaRdd: JavaRDDLike[_, _] => {
+        debugString(javaRdd.collect())
+      }
+      case array: Array[_] => {
+        "[" + array.map(debugString).mkString(",") + "]"
+      }
+      case d: AnyRef => {
+        d.toString
+      }
+      case null => "null"
+    }
+    s
+  }
+
+  /** Detect third party software configuration files to be submitted as
+    * extras to Apache Spark. This makes sure all executors receive the same
+    * configuration.
+    */
+  def thirdPartyConfFiles: Seq[String] = {
+    val thirdPartyFiles = Map(
+      "PIO_CONF_DIR" -> "log4j.properties",
+      "ES_CONF_DIR" -> "elasticsearch.yml",
+      "HADOOP_CONF_DIR" -> "core-site.xml",
+      "HBASE_CONF_DIR" -> "hbase-site.xml")
+
+    thirdPartyFiles.keys.toSeq.map { k: String =>
+      sys.env.get(k) map { x =>
+        val p = Seq(x, thirdPartyFiles(k)).mkString(File.separator)
+        if (new File(p).exists) Seq(p) else Seq[String]()
+      } getOrElse Seq[String]()
+    }.flatten
+  }
+
+  def thirdPartyClasspaths: Seq[String] = {
+    val thirdPartyPaths = Seq(
+      "PIO_CONF_DIR",
+      "ES_CONF_DIR",
+      "POSTGRES_JDBC_DRIVER",
+      "MYSQL_JDBC_DRIVER",
+      "HADOOP_CONF_DIR",
+      "HBASE_CONF_DIR")
+    thirdPartyPaths.map(p =>
+      sys.env.get(p).map(Seq(_)).getOrElse(Seq[String]())
+    ).flatten
+  }
+
+  def modifyLogging(verbose: Boolean): Unit = {
+    val rootLoggerLevel = if (verbose) Level.TRACE else Level.INFO
+    val chattyLoggerLevel = if (verbose) Level.INFO else Level.WARN
+
+    LogManager.getRootLogger.setLevel(rootLoggerLevel)
+
+    LogManager.getLogger("org.elasticsearch").setLevel(chattyLoggerLevel)
+    LogManager.getLogger("org.apache.hadoop").setLevel(chattyLoggerLevel)
+    LogManager.getLogger("org.apache.spark").setLevel(chattyLoggerLevel)
+    LogManager.getLogger("org.eclipse.jetty").setLevel(chattyLoggerLevel)
+    LogManager.getLogger("akka").setLevel(chattyLoggerLevel)
+  }
+
+  def extractNameParams(jv: JValue): NameParams = {
+    implicit val formats = Utils.json4sDefaultFormats
+    val nameOpt = (jv \ "name").extract[Option[String]]
+    val paramsOpt = (jv \ "params").extract[Option[JValue]]
+
+    if (nameOpt.isEmpty && paramsOpt.isEmpty) {
+      error("Unable to find 'name' or 'params' fields in" +
+        s" ${compact(render(jv))}.\n" +
+        "Since 0.8.4, the 'params' field is required in engine.json" +
+        " in order to specify parameters for DataSource, Preparator or" +
+        " Serving.\n" +
+        "Please go to https://docs.prediction.io/resources/upgrade/" +
+        " for detailed instruction of how to change engine.json.")
+      sys.exit(1)
+    }
+
+    if (nameOpt.isEmpty) {
+      info(s"No 'name' is found. Default empty String will be used.")
+    }
+
+    if (paramsOpt.isEmpty) {
+      info(s"No 'params' is found. Default EmptyParams will be used.")
+    }
+
+    NameParams(
+      name = nameOpt.getOrElse(""),
+      params = paramsOpt
+    )
+  }
+
+  def extractSparkConf(root: JValue): List[(String, String)] = {
+    def flatten(jv: JValue): List[(List[String], String)] = {
+      jv match {
+        case JObject(fields) =>
+          for ((namePrefix, childJV) <- fields;
+               (name, value) <- flatten(childJV))
+          yield (namePrefix :: name) -> value
+        case JArray(_) => {
+          error("Arrays are not allowed in the sparkConf section of engine.js.")
+          sys.exit(1)
+        }
+        case JNothing => List()
+        case _ => List(List() -> jv.values.toString)
+      }
+    }
+
+    flatten(root \ "sparkConf").map(x =>
+      (x._1.reduce((a, b) => s"$a.$b"), x._2))
+  }
+}
+
+case class NameParams(name: String, params: Option[JValue])
+
+class NameParamsSerializer extends CustomSerializer[NameParams](format => ( {
+  case jv: JValue => WorkflowUtils.extractNameParams(jv)
+}, {
+  case x: NameParams =>
+    JObject(JField("name", JString(x.name)) ::
+      JField("params", x.params.getOrElse(JNothing)) :: Nil)
+}
+  ))
+
+/** Collection of reusable workflow related utilities that touch on Apache
+  * Spark. They are separated to avoid compilation problems with certain code.
+  */
+object SparkWorkflowUtils extends Logging {
+  def getPersistentModel[AP <: Params, M](
+      pmm: PersistentModelManifest,
+      runId: String,
+      params: AP,
+      sc: Option[SparkContext],
+      cl: ClassLoader): M = {
+    val runtimeMirror = universe.runtimeMirror(cl)
+    val pmmModule = runtimeMirror.staticModule(pmm.className)
+    val pmmObject = runtimeMirror.reflectModule(pmmModule)
+    try {
+      pmmObject.instance.asInstanceOf[PersistentModelLoader[AP, M]](
+        runId,
+        params,
+        sc)
+    } catch {
+      case e @ (_: NoSuchFieldException | _: ClassNotFoundException) => try {
+        val loadMethod = Class.forName(pmm.className).getMethod(
+          "load",
+          classOf[String],
+          classOf[Params],
+          classOf[SparkContext])
+        loadMethod.invoke(null, runId, params, sc.orNull).asInstanceOf[M]
+      } catch {
+        case e: ClassNotFoundException =>
+          error(s"Model class ${pmm.className} cannot be found.")
+          throw e
+        case e: NoSuchMethodException =>
+          error(
+            "The load(String, Params, SparkContext) method cannot be found.")
+          throw e
+      }
+    }
+  }
+}
+
+class UpgradeCheckRunner(
+    val component: String,
+    val engine: String) extends Runnable with Logging {
+  val version = BuildInfo.version
+  val versionsHost = "https://direct.prediction.io/"
+
+  def run(): Unit = {
+    val url = if (engine == "") {
+      s"$versionsHost$version/$component.json"
+    } else {
+      s"$versionsHost$version/$component/$engine.json"
+    }
+    try {
+      val upgradeData = Source.fromURL(url)
+    } catch {
+      case e: FileNotFoundException =>
+        debug(s"Update metainfo not found. $url")
+      case e: java.net.UnknownHostException =>
+        debug(s"${e.getClass.getName}: {e.getMessage}")
+    }
+    // TODO: Implement upgrade logic
+  }
+}
+
+class WorkflowInterruption() extends Exception
+
+case class StopAfterReadInterruption() extends WorkflowInterruption
+
+case class StopAfterPrepareInterruption() extends WorkflowInterruption
+
+object EngineLanguage extends Enumeration {
+  val Scala, Java = Value
+}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/main/twirl/io/prediction/controller/metric_evaluator.scala.html
----------------------------------------------------------------------
diff --git a/core/src/main/twirl/io/prediction/controller/metric_evaluator.scala.html b/core/src/main/twirl/io/prediction/controller/metric_evaluator.scala.html
deleted file mode 100644
index 2e679a5..0000000
--- a/core/src/main/twirl/io/prediction/controller/metric_evaluator.scala.html
+++ /dev/null
@@ -1,95 +0,0 @@
-<html>
-  <head>
-    <script type='text/javascript' src='https://www.google.com/jsapi'></script>
-    <script src="http://ajax.googleapis.com/ajax/libs/jquery/2.1.1/jquery.min.js"></script>
-    <script>
-      google.load('visualization', '1', {packages:['table', 'corechart',]});
-    </script>
-  </head>
-  <body>
-    <h1>Metric Evaluator</h1>
-    <div id='debug'></div>
-    <div id='table'>
-      <h3>Engine Params Evaluation Results</h3>
-      <div>Click on table to view the engine params</div>
-    </div>
-    <pre id='engineParams'></div>
-    <script type='text/javascript'>
-      google.setOnLoadCallback(load);
-
-      //var url =  'http://localhost:9000/engine_instances/ky01Q-glQheNE_s885JTSg/local_evaluator_results.json';
-      var url = 'evaluator_results.json';
-      var rawData;
-      var metricHeader;
-      var otherMetricHeaders;
-      var engineParamsScores;
-      var table;
-      var dataTable;
-
-      function load() {
-        rawData = JSON.parse(
-            jQuery.ajax({
-              url: url,
-              dataType: 'json',
-              async: false,
-              }).responseText);
-
-        metricHeader = rawData['metricHeader'];
-        otherMetricHeaders = rawData['otherMetricHeaders'];
-        engineParamsScores = rawData['engineParamsScores'];
-
-        drawTable();
-      }
-
-      function tableSelectHandler() {
-        var selection = table.getSelection();
-        if (selection.length > 0) {
-          var row = selection[0].row;
-          var idx = dataTable.getValue(row, 0);
-          var engineParams = engineParamsScores[idx]._1;
-
-          document.getElementById('engineParams').innerHTML = JSON.stringify(
-            engineParams, null, 2);
-        } else {
-          document.getElementById('engineParams').innerHTML = "";
-        }
-      }
-
-      function drawTable() {
-        var tableDiv = document.createElement('div');
-        document.getElementById('table').appendChild(tableDiv);
-
-
-        var dataArray = [];
-
-        var headers = ['Index', 'Best', metricHeader].concat(otherMetricHeaders);
-        dataArray.push(headers);
-
-        for (epIdx = 0; epIdx < engineParamsScores.length; epIdx++) {
-          var epScore = engineParamsScores[epIdx];
-          var isBest = (epIdx == rawData.bestIdx ? "*" : "");
-          dataArray.push([epIdx, isBest, epScore._2.score].concat(epScore._2.otherScores));
-        }
-
-        dataTable = google.visualization.arrayToDataTable(dataArray, false);
-
-        // formatter
-        var numberFormatter = new google.visualization.NumberFormat({fractionDigits: 4});
-
-        for (colIdx = 1; colIdx < dataTable.getNumberOfColumns(); colIdx++) {
-          if (dataTable.getColumnType(colIdx) == "number") {
-            numberFormatter.format(dataTable, colIdx);
-          }
-        }
-
-        table = new google.visualization.Table(tableDiv);
-
-        // select handler
-        google.visualization.events.addListener(table, 'select', tableSelectHandler);
-
-        table.draw(dataTable);
-      }
-
-    </script>
-  </body>
-</html>

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/main/twirl/io/prediction/workflow/index.scala.html
----------------------------------------------------------------------
diff --git a/core/src/main/twirl/io/prediction/workflow/index.scala.html b/core/src/main/twirl/io/prediction/workflow/index.scala.html
deleted file mode 100644
index 4e0707b..0000000
--- a/core/src/main/twirl/io/prediction/workflow/index.scala.html
+++ /dev/null
@@ -1,92 +0,0 @@
-@import io.prediction.data.storage.EngineInstance
-@import io.prediction.data.storage.EngineManifest
-@import io.prediction.workflow.ServerConfig
-@import org.joda.time.DateTime
-@import org.joda.time.format.DateTimeFormat
-@(args: ServerConfig,
-  manifest: EngineManifest,
-  engineInstance: EngineInstance,
-  algorithms: Seq[String],
-  algorithmsParams: Seq[String],
-  models: Seq[String],
-  dataSourceParams: String,
-  preparatorParams: String,
-  servingParams: String,
-  serverStartTime: DateTime,
-  feedback: Boolean,
-  eventServerIp: String,
-  eventServerPort: Int,
-  requestCount: Int,
-  avgServingSec: Double,
-  lastServingSec: Double
-  )
-<!DOCTYPE html>
-<html lang="en">
-  <head>
-    <title>@{engineInstance.engineFactory} (@{engineInstance.engineVariant}) - PredictionIO Engine Server at @{args.ip}:@{args.port}</title>
-    <link href="/assets/bootstrap-3.2.0-dist/css/bootstrap.min.css" rel="stylesheet">
-    <style type="text/css">
-      td { font-family: Menlo, Monaco, Consolas, "Courier New", monospace; }
-    </style>
-  </head>
-  <body>
-    <div class="container">
-      <div class="page-header">
-        <h1>PredictionIO Engine Server at @{args.ip}:@{args.port}</h1>
-        <p class="lead">@{engineInstance.engineFactory} (@{engineInstance.engineVariant})</p>
-      </div>
-      <h2>Engine Information</h2>
-      <table class="table table-bordered table-striped">
-        <tr><th>Training Start Time</th><td>@{DateTimeFormat.forStyle("FF").print(engineInstance.startTime)}</td></tr>
-        <tr><th>Training End Time</th><td>@{DateTimeFormat.forStyle("FF").print(engineInstance.endTime)}</td></tr>
-        <tr><th>Variant ID</th><td>@{engineInstance.engineVariant}</td></tr>
-        <tr><th>Instance ID</th><td>@{engineInstance.id}</td></tr>
-      </table>
-      <h2>Server Information</h2>
-      <table class="table table-bordered table-striped">
-        <tr><th>Start Time</th><td>@{DateTimeFormat.forStyle("FF").print(serverStartTime)}</td></tr>
-        <tr><th>Request Count</th><td>@{requestCount}</td></tr>
-        <tr><th>Average Serving Time</th><td>@{f"${avgServingSec}%.4f"} seconds</td></tr>
-        <tr><th>Last Serving Time</th><td>@{f"${lastServingSec}%.4f"} seconds</td></tr>
-        <tr><th>Engine Factory Class (Scala/Java)</th><td>@{engineInstance.engineFactory}</td></tr>
-        <tr>
-          <th rowspan="@(manifest.files.size)">Library Files</th>
-          <td>@{manifest.files.head}</td>
-        </tr>
-        @for(f <- manifest.files.drop(1)) {
-        <tr><td>@f</td></tr>
-        }
-      </table>
-      <h2>Data Source</h2>
-      <table class="table table-bordered table-striped">
-        <tr><th>Parameters</th><td>@{dataSourceParams}</td></tr>
-      </table>
-      <h2>Data Preparator</h2>
-      <table class="table table-bordered table-striped">
-        <tr><th>Parameters</th><td>@{preparatorParams}</td></tr>
-      </table>
-      <h2>Algorithms and Models</h2>
-        <table class="table table-bordered table-striped">
-          <tr><th>#</th><th colspan="2">Information</th></tr>
-          @for(a <- algorithms.zip(algorithmsParams).zip(models).zipWithIndex) {
-          <tr>
-            <th rowspan="3">@{a._2 + 1}</th>
-            <th>Class</th><td>@{a._1._1._1}</td>
-          </tr>
-          <tr><th>Parameters</th><td>@{a._1._1._2}</td></tr>
-          <tr><th>Model</th><td>@{a._1._2}</td></tr>
-          }
-        </table>
-      <h2>Serving</h2>
-      <table class="table table-bordered table-striped">
-        <tr><th>Parameters</th><td>@{servingParams}</td></tr>
-      </table>
-      <h2>Feedback Loop Information</h2>
-      <table class="table table-bordered table-striped">
-        <tr><th>Feedback Loop Enabled?</th><td>@{feedback}</td></tr>
-        <tr><th>Event Server IP</th><td>@{eventServerIp}</td></tr>
-        <tr><th>Event Server Port</th><td>@{eventServerPort}</td></tr>
-      </table>
-    </div>
-  </body>
-</html>

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/main/twirl/org/apache/predictionio/controller/metric_evaluator.scala.html
----------------------------------------------------------------------
diff --git a/core/src/main/twirl/org/apache/predictionio/controller/metric_evaluator.scala.html b/core/src/main/twirl/org/apache/predictionio/controller/metric_evaluator.scala.html
new file mode 100644
index 0000000..2e679a5
--- /dev/null
+++ b/core/src/main/twirl/org/apache/predictionio/controller/metric_evaluator.scala.html
@@ -0,0 +1,95 @@
+<html>
+  <head>
+    <script type='text/javascript' src='https://www.google.com/jsapi'></script>
+    <script src="http://ajax.googleapis.com/ajax/libs/jquery/2.1.1/jquery.min.js"></script>
+    <script>
+      google.load('visualization', '1', {packages:['table', 'corechart',]});
+    </script>
+  </head>
+  <body>
+    <h1>Metric Evaluator</h1>
+    <div id='debug'></div>
+    <div id='table'>
+      <h3>Engine Params Evaluation Results</h3>
+      <div>Click on table to view the engine params</div>
+    </div>
+    <pre id='engineParams'></div>
+    <script type='text/javascript'>
+      google.setOnLoadCallback(load);
+
+      //var url =  'http://localhost:9000/engine_instances/ky01Q-glQheNE_s885JTSg/local_evaluator_results.json';
+      var url = 'evaluator_results.json';
+      var rawData;
+      var metricHeader;
+      var otherMetricHeaders;
+      var engineParamsScores;
+      var table;
+      var dataTable;
+
+      function load() {
+        rawData = JSON.parse(
+            jQuery.ajax({
+              url: url,
+              dataType: 'json',
+              async: false,
+              }).responseText);
+
+        metricHeader = rawData['metricHeader'];
+        otherMetricHeaders = rawData['otherMetricHeaders'];
+        engineParamsScores = rawData['engineParamsScores'];
+
+        drawTable();
+      }
+
+      function tableSelectHandler() {
+        var selection = table.getSelection();
+        if (selection.length > 0) {
+          var row = selection[0].row;
+          var idx = dataTable.getValue(row, 0);
+          var engineParams = engineParamsScores[idx]._1;
+
+          document.getElementById('engineParams').innerHTML = JSON.stringify(
+            engineParams, null, 2);
+        } else {
+          document.getElementById('engineParams').innerHTML = "";
+        }
+      }
+
+      function drawTable() {
+        var tableDiv = document.createElement('div');
+        document.getElementById('table').appendChild(tableDiv);
+
+
+        var dataArray = [];
+
+        var headers = ['Index', 'Best', metricHeader].concat(otherMetricHeaders);
+        dataArray.push(headers);
+
+        for (epIdx = 0; epIdx < engineParamsScores.length; epIdx++) {
+          var epScore = engineParamsScores[epIdx];
+          var isBest = (epIdx == rawData.bestIdx ? "*" : "");
+          dataArray.push([epIdx, isBest, epScore._2.score].concat(epScore._2.otherScores));
+        }
+
+        dataTable = google.visualization.arrayToDataTable(dataArray, false);
+
+        // formatter
+        var numberFormatter = new google.visualization.NumberFormat({fractionDigits: 4});
+
+        for (colIdx = 1; colIdx < dataTable.getNumberOfColumns(); colIdx++) {
+          if (dataTable.getColumnType(colIdx) == "number") {
+            numberFormatter.format(dataTable, colIdx);
+          }
+        }
+
+        table = new google.visualization.Table(tableDiv);
+
+        // select handler
+        google.visualization.events.addListener(table, 'select', tableSelectHandler);
+
+        table.draw(dataTable);
+      }
+
+    </script>
+  </body>
+</html>

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/main/twirl/org/apache/predictionio/workflow/index.scala.html
----------------------------------------------------------------------
diff --git a/core/src/main/twirl/org/apache/predictionio/workflow/index.scala.html b/core/src/main/twirl/org/apache/predictionio/workflow/index.scala.html
new file mode 100644
index 0000000..4e0707b
--- /dev/null
+++ b/core/src/main/twirl/org/apache/predictionio/workflow/index.scala.html
@@ -0,0 +1,92 @@
+@import io.prediction.data.storage.EngineInstance
+@import io.prediction.data.storage.EngineManifest
+@import io.prediction.workflow.ServerConfig
+@import org.joda.time.DateTime
+@import org.joda.time.format.DateTimeFormat
+@(args: ServerConfig,
+  manifest: EngineManifest,
+  engineInstance: EngineInstance,
+  algorithms: Seq[String],
+  algorithmsParams: Seq[String],
+  models: Seq[String],
+  dataSourceParams: String,
+  preparatorParams: String,
+  servingParams: String,
+  serverStartTime: DateTime,
+  feedback: Boolean,
+  eventServerIp: String,
+  eventServerPort: Int,
+  requestCount: Int,
+  avgServingSec: Double,
+  lastServingSec: Double
+  )
+<!DOCTYPE html>
+<html lang="en">
+  <head>
+    <title>@{engineInstance.engineFactory} (@{engineInstance.engineVariant}) - PredictionIO Engine Server at @{args.ip}:@{args.port}</title>
+    <link href="/assets/bootstrap-3.2.0-dist/css/bootstrap.min.css" rel="stylesheet">
+    <style type="text/css">
+      td { font-family: Menlo, Monaco, Consolas, "Courier New", monospace; }
+    </style>
+  </head>
+  <body>
+    <div class="container">
+      <div class="page-header">
+        <h1>PredictionIO Engine Server at @{args.ip}:@{args.port}</h1>
+        <p class="lead">@{engineInstance.engineFactory} (@{engineInstance.engineVariant})</p>
+      </div>
+      <h2>Engine Information</h2>
+      <table class="table table-bordered table-striped">
+        <tr><th>Training Start Time</th><td>@{DateTimeFormat.forStyle("FF").print(engineInstance.startTime)}</td></tr>
+        <tr><th>Training End Time</th><td>@{DateTimeFormat.forStyle("FF").print(engineInstance.endTime)}</td></tr>
+        <tr><th>Variant ID</th><td>@{engineInstance.engineVariant}</td></tr>
+        <tr><th>Instance ID</th><td>@{engineInstance.id}</td></tr>
+      </table>
+      <h2>Server Information</h2>
+      <table class="table table-bordered table-striped">
+        <tr><th>Start Time</th><td>@{DateTimeFormat.forStyle("FF").print(serverStartTime)}</td></tr>
+        <tr><th>Request Count</th><td>@{requestCount}</td></tr>
+        <tr><th>Average Serving Time</th><td>@{f"${avgServingSec}%.4f"} seconds</td></tr>
+        <tr><th>Last Serving Time</th><td>@{f"${lastServingSec}%.4f"} seconds</td></tr>
+        <tr><th>Engine Factory Class (Scala/Java)</th><td>@{engineInstance.engineFactory}</td></tr>
+        <tr>
+          <th rowspan="@(manifest.files.size)">Library Files</th>
+          <td>@{manifest.files.head}</td>
+        </tr>
+        @for(f <- manifest.files.drop(1)) {
+        <tr><td>@f</td></tr>
+        }
+      </table>
+      <h2>Data Source</h2>
+      <table class="table table-bordered table-striped">
+        <tr><th>Parameters</th><td>@{dataSourceParams}</td></tr>
+      </table>
+      <h2>Data Preparator</h2>
+      <table class="table table-bordered table-striped">
+        <tr><th>Parameters</th><td>@{preparatorParams}</td></tr>
+      </table>
+      <h2>Algorithms and Models</h2>
+        <table class="table table-bordered table-striped">
+          <tr><th>#</th><th colspan="2">Information</th></tr>
+          @for(a <- algorithms.zip(algorithmsParams).zip(models).zipWithIndex) {
+          <tr>
+            <th rowspan="3">@{a._2 + 1}</th>
+            <th>Class</th><td>@{a._1._1._1}</td>
+          </tr>
+          <tr><th>Parameters</th><td>@{a._1._1._2}</td></tr>
+          <tr><th>Model</th><td>@{a._1._2}</td></tr>
+          }
+        </table>
+      <h2>Serving</h2>
+      <table class="table table-bordered table-striped">
+        <tr><th>Parameters</th><td>@{servingParams}</td></tr>
+      </table>
+      <h2>Feedback Loop Information</h2>
+      <table class="table table-bordered table-striped">
+        <tr><th>Feedback Loop Enabled?</th><td>@{feedback}</td></tr>
+        <tr><th>Event Server IP</th><td>@{eventServerIp}</td></tr>
+        <tr><th>Event Server Port</th><td>@{eventServerPort}</td></tr>
+      </table>
+    </div>
+  </body>
+</html>

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/test/java/io/prediction/workflow/JavaParams.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/io/prediction/workflow/JavaParams.java b/core/src/test/java/io/prediction/workflow/JavaParams.java
deleted file mode 100644
index 65108b5..0000000
--- a/core/src/test/java/io/prediction/workflow/JavaParams.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/** Copyright 2015 TappingStone, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.prediction.workflow;
-
-import io.prediction.controller.Params;
-
-public class JavaParams implements Params {
-    private final String p;
-
-    public JavaParams(String p) {
-        this.p = p;
-    }
-
-    public String getP() {
-        return p;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/test/java/io/prediction/workflow/JavaQuery.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/io/prediction/workflow/JavaQuery.java b/core/src/test/java/io/prediction/workflow/JavaQuery.java
deleted file mode 100644
index 1630a2d..0000000
--- a/core/src/test/java/io/prediction/workflow/JavaQuery.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/** Copyright 2015 TappingStone, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.prediction.workflow;
-
-import java.io.Serializable;
-
-public class JavaQuery implements Serializable{
-    private final String q;
-
-    public JavaQuery(String q) {
-        this.q = q;
-    }
-
-    public String getQ() {
-        return q;
-    }
-
-    @Override
-    public boolean equals(Object o) {
-        if (this == o) return true;
-        if (o == null || getClass() != o.getClass()) return false;
-
-        JavaQuery javaQuery = (JavaQuery) o;
-
-        return !(q != null ? !q.equals(javaQuery.q) : javaQuery.q != null);
-
-    }
-
-    @Override
-    public int hashCode() {
-        return q != null ? q.hashCode() : 0;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/test/java/io/prediction/workflow/JavaQueryTypeAdapterFactory.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/io/prediction/workflow/JavaQueryTypeAdapterFactory.java b/core/src/test/java/io/prediction/workflow/JavaQueryTypeAdapterFactory.java
deleted file mode 100644
index 409859d..0000000
--- a/core/src/test/java/io/prediction/workflow/JavaQueryTypeAdapterFactory.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/** Copyright 2015 TappingStone, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.prediction.workflow;
-
-import com.google.gson.Gson;
-import com.google.gson.TypeAdapter;
-import com.google.gson.TypeAdapterFactory;
-import com.google.gson.reflect.TypeToken;
-import com.google.gson.stream.JsonReader;
-import com.google.gson.stream.JsonToken;
-import com.google.gson.stream.JsonWriter;
-
-import java.io.IOException;
-
-public class JavaQueryTypeAdapterFactory implements TypeAdapterFactory {
-    @Override
-    public <T> TypeAdapter<T> create(Gson gson, TypeToken<T> type) {
-        if (type.getRawType().equals(JavaQuery.class)) {
-            return (TypeAdapter<T>) new TypeAdapter<JavaQuery>() {
-                public void write(JsonWriter out, JavaQuery value) throws IOException {
-                    if (value == null) {
-                        out.nullValue();
-                    } else {
-                        out.beginObject();
-                        out.name("q").value(value.getQ().toUpperCase());
-                        out.endObject();
-                    }
-                }
-
-                public JavaQuery read(JsonReader reader) throws IOException {
-                    if (reader.peek() == JsonToken.NULL) {
-                        reader.nextNull();
-                        return null;
-                    } else {
-                        reader.beginObject();
-                        reader.nextName();
-                        String q = reader.nextString();
-                        reader.endObject();
-                        return new JavaQuery(q.toUpperCase());
-                    }
-                }
-            };
-        } else {
-            return null;
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/test/java/org/apache/predictionio/workflow/JavaParams.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/predictionio/workflow/JavaParams.java b/core/src/test/java/org/apache/predictionio/workflow/JavaParams.java
new file mode 100644
index 0000000..982ecbf
--- /dev/null
+++ b/core/src/test/java/org/apache/predictionio/workflow/JavaParams.java
@@ -0,0 +1,30 @@
+/** Copyright 2015 TappingStone, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.predictionio.workflow;
+
+import org.apache.predictionio.controller.Params;
+
+public class JavaParams implements Params {
+    private final String p;
+
+    public JavaParams(String p) {
+        this.p = p;
+    }
+
+    public String getP() {
+        return p;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/test/java/org/apache/predictionio/workflow/JavaQuery.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/predictionio/workflow/JavaQuery.java b/core/src/test/java/org/apache/predictionio/workflow/JavaQuery.java
new file mode 100644
index 0000000..f4a6359
--- /dev/null
+++ b/core/src/test/java/org/apache/predictionio/workflow/JavaQuery.java
@@ -0,0 +1,46 @@
+/** Copyright 2015 TappingStone, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.predictionio.workflow;
+
+import java.io.Serializable;
+
+public class JavaQuery implements Serializable{
+    private final String q;
+
+    public JavaQuery(String q) {
+        this.q = q;
+    }
+
+    public String getQ() {
+        return q;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+
+        JavaQuery javaQuery = (JavaQuery) o;
+
+        return !(q != null ? !q.equals(javaQuery.q) : javaQuery.q != null);
+
+    }
+
+    @Override
+    public int hashCode() {
+        return q != null ? q.hashCode() : 0;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/test/java/org/apache/predictionio/workflow/JavaQueryTypeAdapterFactory.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/predictionio/workflow/JavaQueryTypeAdapterFactory.java b/core/src/test/java/org/apache/predictionio/workflow/JavaQueryTypeAdapterFactory.java
new file mode 100644
index 0000000..46854d6
--- /dev/null
+++ b/core/src/test/java/org/apache/predictionio/workflow/JavaQueryTypeAdapterFactory.java
@@ -0,0 +1,60 @@
+/** Copyright 2015 TappingStone, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.predictionio.workflow;
+
+import com.google.gson.Gson;
+import com.google.gson.TypeAdapter;
+import com.google.gson.TypeAdapterFactory;
+import com.google.gson.reflect.TypeToken;
+import com.google.gson.stream.JsonReader;
+import com.google.gson.stream.JsonToken;
+import com.google.gson.stream.JsonWriter;
+
+import java.io.IOException;
+
+public class JavaQueryTypeAdapterFactory implements TypeAdapterFactory {
+    @Override
+    public <T> TypeAdapter<T> create(Gson gson, TypeToken<T> type) {
+        if (type.getRawType().equals(JavaQuery.class)) {
+            return (TypeAdapter<T>) new TypeAdapter<JavaQuery>() {
+                public void write(JsonWriter out, JavaQuery value) throws IOException {
+                    if (value == null) {
+                        out.nullValue();
+                    } else {
+                        out.beginObject();
+                        out.name("q").value(value.getQ().toUpperCase());
+                        out.endObject();
+                    }
+                }
+
+                public JavaQuery read(JsonReader reader) throws IOException {
+                    if (reader.peek() == JsonToken.NULL) {
+                        reader.nextNull();
+                        return null;
+                    } else {
+                        reader.beginObject();
+                        reader.nextName();
+                        String q = reader.nextString();
+                        reader.endObject();
+                        return new JavaQuery(q.toUpperCase());
+                    }
+                }
+            };
+        } else {
+            return null;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/test/scala/io/prediction/controller/EngineTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/io/prediction/controller/EngineTest.scala b/core/src/test/scala/io/prediction/controller/EngineTest.scala
deleted file mode 100644
index cc84249..0000000
--- a/core/src/test/scala/io/prediction/controller/EngineTest.scala
+++ /dev/null
@@ -1,615 +0,0 @@
-package io.prediction.controller
-
-import io.prediction.workflow.PersistentModelManifest
-import io.prediction.workflow.SharedSparkContext
-import io.prediction.workflow.StopAfterPrepareInterruption
-import io.prediction.workflow.StopAfterReadInterruption
-
-import grizzled.slf4j.Logger
-import io.prediction.workflow.WorkflowParams
-import org.apache.spark.rdd.RDD
-import org.scalatest.Inspectors._
-import org.scalatest.Matchers._
-import org.scalatest.FunSuite
-import org.scalatest.Inside
-
-import scala.util.Random
-
-class EngineSuite
-extends FunSuite with Inside with SharedSparkContext {
-  import io.prediction.controller.Engine0._
-  @transient lazy val logger = Logger[this.type] 
-
-  test("Engine.train") {
-    val engine = new Engine(
-      classOf[PDataSource2],
-      classOf[PPreparator1],
-      Map("" -> classOf[PAlgo2]),
-      classOf[LServing1])
-
-    val engineParams = EngineParams(
-      dataSourceParams = PDataSource2.Params(0),
-      preparatorParams = PPreparator1.Params(1),
-      algorithmParamsList = Seq(("", PAlgo2.Params(2))),
-      servingParams = LServing1.Params(3))
-
-    val models = engine.train(
-      sc, 
-      engineParams, 
-      engineInstanceId = "",
-      params = WorkflowParams())
-    
-    val pd = ProcessedData(1, TrainingData(0))
-
-    // PAlgo2.Model doesn't have IPersistentModel trait implemented. Hence the
-    // model extract after train is Unit.
-    models should contain theSameElementsAs Seq(Unit)
-  }
-
-  test("Engine.train persisting PAlgo.Model") {
-    val engine = new Engine(
-      classOf[PDataSource2],
-      classOf[PPreparator1],
-      Map(
-        "PAlgo2" -> classOf[PAlgo2],
-        "PAlgo3" -> classOf[PAlgo3]
-      ),
-      classOf[LServing1])
-
-    val engineParams = EngineParams(
-      dataSourceParams = PDataSource2.Params(0),
-      preparatorParams = PPreparator1.Params(1),
-      algorithmParamsList = Seq(
-        ("PAlgo2", PAlgo2.Params(2)),
-        ("PAlgo3", PAlgo3.Params(21)),
-        ("PAlgo3", PAlgo3.Params(22))
-      ),
-      servingParams = LServing1.Params(3))
-
-    val pd = ProcessedData(1, TrainingData(0))
-    val model21 = PAlgo3.Model(21, pd)
-    val model22 = PAlgo3.Model(22, pd)
-
-    val models = engine.train(
-      sc, 
-      engineParams, 
-      engineInstanceId = "",
-      params = WorkflowParams())
-
-    val pModel21 = PersistentModelManifest(model21.getClass.getName)
-    val pModel22 = PersistentModelManifest(model22.getClass.getName)
-    
-    models should contain theSameElementsAs Seq(Unit, pModel21, pModel22)
-  }
-
-  test("Engine.train persisting LAlgo.Model") {
-    val engine = Engine(
-      classOf[LDataSource1],
-      classOf[LPreparator1],
-      Map(
-        "LAlgo1" -> classOf[LAlgo1],
-        "LAlgo2" -> classOf[LAlgo2],
-        "LAlgo3" -> classOf[LAlgo3]
-      ),
-      classOf[LServing1])
-
-    val engineParams = EngineParams(
-      dataSourceParams = LDataSource1.Params(0),
-      preparatorParams = LPreparator1.Params(1),
-      algorithmParamsList = Seq(
-        ("LAlgo2", LAlgo2.Params(20)),
-        ("LAlgo2", LAlgo2.Params(21)),
-        ("LAlgo3", LAlgo3.Params(22))),
-      servingParams = LServing1.Params(3))
-
-    val pd = ProcessedData(1, TrainingData(0))
-    val model20 = LAlgo2.Model(20, pd)
-    val model21 = LAlgo2.Model(21, pd)
-    val model22 = LAlgo3.Model(22, pd)
-
-    //val models = engine.train(sc, engineParams, WorkflowParams())
-    val models = engine.train(
-      sc, 
-      engineParams, 
-      engineInstanceId = "",
-      params = WorkflowParams())
-
-    val pModel20 = PersistentModelManifest(model20.getClass.getName)
-    val pModel21 = PersistentModelManifest(model21.getClass.getName)
-    
-    models should contain theSameElementsAs Seq(pModel20, pModel21, model22)
-  }
-  
-  test("Engine.train persisting P&NAlgo.Model") {
-    val engine = new Engine(
-      classOf[PDataSource2],
-      classOf[PPreparator1],
-      Map(
-        "PAlgo2" -> classOf[PAlgo2],
-        "PAlgo3" -> classOf[PAlgo3],
-        "NAlgo2" -> classOf[NAlgo2],
-        "NAlgo3" -> classOf[NAlgo3]
-      ),
-      classOf[LServing1])
-
-    val engineParams = EngineParams(
-      dataSourceParams = PDataSource2.Params(0),
-      preparatorParams = PPreparator1.Params(1),
-      algorithmParamsList = Seq(
-        ("PAlgo2", PAlgo2.Params(20)),
-        ("PAlgo3", PAlgo3.Params(21)),
-        ("PAlgo3", PAlgo3.Params(22)),
-        ("NAlgo2", NAlgo2.Params(23)),
-        ("NAlgo3", NAlgo3.Params(24)),
-        ("NAlgo3", NAlgo3.Params(25))
-      ),
-      servingParams = LServing1.Params(3))
-
-    val pd = ProcessedData(1, TrainingData(0))
-    val model21 = PAlgo3.Model(21, pd)
-    val model22 = PAlgo3.Model(22, pd)
-    val model23 = NAlgo2.Model(23, pd)
-    val model24 = NAlgo3.Model(24, pd)
-    val model25 = NAlgo3.Model(25, pd)
-
-    //val models = engine.train(sc, engineParams, WorkflowParams())
-    val models = engine.train(
-      sc, 
-      engineParams, 
-      engineInstanceId = "",
-      params = WorkflowParams())
-
-    val pModel21 = PersistentModelManifest(model21.getClass.getName)
-    val pModel22 = PersistentModelManifest(model22.getClass.getName)
-    val pModel23 = PersistentModelManifest(model23.getClass.getName)
-    
-    models should contain theSameElementsAs Seq(
-      Unit, pModel21, pModel22, pModel23, model24, model25)
-  }
-
-  test("Engine.eval") {
-    val engine = new Engine(
-      classOf[PDataSource2],
-      classOf[PPreparator1],
-      Map("" -> classOf[PAlgo2]),
-      classOf[LServing1])
-
-    val qn = 10
-    val en = 3
-
-    val engineParams = EngineParams(
-      dataSourceParams = PDataSource2.Params(id = 0, en = en, qn = qn),
-      preparatorParams = PPreparator1.Params(1),
-      algorithmParamsList = Seq(("", PAlgo2.Params(2))),
-      servingParams = LServing1.Params(3))
-
-    val algoCount = engineParams.algorithmParamsList.size
-    val pd = ProcessedData(1, TrainingData(0))
-    val model0 = PAlgo2.Model(2, pd)
-
-    val evalDataSet = engine.eval(sc, engineParams, WorkflowParams())
-
-    evalDataSet should have size en
-
-    forAll(evalDataSet.zipWithIndex) { case (evalData, ex) => {
-      val (evalInfo, qpaRDD) = evalData
-      evalInfo shouldBe EvalInfo(0)
-
-      val qpaSeq: Seq[(Query, Prediction, Actual)] = qpaRDD.collect
-
-      qpaSeq should have size qn
-
-      forAll (qpaSeq) { case (q, p, a) => 
-        val Query(qId, qEx, qQx, _) = q
-        val Actual(aId, aEx, aQx) = a
-        qId shouldBe aId
-        qEx shouldBe ex
-        aEx shouldBe ex
-        qQx shouldBe aQx
-
-        inside (p) { case Prediction(pId, pQ, pModels, pPs) => {
-          pId shouldBe 3
-          pQ shouldBe q
-          pModels shouldBe None
-          pPs should have size algoCount
-          pPs shouldBe Seq(
-            Prediction(id = 2, q = q, models = Some(model0)))
-        }}
-      }
-    }}
-  }
-
-  test("Engine.prepareDeploy PAlgo") {
-    val engine = new Engine(
-      classOf[PDataSource2],
-      classOf[PPreparator1],
-      Map(
-        "PAlgo2" -> classOf[PAlgo2],
-        "PAlgo3" -> classOf[PAlgo3],
-        "NAlgo2" -> classOf[NAlgo2],
-        "NAlgo3" -> classOf[NAlgo3]
-      ),
-      classOf[LServing1])
-
-    val engineParams = EngineParams(
-      dataSourceParams = PDataSource2.Params(0),
-      preparatorParams = PPreparator1.Params(1),
-      algorithmParamsList = Seq(
-        ("PAlgo2", PAlgo2.Params(20)),
-        ("PAlgo3", PAlgo3.Params(21)),
-        ("PAlgo3", PAlgo3.Params(22)),
-        ("NAlgo2", NAlgo2.Params(23)),
-        ("NAlgo3", NAlgo3.Params(24)),
-        ("NAlgo3", NAlgo3.Params(25))
-      ),
-      servingParams = LServing1.Params(3))
-
-    val pd = ProcessedData(1, TrainingData(0))
-    val model20 = PAlgo2.Model(20, pd)
-    val model21 = PAlgo3.Model(21, pd)
-    val model22 = PAlgo3.Model(22, pd)
-    val model23 = NAlgo2.Model(23, pd)
-    val model24 = NAlgo3.Model(24, pd)
-    val model25 = NAlgo3.Model(25, pd)
-
-    val rand = new Random()
-
-    val fakeEngineInstanceId = s"FakeInstanceId-${rand.nextLong()}"
-
-    val persistedModels = engine.train(
-      sc,
-      engineParams,
-      engineInstanceId = fakeEngineInstanceId,
-      params = WorkflowParams()
-    )
-
-    val deployableModels = engine.prepareDeploy(
-      sc,
-      engineParams,
-      fakeEngineInstanceId,
-      persistedModels,
-      params = WorkflowParams()
-    )
-
-    deployableModels should contain theSameElementsAs Seq(
-      model20, model21, model22, model23, model24, model25)
-  }
-}
-
-class EngineTrainSuite extends FunSuite with SharedSparkContext {
-  import io.prediction.controller.Engine0._
-  val defaultWorkflowParams: WorkflowParams = WorkflowParams()
-
-  test("Parallel DS/P/Algos") {
-    val models = Engine.train(
-      sc,
-      new PDataSource0(0),
-      new PPreparator0(1),
-      Seq(
-        new PAlgo0(2),
-        new PAlgo1(3),
-        new PAlgo0(4)),
-      defaultWorkflowParams
-    )
-
-    val pd = ProcessedData(1, TrainingData(0))
-
-    models should contain theSameElementsAs Seq(
-      PAlgo0.Model(2, pd), PAlgo1.Model(3, pd), PAlgo0.Model(4, pd))
-  }
-
-  test("Local DS/P/Algos") {
-    val models = Engine.train(
-      sc,
-      new LDataSource0(0),
-      new LPreparator0(1),
-      Seq(
-        new LAlgo0(2),
-        new LAlgo1(3),
-        new LAlgo0(4)),
-      defaultWorkflowParams
-    )
-    
-    val pd = ProcessedData(1, TrainingData(0))
-
-    val expectedResults = Seq(
-      LAlgo0.Model(2, pd),
-      LAlgo1.Model(3, pd),
-      LAlgo0.Model(4, pd))
-
-    forAll(models.zip(expectedResults)) { case (model, expected) => 
-      model shouldBe a [RDD[_]]
-      val localModel = model.asInstanceOf[RDD[_]].collect
-      localModel should contain theSameElementsAs Seq(expected)
-    }
-  }
-
-  test("P2L DS/P/Algos") {
-    val models = Engine.train(
-      sc,
-      new PDataSource0(0),
-      new PPreparator0(1),
-      Seq(
-        new NAlgo0(2),
-        new NAlgo1(3),
-        new NAlgo0(4)),
-      defaultWorkflowParams
-    )
-
-    val pd = ProcessedData(1, TrainingData(0))
-    
-    models should contain theSameElementsAs Seq(
-      NAlgo0.Model(2, pd), NAlgo1.Model(3, pd), NAlgo0.Model(4, pd))
-  }
-  
-  test("Parallel DS/P/Algos Stop-After-Read") {
-    val workflowParams = defaultWorkflowParams.copy(
-      stopAfterRead = true)
-
-    an [StopAfterReadInterruption] should be thrownBy Engine.train(
-      sc,
-      new PDataSource0(0),
-      new PPreparator0(1),
-      Seq(
-        new PAlgo0(2),
-        new PAlgo1(3),
-        new PAlgo0(4)),
-      workflowParams
-    )
-  }
-  
-  test("Parallel DS/P/Algos Stop-After-Prepare") {
-    val workflowParams = defaultWorkflowParams.copy(
-      stopAfterPrepare = true)
-
-    an [StopAfterPrepareInterruption] should be thrownBy Engine.train(
-      sc,
-      new PDataSource0(0),
-      new PPreparator0(1),
-      Seq(
-        new PAlgo0(2),
-        new PAlgo1(3),
-        new PAlgo0(4)),
-      workflowParams
-    )
-  }
-  
-  test("Parallel DS/P/Algos Dirty TrainingData") {
-    val workflowParams = defaultWorkflowParams.copy(
-      skipSanityCheck = false)
-
-    an [AssertionError] should be thrownBy Engine.train(
-      sc,
-      new PDataSource3(0, error = true),
-      new PPreparator0(1),
-      Seq(
-        new PAlgo0(2),
-        new PAlgo1(3),
-        new PAlgo0(4)),
-      workflowParams
-    )
-  }
-  
-  test("Parallel DS/P/Algos Dirty TrainingData But Skip Check") {
-    val workflowParams = defaultWorkflowParams.copy(
-      skipSanityCheck = true)
-
-    val models = Engine.train(
-      sc,
-      new PDataSource3(0, error = true),
-      new PPreparator0(1),
-      Seq(
-        new PAlgo0(2),
-        new PAlgo1(3),
-        new PAlgo0(4)),
-      workflowParams
-    )
-    
-  val pd = ProcessedData(1, TrainingData(0, error = true))
-
-    models should contain theSameElementsAs Seq(
-      PAlgo0.Model(2, pd), PAlgo1.Model(3, pd), PAlgo0.Model(4, pd))
-  }
-}
-
-
-class EngineEvalSuite
-extends FunSuite with Inside with SharedSparkContext {
-  import io.prediction.controller.Engine0._
-
-  @transient lazy val logger = Logger[this.type] 
-  
-  test("Simple Parallel DS/P/A/S") {
-    val en = 2
-    val qn = 5
-
-    val evalDataSet: Seq[(EvalInfo, RDD[(Query, Prediction, Actual)])] = 
-    Engine.eval(
-      sc,
-      new PDataSource1(id = 1, en = en, qn = qn),
-      new PPreparator0(id = 2),
-      Seq(new PAlgo0(id = 3)),
-      new LServing0(id = 10))
-
-    val pd = ProcessedData(2, TrainingData(1))
-    val model0 = PAlgo0.Model(3, pd)
-
-    forAll(evalDataSet.zipWithIndex) { case (evalData, ex) => {
-      val (evalInfo, qpaRDD) = evalData
-      evalInfo shouldBe EvalInfo(1)
-
-      val qpaSeq: Seq[(Query, Prediction, Actual)] = qpaRDD.collect
-      forAll (qpaSeq) { case (q, p, a) => 
-        val Query(qId, qEx, qQx, _) = q
-        val Actual(aId, aEx, aQx) = a
-        qId shouldBe aId
-        qEx shouldBe ex
-        aEx shouldBe ex
-        qQx shouldBe aQx
-
-        inside (p) { case Prediction(pId, pQ, pModels, pPs) => {
-          pId shouldBe 10
-          pQ shouldBe q
-          pModels shouldBe None
-          pPs should have size 1
-          pPs shouldBe Seq(
-            Prediction(id = 3, q = q, models = Some(model0)))
-        }}
-      }
-
-    }}
-
-  }
-
-  test("Parallel DS/P/A/S") {
-    val en = 2
-    val qn = 5
-
-    val evalDataSet: Seq[(EvalInfo, RDD[(Query, Prediction, Actual)])] = 
-    Engine.eval(
-      sc,
-      new PDataSource1(id = 1, en = en, qn = qn),
-      new PPreparator0(id = 2),
-      Seq(
-        new PAlgo0(id = 3), 
-        new PAlgo1(id = 4),
-        new NAlgo1(id = 5)),
-      new LServing0(id = 10))
-
-    val pd = ProcessedData(2, TrainingData(1))
-    val model0 = PAlgo0.Model(3, pd)
-    val model1 = PAlgo1.Model(4, pd)
-    val model2 = NAlgo1.Model(5, pd)
-
-    forAll(evalDataSet.zipWithIndex) { case (evalData, ex) => {
-      val (evalInfo, qpaRDD) = evalData
-      evalInfo shouldBe EvalInfo(1)
-
-      val qpaSeq: Seq[(Query, Prediction, Actual)] = qpaRDD.collect
-      forAll (qpaSeq) { case (q, p, a) => 
-        val Query(qId, qEx, qQx, _) = q
-        val Actual(aId, aEx, aQx) = a
-        qId shouldBe aId
-        qEx shouldBe ex
-        aEx shouldBe ex
-        qQx shouldBe aQx
-
-        inside (p) { case Prediction(pId, pQ, pModels, pPs) => {
-          pId shouldBe 10
-          pQ shouldBe q
-          pModels shouldBe None
-          pPs should have size 3
-          pPs shouldBe Seq(
-            Prediction(id = 3, q = q, models = Some(model0)),
-            Prediction(id = 4, q = q, models = Some(model1)),
-            Prediction(id = 5, q = q, models = Some(model2))
-          )
-        }}
-      }
-    }}
-  }
-  
-  test("Parallel DS/P/A/S with Supplemented Query") {
-    val en = 2
-    val qn = 5
-
-    val evalDataSet: Seq[(EvalInfo, RDD[(Query, Prediction, Actual)])] = 
-    Engine.eval(
-      sc,
-      new PDataSource1(id = 1, en = en, qn = qn),
-      new PPreparator0(id = 2),
-      Seq(
-        new PAlgo0(id = 3), 
-        new PAlgo1(id = 4),
-        new NAlgo1(id = 5)),
-      new LServing2(id = 10))
-
-    val pd = ProcessedData(2, TrainingData(1))
-    val model0 = PAlgo0.Model(3, pd)
-    val model1 = PAlgo1.Model(4, pd)
-    val model2 = NAlgo1.Model(5, pd)
-
-    forAll(evalDataSet.zipWithIndex) { case (evalData, ex) => {
-      val (evalInfo, qpaRDD) = evalData
-      evalInfo shouldBe EvalInfo(1)
-
-      val qpaSeq: Seq[(Query, Prediction, Actual)] = qpaRDD.collect
-      forAll (qpaSeq) { case (q, p, a) => 
-        val Query(qId, qEx, qQx, qSupp) = q
-        val Actual(aId, aEx, aQx) = a
-        qId shouldBe aId
-        qEx shouldBe ex
-        aEx shouldBe ex
-        qQx shouldBe aQx
-        qSupp shouldBe false
-
-        inside (p) { case Prediction(pId, pQ, pModels, pPs) => {
-          pId shouldBe 10
-          pQ shouldBe q
-          pModels shouldBe None
-          pPs should have size 3
-          // queries inside prediction should have supp set to true, since it
-          // represents what the algorithms see.
-          val qSupp = q.copy(supp = true)
-          pPs shouldBe Seq(
-            Prediction(id = 3, q = qSupp, models = Some(model0)),
-            Prediction(id = 4, q = qSupp, models = Some(model1)),
-            Prediction(id = 5, q = qSupp, models = Some(model2))
-          )
-        }}
-      }
-    }}
-  }
-  
-  test("Local DS/P/A/S") {
-    val en = 2
-    val qn = 5
-
-    val evalDataSet: Seq[(EvalInfo, RDD[(Query, Prediction, Actual)])] = 
-    Engine.eval(
-      sc,
-      new LDataSource0(id = 1, en = en, qn = qn),
-      new LPreparator0(id = 2),
-      Seq(
-        new LAlgo0(id = 3), 
-        new LAlgo1(id = 4),
-        new LAlgo1(id = 5)),
-      new LServing0(id = 10))
-
-    val pd = ProcessedData(2, TrainingData(1))
-    val model0 = LAlgo0.Model(3, pd)
-    val model1 = LAlgo1.Model(4, pd)
-    val model2 = LAlgo1.Model(5, pd)
-
-    forAll(evalDataSet.zipWithIndex) { case (evalData, ex) => {
-      val (evalInfo, qpaRDD) = evalData
-      evalInfo shouldBe EvalInfo(1)
-
-      val qpaSeq: Seq[(Query, Prediction, Actual)] = qpaRDD.collect
-      forAll (qpaSeq) { case (q, p, a) => 
-        val Query(qId, qEx, qQx, _) = q
-        val Actual(aId, aEx, aQx) = a
-        qId shouldBe aId
-        qEx shouldBe ex
-        aEx shouldBe ex
-        qQx shouldBe aQx
-
-        inside (p) { case Prediction(pId, pQ, pModels, pPs) => {
-          pId shouldBe 10
-          pQ shouldBe q
-          pModels shouldBe None
-          pPs should have size 3
-          pPs shouldBe Seq(
-            Prediction(id = 3, q = q, models = Some(model0)),
-            Prediction(id = 4, q = q, models = Some(model1)),
-            Prediction(id = 5, q = q, models = Some(model2))
-          )
-        }}
-      }
-
-    }}
-
-  }
-}
-
-

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/test/scala/io/prediction/controller/EvaluationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/io/prediction/controller/EvaluationTest.scala b/core/src/test/scala/io/prediction/controller/EvaluationTest.scala
deleted file mode 100644
index 5dc4c86..0000000
--- a/core/src/test/scala/io/prediction/controller/EvaluationTest.scala
+++ /dev/null
@@ -1,46 +0,0 @@
-package io.prediction.controller
-
-import io.prediction.workflow.SharedSparkContext
-
-import org.scalatest.FunSuite
-import org.scalatest.Inside
-import org.scalatest.Matchers._
-
-import org.apache.spark.SparkContext
-import org.apache.spark.rdd.RDD
-
-object EvaluationSuite {
-  import io.prediction.controller.TestEvaluator._
-
-  class Metric0 extends Metric[EvalInfo, Query, Prediction, Actual, Int] {
-    def calculate(
-      sc: SparkContext,
-      evalDataSet: Seq[(EvalInfo, RDD[(Query, Prediction, Actual)])]): Int = 1
-  }
-
-  object Evaluation0 extends Evaluation {
-    engineMetric = (new FakeEngine(1, 1, 1), new Metric0())
-  }
-}
-
-
-class EvaluationSuite
-extends FunSuite with Inside with SharedSparkContext {
-  import io.prediction.controller.EvaluationSuite._
-
-  test("Evaluation makes MetricEvaluator") {
-    // MetricEvaluator is typed [EvalInfo, Query, Prediction, Actual, Int],
-    // however this information is erased on JVM. scalatest doc recommends to
-    // use wildcards.
-    Evaluation0.evaluator shouldBe a [MetricEvaluator[_, _, _, _, _]]
-  }
-
-  test("Load from class path") {
-    val r = io.prediction.workflow.WorkflowUtils.getEvaluation(
-      "io.prediction.controller.EvaluationSuite.Evaluation0",
-      getClass.getClassLoader)
-
-    r._2 shouldBe EvaluationSuite.Evaluation0
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/test/scala/io/prediction/controller/EvaluatorTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/io/prediction/controller/EvaluatorTest.scala b/core/src/test/scala/io/prediction/controller/EvaluatorTest.scala
deleted file mode 100644
index c57bd03..0000000
--- a/core/src/test/scala/io/prediction/controller/EvaluatorTest.scala
+++ /dev/null
@@ -1,93 +0,0 @@
-package io.prediction.controller
-
-import io.prediction.core._
-import io.prediction.workflow.WorkflowParams
-
-import org.apache.spark.SparkContext
-import org.apache.spark.rdd.RDD
-
-object TestEvaluator {
-  case class EvalInfo(id: Int, ex: Int)
-  case class Query(id: Int, ex: Int, qx: Int)
-  case class Prediction(id: Int, ex: Int, qx: Int)
-  case class Actual(id: Int, ex: Int, qx: Int)
-
-  class FakeEngine(val id: Int, val en: Int, val qn: Int)
-  extends BaseEngine[EvalInfo, Query, Prediction, Actual] {
-    def train(
-      sc: SparkContext, 
-      engineParams: EngineParams,
-      instanceId: String = "",
-      params: WorkflowParams = WorkflowParams()
-    ): Seq[Any] = {
-      Seq[Any]()
-    }
-
-    def eval(
-      sc: SparkContext, 
-      engineParams: EngineParams, 
-      params: WorkflowParams)
-    : Seq[(EvalInfo, RDD[(Query, Prediction, Actual)])] = {
-      (0 until en).map { ex => {
-        val qpas = (0 until qn).map { qx => {
-          (Query(id, ex, qx), Prediction(id, ex, qx), Actual(id, ex, qx))
-        }}
-  
-        (EvalInfo(id = id, ex = ex), sc.parallelize(qpas))
-      }}
-    }
-  
-  }
-
-  /*
-  class Evaluator0 extends Evaluator[EvalInfo, Query, Prediction, Actual,
-      (Query, Prediction, Actual), 
-      (EvalInfo, Seq[(Query, Prediction, Actual)]),
-      Seq[(EvalInfo, (EvalInfo, Seq[(Query, Prediction, Actual)]))]
-      ] {
-
-    def evaluateUnit(q: Query, p: Prediction, a: Actual)
-    : (Query, Prediction, Actual) = (q, p, a)
-
-    def evaluateSet(
-        evalInfo: EvalInfo, 
-        eus: Seq[(Query, Prediction, Actual)])
-    : (EvalInfo, Seq[(Query, Prediction, Actual)]) = (evalInfo, eus)
-
-    def evaluateAll(
-      input: Seq[(EvalInfo, (EvalInfo, Seq[(Query, Prediction, Actual)]))]) 
-    = input
-  }
-  */
-
-}
-
-/*
-class EvaluatorSuite
-extends FunSuite with Inside with SharedSparkContext {
-  import io.prediction.controller.TestEvaluator._
-  @transient lazy val logger = Logger[this.type] 
-
-  test("Evaluator.evaluate") {
-    val engine = new FakeEngine(1, 3, 10)
-    val evaluator = new Evaluator0()
-  
-    val evalDataSet = engine.eval(sc, null.asInstanceOf[EngineParams])
-    val er: Seq[(EvalInfo, (EvalInfo, Seq[(Query, Prediction, Actual)]))] =
-      evaluator.evaluateBase(sc, evalDataSet)
-
-    evalDataSet.zip(er).map { case (input, output) => {
-      val (inputEvalInfo, inputQpaRDD) = input
-      val (outputEvalInfo, (outputEvalInfo2, outputQpaSeq)) = output
-      
-      inputEvalInfo shouldBe outputEvalInfo
-      inputEvalInfo shouldBe outputEvalInfo2
-      
-      val inputQpaSeq: Array[(Query, Prediction, Actual)] = inputQpaRDD.collect
-
-      inputQpaSeq.size should be (outputQpaSeq.size)
-      // TODO. match inputQpa and outputQpa content.
-    }}
-  }
-}
-*/

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/test/scala/io/prediction/controller/FastEvalEngineTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/io/prediction/controller/FastEvalEngineTest.scala b/core/src/test/scala/io/prediction/controller/FastEvalEngineTest.scala
deleted file mode 100644
index cdf4598..0000000
--- a/core/src/test/scala/io/prediction/controller/FastEvalEngineTest.scala
+++ /dev/null
@@ -1,181 +0,0 @@
-package io.prediction.controller
-
-import io.prediction.workflow.WorkflowParams
-import org.scalatest.FunSuite
-import org.scalatest.Inside
-import org.scalatest.Matchers._
-import org.scalatest.Inspectors._
-
-import io.prediction.workflow.SharedSparkContext
-
-class FastEngineSuite
-extends FunSuite with Inside with SharedSparkContext {
-  import io.prediction.controller.Engine0._
-  
-  test("Single Evaluation") {
-    val engine = new FastEvalEngine(
-      Map("" -> classOf[PDataSource2]),
-      Map("" -> classOf[PPreparator1]),
-      Map(
-        "PAlgo2" -> classOf[PAlgo2],
-        "PAlgo3" -> classOf[PAlgo3]
-      ),
-      Map("" -> classOf[LServing1]))
-
-    val qn = 10
-    val en = 3
-
-    val engineParams = EngineParams(
-      dataSourceParams = PDataSource2.Params(id = 0, en = en, qn = qn),
-      preparatorParams = PPreparator1.Params(1),
-      algorithmParamsList = Seq(
-        ("PAlgo2", PAlgo2.Params(20)),
-        ("PAlgo2", PAlgo2.Params(21)),
-        ("PAlgo3", PAlgo3.Params(22))
-      ),
-      servingParams = LServing1.Params(3))
-
-    val algoCount = engineParams.algorithmParamsList.size
-    val pd = ProcessedData(1, TrainingData(0))
-    val model0 = PAlgo2.Model(20, pd)
-    val model1 = PAlgo2.Model(21, pd)
-    val model2 = PAlgo3.Model(22, pd)
-
-    val evalDataSet = engine.eval(sc, engineParams, WorkflowParams())
-
-    evalDataSet should have size en
-
-    forAll(evalDataSet.zipWithIndex) { case (evalData, ex) => {
-      val (evalInfo, qpaRDD) = evalData
-      evalInfo shouldBe EvalInfo(0)
-
-      val qpaSeq: Seq[(Query, Prediction, Actual)] = qpaRDD.collect
-
-      qpaSeq should have size qn
-
-      forAll (qpaSeq) { case (q, p, a) => 
-        val Query(qId, qEx, qQx, _) = q
-        val Actual(aId, aEx, aQx) = a
-        qId shouldBe aId
-        qEx shouldBe ex
-        aEx shouldBe ex
-        qQx shouldBe aQx
-
-        inside (p) { case Prediction(pId, pQ, pModels, pPs) => {
-          pId shouldBe 3
-          pQ shouldBe q
-          pModels shouldBe None
-          pPs should have size algoCount
-          pPs shouldBe Seq(
-            Prediction(id = 20, q = q, models = Some(model0)),
-            Prediction(id = 21, q = q, models = Some(model1)),
-            Prediction(id = 22, q = q, models = Some(model2))
-          )
-        }}
-      }
-    }}
-  }
-
-  test("Batch Evaluation") {
-    val engine = new FastEvalEngine(
-      Map("" -> classOf[PDataSource2]),
-      Map("" -> classOf[PPreparator1]),
-      Map("" -> classOf[PAlgo2]),
-      Map("" -> classOf[LServing1]))
-
-    val qn = 10
-    val en = 3
-
-    val baseEngineParams = EngineParams(
-      dataSourceParams = PDataSource2.Params(id = 0, en = en, qn = qn),
-      preparatorParams = PPreparator1.Params(1),
-      algorithmParamsList = Seq(("", PAlgo2.Params(2))),
-      servingParams = LServing1.Params(3))
-
-    val ep0 = baseEngineParams
-    val ep1 = baseEngineParams.copy(
-      algorithmParamsList = Seq(("", PAlgo2.Params(2))))
-    val ep2 = baseEngineParams.copy(
-      algorithmParamsList = Seq(("", PAlgo2.Params(20))))
-
-    val engineEvalDataSet = engine.batchEval(
-      sc,
-      Seq(ep0, ep1, ep2),
-      WorkflowParams())
-
-    val evalDataSet0 = engineEvalDataSet(0)._2
-    val evalDataSet1 = engineEvalDataSet(1)._2
-    val evalDataSet2 = engineEvalDataSet(2)._2
-
-    evalDataSet0 shouldBe evalDataSet1
-    evalDataSet0 should not be evalDataSet2
-    evalDataSet1 should not be evalDataSet2
-
-    // evalDataSet0._1 should be theSameInstanceAs evalDataSet1._1
-    // When things are cached correctly, evalDataSet0 and 1 should share the
-    // same EI
-    evalDataSet0.zip(evalDataSet1).foreach { case (e0, e1) => {
-      e0._1 should be theSameInstanceAs e1._1
-      e0._2 should be theSameInstanceAs e1._2
-    }}
-   
-    // So as set1 and set2, however, the QPA-RDD should be different.
-    evalDataSet1.zip(evalDataSet2).foreach { case (e1, e2) => {
-      e1._1 should be theSameInstanceAs e2._1
-      val e1Qpa = e1._2
-      val e2Qpa = e2._2
-      e1Qpa should not be theSameInstanceAs (e2Qpa)
-    }}
-  }
-  
-  test("Not cached when isEqual not implemented") {
-    // PDataSource3.Params is a class not case class. Need to implement the
-    // isEqual function for hashing.
-    val engine = new FastEvalEngine(
-      Map("" -> classOf[PDataSource4]),
-      Map("" -> classOf[PPreparator1]),
-      Map("" -> classOf[PAlgo2]),
-      Map("" -> classOf[LServing1]))
-
-    val qn = 10
-    val en = 3
-
-    val baseEngineParams = EngineParams(
-      dataSourceParams = new PDataSource4.Params(id = 0, en = en, qn = qn),
-      preparatorParams = PPreparator1.Params(1),
-      algorithmParamsList = Seq(("", PAlgo2.Params(2))),
-      servingParams = LServing1.Params(3))
-
-    val ep0 = baseEngineParams
-    val ep1 = baseEngineParams.copy(
-      algorithmParamsList = Seq(("", PAlgo2.Params(3))))
-    // ep2.dataSource is different from ep0.
-    val ep2 = baseEngineParams.copy(
-      dataSourceParams = ("", new PDataSource4.Params(id = 0, en = en, qn = qn)),
-      algorithmParamsList = Seq(("", PAlgo2.Params(3))))
-
-    val engineEvalDataSet = engine.batchEval(
-      sc,
-      Seq(ep0, ep1, ep2),
-      WorkflowParams())
-
-    val evalDataSet0 = engineEvalDataSet(0)._2
-    val evalDataSet1 = engineEvalDataSet(1)._2
-    val evalDataSet2 = engineEvalDataSet(2)._2
-
-    evalDataSet0 should not be evalDataSet1
-    evalDataSet0 should not be evalDataSet2
-    evalDataSet1 should not be evalDataSet2
-
-    // Set0 should have same EI as Set1, since their dsp are the same instance.
-    evalDataSet0.zip(evalDataSet1).foreach { case (e0, e1) => {
-      e0._1 should be theSameInstanceAs (e1._1)
-    }}
-  
-    // Set1 should have different EI as Set2, since Set2's dsp is another
-    // instance
-    evalDataSet1.zip(evalDataSet2).foreach { case (e1, e2) => {
-      e1._1 should not be theSameInstanceAs (e2._1)
-    }}
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/test/scala/io/prediction/controller/MetricEvaluatorTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/io/prediction/controller/MetricEvaluatorTest.scala b/core/src/test/scala/io/prediction/controller/MetricEvaluatorTest.scala
deleted file mode 100644
index 71fcb88..0000000
--- a/core/src/test/scala/io/prediction/controller/MetricEvaluatorTest.scala
+++ /dev/null
@@ -1,52 +0,0 @@
-/** Copyright 2015 TappingStone, Inc.
-  *
-  * Licensed under the Apache License, Version 2.0 (the "License");
-  * you may not use this file except in compliance with the License.
-  * You may obtain a copy of the License at
-  *
-  *     http://www.apache.org/licenses/LICENSE-2.0
-  *
-  * Unless required by applicable law or agreed to in writing, software
-  * distributed under the License is distributed on an "AS IS" BASIS,
-  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  * See the License for the specific language governing permissions and
-  * limitations under the License.
-  */
-
-package io.prediction.controller
-
-import io.prediction.workflow.SharedSparkContext
-import io.prediction.workflow.WorkflowParams
-import org.scalatest.FunSuite
-
-object MetricEvaluatorSuite {
-  case class Metric0() extends SumMetric[EmptyParams, Int, Int, Int, Int] {
-    def calculate(q: Int, p: Int, a: Int): Int = q
-  }
-
-  object Evaluation0 extends Evaluation {}
-}
-
-class MetricEvaluatorDevSuite extends FunSuite with SharedSparkContext {
-  import io.prediction.controller.MetricEvaluatorSuite._
-
-  test("a") {
-    val metricEvaluator = MetricEvaluator(
-      Metric0(),
-      Seq(Metric0(), Metric0())
-    )
- 
-    val engineEvalDataSet = Seq(
-      (EngineParams(), Seq(
-        (EmptyParams(), sc.parallelize(Seq((1,0,0), (2,0,0)))))),
-      (EngineParams(), Seq(
-        (EmptyParams(), sc.parallelize(Seq((1,0,0), (2,0,0)))))))
-
-    val r = metricEvaluator.evaluateBase(
-      sc,
-      Evaluation0,
-      engineEvalDataSet,
-      WorkflowParams())
-
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/test/scala/io/prediction/controller/MetricTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/io/prediction/controller/MetricTest.scala b/core/src/test/scala/io/prediction/controller/MetricTest.scala
deleted file mode 100644
index b846548..0000000
--- a/core/src/test/scala/io/prediction/controller/MetricTest.scala
+++ /dev/null
@@ -1,143 +0,0 @@
-/** Copyright 2015 TappingStone, Inc.
-  *
-  * Licensed under the Apache License, Version 2.0 (the "License");
-  * you may not use this file except in compliance with the License.
-  * You may obtain a copy of the License at
-  *
-  *     http://www.apache.org/licenses/LICENSE-2.0
-  *
-  * Unless required by applicable law or agreed to in writing, software
-  * distributed under the License is distributed on an "AS IS" BASIS,
-  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  * See the License for the specific language governing permissions and
-  * limitations under the License.
-  */
-
-package io.prediction.controller
-
-import io.prediction.workflow.SharedSparkContext
-
-import grizzled.slf4j.Logger
-import org.scalatest.Matchers._
-import org.scalatest.FunSuite
-import org.scalatest.Inside
-
-object MetricDevSuite {
-  class QIntSumMetric extends SumMetric[EmptyParams, Int, Int, Int, Int] {
-    def calculate(q: Int, p: Int, a: Int): Int = q
-  }
-  
-  class QDoubleSumMetric extends SumMetric[EmptyParams, Int, Int, Int, Double] {
-    def calculate(q: Int, p: Int, a: Int): Double = q.toDouble
-  }
-  
-  class QAverageMetric extends AverageMetric[EmptyParams, Int, Int, Int] {
-    def calculate(q: Int, p: Int, a: Int): Double = q.toDouble
-  }
-  
-  class QOptionAverageMetric extends OptionAverageMetric[EmptyParams, Int, Int, Int] {
-    def calculate(q: Int, p: Int, a: Int): Option[Double] = {
-      if (q < 0) { None } else { Some(q.toDouble) }
-    }
-  }
-  
-  class QStdevMetric extends StdevMetric[EmptyParams, Int, Int, Int] {
-    def calculate(q: Int, p: Int, a: Int): Double = q.toDouble
-  }
-  
-  class QOptionStdevMetric extends OptionStdevMetric[EmptyParams, Int, Int, Int] {
-    def calculate(q: Int, p: Int, a: Int): Option[Double] = {
-      if (q < 0) { None } else { Some(q.toDouble) }
-    }
-  }
-  
-}
-
-class MetricDevSuite
-extends FunSuite with Inside with SharedSparkContext {
-  @transient lazy val logger = Logger[this.type] 
-  
-  test("Average Metric") {
-    val qpaSeq0 = Seq((1, 0, 0), (2, 0, 0), (3, 0, 0))
-    val qpaSeq1 = Seq((4, 0, 0), (5, 0, 0), (6, 0, 0))
-
-    val evalDataSet = Seq(
-      (EmptyParams(), sc.parallelize(qpaSeq0)),
-      (EmptyParams(), sc.parallelize(qpaSeq1)))
-  
-    val m = new MetricDevSuite.QAverageMetric()
-    val result = m.calculate(sc, evalDataSet)
-    
-    result shouldBe (21.0 / 6)
-  }
-  
-  test("Option Average Metric") {
-    val qpaSeq0 = Seq((1, 0, 0), (2, 0, 0), (3, 0, 0))
-    val qpaSeq1 = Seq((-4, 0, 0), (-5, 0, 0), (6, 0, 0))
-
-    val evalDataSet = Seq(
-      (EmptyParams(), sc.parallelize(qpaSeq0)),
-      (EmptyParams(), sc.parallelize(qpaSeq1)))
-  
-    val m = new MetricDevSuite.QOptionAverageMetric()
-    val result = m.calculate(sc, evalDataSet)
-    
-    result shouldBe (12.0 / 4)
-  }
-  
-  test("Stdev Metric") {
-    val qpaSeq0 = Seq((1, 0, 0), (1, 0, 0), (1, 0, 0), (1, 0, 0))
-    val qpaSeq1 = Seq((5, 0, 0), (5, 0, 0), (5, 0, 0), (5, 0, 0))
-
-    val evalDataSet = Seq(
-      (EmptyParams(), sc.parallelize(qpaSeq0)),
-      (EmptyParams(), sc.parallelize(qpaSeq1)))
-  
-    val m = new MetricDevSuite.QStdevMetric()
-    val result = m.calculate(sc, evalDataSet)
-    
-    result shouldBe 2.0
-  }
-  
-  test("Option Stdev Metric") {
-    val qpaSeq0 = Seq((1, 0, 0), (1, 0, 0), (1, 0, 0), (1, 0, 0))
-    val qpaSeq1 = Seq((5, 0, 0), (5, 0, 0), (5, 0, 0), (5, 0, 0), (-5, 0, 0))
-
-    val evalDataSet = Seq(
-      (EmptyParams(), sc.parallelize(qpaSeq0)),
-      (EmptyParams(), sc.parallelize(qpaSeq1)))
-  
-    val m = new MetricDevSuite.QOptionStdevMetric()
-    val result = m.calculate(sc, evalDataSet)
-    
-    result shouldBe 2.0
-  }
-
-  test("Sum Metric [Int]") {
-    val qpaSeq0 = Seq((1, 0, 0), (2, 0, 0), (3, 0, 0))
-    val qpaSeq1 = Seq((4, 0, 0), (5, 0, 0), (6, 0, 0))
-
-    val evalDataSet = Seq(
-      (EmptyParams(), sc.parallelize(qpaSeq0)),
-      (EmptyParams(), sc.parallelize(qpaSeq1)))
-  
-    val m = new MetricDevSuite.QIntSumMetric()
-    val result = m.calculate(sc, evalDataSet)
-    
-    result shouldBe 21
-  }
-
-  test("Sum Metric [Double]") {
-    val qpaSeq0 = Seq((1, 0, 0), (2, 0, 0), (3, 0, 0))
-    val qpaSeq1 = Seq((4, 0, 0), (5, 0, 0), (6, 0, 0))
-
-    val evalDataSet = Seq(
-      (EmptyParams(), sc.parallelize(qpaSeq0)),
-      (EmptyParams(), sc.parallelize(qpaSeq1)))
-  
-    val m = new MetricDevSuite.QDoubleSumMetric()
-    val result = m.calculate(sc, evalDataSet)
-    
-    result shouldBe 21.0
-  }
-}