You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@predictionio.apache.org by do...@apache.org on 2016/07/18 20:17:53 UTC

[22/34] incubator-predictionio git commit: rename all except examples

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/main/scala/org/apache/predictionio/controller/Engine.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/predictionio/controller/Engine.scala b/core/src/main/scala/org/apache/predictionio/controller/Engine.scala
new file mode 100644
index 0000000..c875a9f
--- /dev/null
+++ b/core/src/main/scala/org/apache/predictionio/controller/Engine.scala
@@ -0,0 +1,829 @@
+/** Copyright 2015 TappingStone, Inc.
+  *
+  * Licensed under the Apache License, Version 2.0 (the "License");
+  * you may not use this file except in compliance with the License.
+  * You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package org.apache.predictionio.controller
+
+import grizzled.slf4j.Logger
+import org.apache.predictionio.core.BaseAlgorithm
+import org.apache.predictionio.core.BaseDataSource
+import org.apache.predictionio.core.BaseEngine
+import org.apache.predictionio.core.BasePreparator
+import org.apache.predictionio.core.BaseServing
+import org.apache.predictionio.core.Doer
+import org.apache.predictionio.data.storage.EngineInstance
+import org.apache.predictionio.data.storage.StorageClientException
+import org.apache.predictionio.workflow.CreateWorkflow
+import org.apache.predictionio.workflow.EngineLanguage
+import org.apache.predictionio.workflow.JsonExtractorOption.JsonExtractorOption
+import org.apache.predictionio.workflow.NameParamsSerializer
+import org.apache.predictionio.workflow.PersistentModelManifest
+import org.apache.predictionio.workflow.SparkWorkflowUtils
+import org.apache.predictionio.workflow.StopAfterPrepareInterruption
+import org.apache.predictionio.workflow.StopAfterReadInterruption
+import org.apache.predictionio.workflow.WorkflowParams
+import org.apache.predictionio.workflow.WorkflowUtils
+import org.apache.spark.SparkContext
+import org.apache.spark.SparkContext._
+import org.apache.spark.rdd.RDD
+import org.json4s._
+import org.json4s.native.JsonMethods._
+import org.json4s.native.Serialization.read
+
+import scala.collection.JavaConversions
+import scala.language.implicitConversions
+
+/** This class chains up the entire data process. PredictionIO uses this
+  * information to create workflows and deployments. In Scala, you should
+  * implement an object that extends the [[EngineFactory]] trait similar to the
+  * following example.
+  *
+  * {{{
+  * object ItemRankEngine extends EngineFactory {
+  *   def apply() = {
+  *     new Engine(
+  *       classOf[ItemRankDataSource],
+  *       classOf[ItemRankPreparator],
+  *       Map(
+  *         "knn" -> classOf[KNNAlgorithm],
+  *         "rand" -> classOf[RandomAlgorithm],
+  *         "mahoutItemBased" -> classOf[MahoutItemBasedAlgorithm]),
+  *       classOf[ItemRankServing])
+  *   }
+  * }
+  * }}}
+  *
+  * @see [[EngineFactory]]
+  * @tparam TD Training data class.
+  * @tparam EI Evaluation info class.
+  * @tparam PD Prepared data class.
+  * @tparam Q Input query class.
+  * @tparam P Output prediction class.
+  * @tparam A Actual value class.
+  * @param dataSourceClassMap Map of data source names to class.
+  * @param preparatorClassMap Map of preparator names to class.
+  * @param algorithmClassMap Map of algorithm names to classes.
+  * @param servingClassMap Map of serving names to class.
+  * @group Engine
+  */
+class Engine[TD, EI, PD, Q, P, A](
+    val dataSourceClassMap: Map[String,
+      Class[_ <: BaseDataSource[TD, EI, Q, A]]],
+    val preparatorClassMap: Map[String, Class[_ <: BasePreparator[TD, PD]]],
+    val algorithmClassMap: Map[String, Class[_ <: BaseAlgorithm[PD, _, Q, P]]],
+    val servingClassMap: Map[String, Class[_ <: BaseServing[Q, P]]])
+  extends BaseEngine[EI, Q, P, A] {
+
+  /** json4s formats used by this engine when extracting parameters from JSON:
+    * the PredictionIO default formats extended with [[NameParamsSerializer]].
+    *
+    * The access qualifier must name an enclosing package; this file lives in
+    * `org.apache.predictionio.controller`, hence `predictionio`.
+    */
+  private[predictionio]
+  implicit lazy val formats = Utils.json4sDefaultFormats +
+    new NameParamsSerializer
+
+  /** Logger for this engine, marked `@transient` and created lazily on first use. */
+  @transient lazy protected val logger = Logger[this.type]
+
+  /** Auxiliary constructor kept for backward compatibility. It accepts a
+    * single data source, preparator and serving class and registers each of
+    * them under the empty name `""`.
+    *
+    * @param dataSourceClass Data source class.
+    * @param preparatorClass Preparator class.
+    * @param algorithmClassMap Map of algorithm names to classes.
+    * @param servingClass Serving class.
+    */
+  def this(
+    dataSourceClass: Class[_ <: BaseDataSource[TD, EI, Q, A]],
+    preparatorClass: Class[_ <: BasePreparator[TD, PD]],
+    algorithmClassMap: Map[String, Class[_ <: BaseAlgorithm[PD, _, Q, P]]],
+    servingClass: Class[_ <: BaseServing[Q, P]]) =
+    this(
+      dataSourceClassMap = Map("" -> dataSourceClass),
+      preparatorClassMap = Map("" -> preparatorClass),
+      algorithmClassMap = algorithmClassMap,
+      servingClassMap = Map("" -> servingClass))
+
+  /** Java-friendly constructor: same as the single-class auxiliary
+    * constructor, but the algorithm map is a `java.util.Map`, which is copied
+    * into an immutable Scala map.
+    *
+    * @param dataSourceClass Data source class.
+    * @param preparatorClass Preparator class.
+    * @param algorithmClassMap Map of algorithm names to classes.
+    * @param servingClass Serving class.
+    */
+  def this(dataSourceClass: Class[_ <: BaseDataSource[TD, EI, Q, A]],
+    preparatorClass: Class[_ <: BasePreparator[TD, PD]],
+    algorithmClassMap: _root_.java.util.Map[String, Class[_ <: BaseAlgorithm[PD, _, Q, P]]],
+    servingClass: Class[_ <: BaseServing[Q, P]]) =
+    this(
+      dataSourceClassMap = Map("" -> dataSourceClass),
+      preparatorClassMap = Map("" -> preparatorClass),
+      algorithmClassMap = JavaConversions.mapAsScalaMap(algorithmClassMap).toMap,
+      servingClassMap = Map("" -> servingClass))
+
+  /** Builds a new [[Engine]] in the manner of a case class `copy`: every
+    * class map that is not supplied defaults to the one held by this instance.
+    *
+    * @param dataSourceClassMap Map of data source names to class.
+    * @param preparatorClassMap Map of preparator names to class.
+    * @param algorithmClassMap Map of algorithm names to classes.
+    * @param servingClassMap Map of serving names to class.
+    * @return A new engine built from the given class maps.
+    */
+  def copy(
+    dataSourceClassMap: Map[String, Class[_ <: BaseDataSource[TD, EI, Q, A]]]
+      = dataSourceClassMap,
+    preparatorClassMap: Map[String, Class[_ <: BasePreparator[TD, PD]]]
+      = preparatorClassMap,
+    algorithmClassMap: Map[String, Class[_ <: BaseAlgorithm[PD, _, Q, P]]]
+      = algorithmClassMap,
+    servingClassMap: Map[String, Class[_ <: BaseServing[Q, P]]]
+      = servingClassMap): Engine[TD, EI, PD, Q, P, A] =
+    new Engine[TD, EI, PD, Q, P, A](
+      dataSourceClassMap,
+      preparatorClassMap,
+      algorithmClassMap,
+      servingClassMap)
+
+  /** Trains every algorithm listed in `engineParams` and returns the models
+    * in the form produced by each algorithm's `makePersistentModel`.
+    *
+    * @param sc An instance of SparkContext.
+    * @param engineParams An instance of [[EngineParams]] for running a single training.
+    * @param engineInstanceId Engine instance ID; it becomes part of the model
+    *                         ID handed to each algorithm when its model is
+    *                         made persistent.
+    * @param params An instance of [[WorkflowParams]] that controls the workflow.
+    * @return A list of models, one per entry of the algorithm parameter list.
+    */
+  def train(
+      sc: SparkContext,
+      engineParams: EngineParams,
+      engineInstanceId: String,
+      params: WorkflowParams): Seq[Any] = {
+    // Instantiate the named data source and preparator.
+    val dataSource = engineParams.dataSourceParams match {
+      case (dataSourceName, dataSourceParams) =>
+        Doer(dataSourceClassMap(dataSourceName), dataSourceParams)
+    }
+    val preparator = engineParams.preparatorParams match {
+      case (preparatorName, preparatorParams) =>
+        Doer(preparatorClassMap(preparatorName), preparatorParams)
+    }
+
+    val algoParamsList = engineParams.algorithmParamsList
+    require(
+      algoParamsList.nonEmpty,
+      "EngineParams.algorithmParamsList must have at least 1 element.")
+
+    val algorithms = algoParamsList.map { case (algoName, algoParams) =>
+      Doer(algorithmClassMap(algoName), algoParams)
+    }
+
+    val models = Engine.train(
+      sc, dataSource, preparator, algorithms, params)
+
+    // `algorithms` is derived from `algoParamsList`, so both share the same
+    // indices; pair each algorithm with its name, parameters and model.
+    val algoTuples: Seq[(String, Params, BaseAlgorithm[_, _, _, _], Any)] =
+      algoParamsList.indices.map { ax =>
+        val (name, algoParams) = algoParamsList(ax)
+        (name, algoParams, algorithms(ax), models(ax))
+      }
+
+    makeSerializableModels(
+      sc,
+      engineInstanceId = engineInstanceId,
+      algoTuples = algoTuples)
+  }
+
+  /** Algorithm models can be persisted before deploy. However, it is also
+    * possible that models are not persisted. This method retrains non-persisted
+    * models and returns a list of models that can be used directly in deploy.
+    *
+    * A persisted model equal to the `Unit` companion object marks an algorithm
+    * whose model was not persisted. If any such marker is present, training
+    * data is read and prepared once and only the marked algorithms are
+    * retrained. A [[PersistentModelManifest]] is resolved through
+    * `SparkWorkflowUtils.getPersistentModel`; any other model is returned as
+    * is.
+    *
+    * The access qualifier must name an enclosing package; this file lives in
+    * `org.apache.predictionio.controller`, hence `predictionio`.
+    *
+    * @param sc An instance of SparkContext.
+    * @param engineParams Engine parameters naming the components to instantiate.
+    * @param engineInstanceId Engine instance ID, used to build the IDs of
+    *                         custom-persisted models.
+    * @param persistedModels Previously persisted models, zipped positionally
+    *                        with `engineParams.algorithmParamsList`.
+    * @param params Workflow parameters (not read by this method).
+    * @return One deployable model per zipped (model, algorithm) pair.
+    */
+  private[predictionio]
+  def prepareDeploy(
+    sc: SparkContext,
+    engineParams: EngineParams,
+    engineInstanceId: String,
+    persistedModels: Seq[Any],
+    params: WorkflowParams): Seq[Any] = {
+
+    val algoParamsList = engineParams.algorithmParamsList
+    val algorithms = algoParamsList.map { case (algoName, algoParams) =>
+      Doer(algorithmClassMap(algoName), algoParams)
+    }
+
+    val models = if (persistedModels.exists(m => m.isInstanceOf[Unit.type])) {
+      // If any of persistedModels is Unit, we need to re-train the model.
+      logger.info("Some persisted models are Unit, need to re-train.")
+      val (dataSourceName, dataSourceParams) = engineParams.dataSourceParams
+      val dataSource = Doer(dataSourceClassMap(dataSourceName), dataSourceParams)
+
+      val (preparatorName, preparatorParams) = engineParams.preparatorParams
+      val preparator = Doer(preparatorClassMap(preparatorName), preparatorParams)
+
+      // Training data is read and prepared once and shared by every
+      // algorithm that has to be retrained.
+      val td = dataSource.readTrainingBase(sc)
+      val pd = preparator.prepareBase(sc, td)
+
+      val models = algorithms.zip(persistedModels).map { case (algo, m) =>
+        m match {
+          case Unit => algo.trainBase(sc, pd)
+          case _ => m
+        }
+      }
+      models
+    } else {
+      logger.info("Using persisted model")
+      persistedModels
+    }
+
+    models
+    .zip(algorithms)
+    .zip(algoParamsList)
+    .zipWithIndex
+    .map {
+      case (((model, algo), (algoName, algoParams)), ax) => {
+        model match {
+          case modelManifest: PersistentModelManifest => {
+            logger.info("Custom-persisted model detected for algorithm " +
+              algo.getClass.getName)
+            // The model ID has the same "<engineInstanceId>-<index>-<name>"
+            // shape that makeSerializableModels uses when saving.
+            SparkWorkflowUtils.getPersistentModel(
+              modelManifest,
+              Seq(engineInstanceId, ax, algoName).mkString("-"),
+              algoParams,
+              Some(sc),
+              getClass.getClassLoader)
+          }
+          case m => {
+            try {
+              logger.info(
+                s"Loaded model ${m.getClass.getName} for algorithm " +
+                s"${algo.getClass.getName}")
+              // NOTE(review): the SparkContext is stopped as soon as a
+              // non-manifest model is seen, although a later iteration may
+              // still pass Some(sc) to getPersistentModel. Confirm intended.
+              sc.stop
+              m
+            } catch {
+              // m.getClass throws when the model is null; that case is only
+              // logged and the null model is returned unchanged.
+              case e: NullPointerException =>
+                logger.warn(
+                  s"Null model detected for algorithm ${algo.getClass.getName}")
+                m
+            }
+          }
+        }  // model match
+      }
+    }
+  }
+
+  /** Extracts models for the persistence layer.
+    *
+    * PredictionIO persists models for future use and allows custom
+    * implementations for persisting them via the
+    * [[org.apache.predictionio.controller.PersistentModel]] interface. This
+    * method walks over all models of the workflow and asks the corresponding
+    * algorithm to turn each one into its persistable form through
+    * `makePersistentModel`.
+    *
+    * Each model is given the ID `<engineInstanceId>-<index>-<algorithm name>`,
+    * where the index is the position of the algorithm in `algoTuples`.
+    *
+    * @param sc An instance of SparkContext.
+    * @param engineInstanceId Engine instance ID, used as model ID prefix.
+    * @param algoTuples One (algorithm name, algorithm parameters, algorithm,
+    *                   trained model) tuple per algorithm.
+    * @return The result of `makePersistentModel` for each tuple, in order.
+    */
+  private def makeSerializableModels(
+    sc: SparkContext,
+    engineInstanceId: String,
+    // AlgoName, AlgoParams, Algo, Model
+    algoTuples: Seq[(String, Params, BaseAlgorithm[_, _, _, _], Any)]
+  ): Seq[Any] = {
+
+    logger.info(s"engineInstanceId=$engineInstanceId")
+
+    algoTuples.zipWithIndex.map { case ((name, params, algo, model), ax) =>
+      val modelId = Seq(engineInstanceId, ax, name).mkString("-")
+      algo.makePersistentModel(
+        sc = sc,
+        modelId = modelId,
+        algoParams = params,
+        bm = model)
+    }
+  }
+
+  /** Instantiates the components named in `engineParams` and runs
+    * [[Engine.eval]] on them, so that
+    * [[org.apache.predictionio.controller.Evaluation]] can use the result as
+    * input for [[org.apache.predictionio.controller.Metric]].
+    *
+    * An algorithm whose instantiation raises `NoSuchElementException` (for
+    * example because its name is missing from the engine definition) is
+    * logged as an error and terminates the JVM with exit code 1.
+    *
+    * @param sc An instance of SparkContext.
+    * @param engineParams An instance of [[EngineParams]] for running a single evaluation.
+    * @param params An instance of [[WorkflowParams]] that controls the workflow
+    *               (not read by this implementation).
+    * @return A list of evaluation information and RDD of query, predicted
+    *         result, and actual result tuples.
+    */
+  def eval(
+    sc: SparkContext,
+    engineParams: EngineParams,
+    params: WorkflowParams)
+  : Seq[(EI, RDD[(Q, P, A)])] = {
+    val dataSource = engineParams.dataSourceParams match {
+      case (dataSourceName, dataSourceParams) =>
+        Doer(dataSourceClassMap(dataSourceName), dataSourceParams)
+    }
+    val preparator = engineParams.preparatorParams match {
+      case (preparatorName, preparatorParams) =>
+        Doer(preparatorClassMap(preparatorName), preparatorParams)
+    }
+
+    val algoParamsList = engineParams.algorithmParamsList
+    require(
+      algoParamsList.nonEmpty,
+      "EngineParams.algorithmParamsList must have at least 1 element.")
+
+    val algorithms = algoParamsList.map { case (algoName, algoParams) =>
+      try {
+        Doer(algorithmClassMap(algoName), algoParams)
+      } catch {
+        case e: NoSuchElementException =>
+          val knownNames = algorithmClassMap.keys.mkString(", ")
+          val message =
+            if (algoName == "") {
+              "Empty algorithm name supplied but it could not " +
+                "match with any algorithm in the engine's definition. " +
+                "Existing algorithm name(s) are: " +
+                s"${knownNames}. Aborting."
+            } else {
+              s"$algoName cannot be found in the engine's " +
+                "definition. Existing algorithm name(s) are: " +
+                s"${knownNames}. Aborting."
+            }
+          logger.error(message)
+          sys.exit(1)
+      }
+    }
+
+    val serving = engineParams.servingParams match {
+      case (servingName, servingParams) =>
+        Doer(servingClassMap(servingName), servingParams)
+    }
+
+    Engine.eval(sc, dataSource, preparator, algorithms, serving)
+  }
+
+  /** Builds [[EngineParams]] from an engine variant JSON.
+    *
+    * The `datasource`, `preparator` and `serving` fields are extracted with
+    * `WorkflowUtils.getParamsFromJsonByFieldAndClass`. The `algorithms` field,
+    * when it is a JSON array, yields one (name, params) pair per element; any
+    * other JSON value yields an empty list, and a missing field yields a
+    * single unnamed algorithm with [[EmptyParams]].
+    *
+    * @param variantJson Engine variant as JSON.
+    * @param jsonExtractor Which JSON extractor to use for parameter classes.
+    * @return The extracted engine parameters.
+    */
+  override def jValueToEngineParams(
+    variantJson: JValue,
+    jsonExtractor: JsonExtractorOption): EngineParams = {
+
+    val engineLanguage = EngineLanguage.Scala
+
+    logger.info("Extracting datasource params...")
+    val dataSourceParams: (String, Params) =
+      WorkflowUtils.getParamsFromJsonByFieldAndClass(
+        variantJson, "datasource", dataSourceClassMap, engineLanguage, jsonExtractor)
+    logger.info(s"Datasource params: $dataSourceParams")
+
+    logger.info("Extracting preparator params...")
+    val preparatorParams: (String, Params) =
+      WorkflowUtils.getParamsFromJsonByFieldAndClass(
+        variantJson, "preparator", preparatorClassMap, engineLanguage, jsonExtractor)
+    logger.info(s"Preparator params: $preparatorParams")
+
+    // Locate the "algorithms" field, if the variant defines one.
+    val algorithmsField = variantJson findField {
+      case JField("algorithms", _) => true
+      case _ => false
+    }
+    val algorithmsParams: Seq[(String, Params)] = algorithmsField match {
+      case Some(field) =>
+        field._2 match {
+          case JArray(entries) => entries.map { entry =>
+            val eap = entry.extract[CreateWorkflow.AlgorithmParams]
+            val extracted = WorkflowUtils.extractParams(
+              engineLanguage,
+              compact(render(eap.params)),
+              algorithmClassMap(eap.name),
+              jsonExtractor)
+            (eap.name, extracted)
+          }
+          case _ => Nil
+        }
+      case None => Seq(("", EmptyParams()))
+    }
+
+    logger.info("Extracting serving params...")
+    val servingParams: (String, Params) =
+      WorkflowUtils.getParamsFromJsonByFieldAndClass(
+        variantJson, "serving", servingClassMap, engineLanguage, jsonExtractor)
+    logger.info(s"Serving params: $servingParams")
+
+    new EngineParams(
+      dataSourceParams = dataSourceParams,
+      preparatorParams = preparatorParams,
+      algorithmParamsList = algorithmsParams,
+      servingParams = servingParams)
+  }
+
+  /** Rebuilds [[EngineParams]] from the JSON parameter strings stored in an
+    * [[EngineInstance]].
+    *
+    * Each of the data source, preparator and serving entries is read as a
+    * (name, JSON) pair; when the name is not registered in this engine an
+    * error is logged and the JVM exits with code 1. Algorithm names are not
+    * checked up front: an unknown name makes the `algorithmClassMap` lookup
+    * fail with the map's own exception.
+    *
+    * The access qualifier must name an enclosing package; this file lives in
+    * `org.apache.predictionio.controller`, hence `predictionio`.
+    *
+    * @param engineInstance Engine instance holding the serialized parameters.
+    * @param jsonExtractor Which JSON extractor to use for parameter classes.
+    * @return The extracted engine parameters.
+    */
+  private[predictionio] def engineInstanceToEngineParams(
+    engineInstance: EngineInstance,
+    jsonExtractor: JsonExtractorOption): EngineParams = {
+
+    // Local formats take the place of the class-level `formats` in this method.
+    implicit val formats = DefaultFormats
+    val engineLanguage = EngineLanguage.Scala
+
+    val dataSourceParamsWithName: (String, Params) = {
+      val (name, params) =
+        read[(String, JValue)](engineInstance.dataSourceParams)
+      if (!dataSourceClassMap.contains(name)) {
+        logger.error(s"Unable to find datasource class with name '$name'" +
+          " defined in Engine.")
+        sys.exit(1)
+      }
+      val extractedParams = WorkflowUtils.extractParams(
+        engineLanguage,
+        compact(render(params)),
+        dataSourceClassMap(name),
+        jsonExtractor)
+      (name, extractedParams)
+    }
+
+    val preparatorParamsWithName: (String, Params) = {
+      val (name, params) =
+        read[(String, JValue)](engineInstance.preparatorParams)
+      if (!preparatorClassMap.contains(name)) {
+        logger.error(s"Unable to find preparator class with name '$name'" +
+          " defined in Engine.")
+        sys.exit(1)
+      }
+      val extractedParams = WorkflowUtils.extractParams(
+        engineLanguage,
+        compact(render(params)),
+        preparatorClassMap(name),
+        jsonExtractor)
+      (name, extractedParams)
+    }
+
+    val algorithmsParamsWithNames =
+      read[Seq[(String, JValue)]](engineInstance.algorithmsParams).map {
+        case (algoName, params) =>
+          val extractedParams = WorkflowUtils.extractParams(
+            engineLanguage,
+            compact(render(params)),
+            algorithmClassMap(algoName),
+            jsonExtractor)
+          (algoName, extractedParams)
+      }
+
+    val servingParamsWithName: (String, Params) = {
+      val (name, params) = read[(String, JValue)](engineInstance.servingParams)
+      if (!servingClassMap.contains(name)) {
+        logger.error(s"Unable to find serving class with name '$name'" +
+          " defined in Engine.")
+        sys.exit(1)
+      }
+      val extractedParams = WorkflowUtils.extractParams(
+        engineLanguage,
+        compact(render(params)),
+        servingClassMap(name),
+        jsonExtractor)
+      (name, extractedParams)
+    }
+
+    new EngineParams(
+      dataSourceParams = dataSourceParamsWithName,
+      preparatorParams = preparatorParamsWithName,
+      algorithmParamsList = algorithmsParamsWithNames,
+      servingParams = servingParamsWithName)
+  }
+}
+
+/** This object contains concrete implementation for some methods of the
+  * [[Engine]] class.
+  *
+  * @group Engine
+  */
+object Engine {
+  // Index aliases used by `eval` below to keep the nested maps readable.
+  // EX: position of an evaluation set in the result of `readEvalBase`.
+  private type EX = Int
+  // AX: position of an algorithm in the algorithm list.
+  private type AX = Int
+  // QX: per-query identifier produced by `zipWithUniqueId`.
+  private type QX = Long
+
+  // Logger shared by the static train/eval implementations; created lazily.
+  @transient lazy private val logger = Logger[this.type]
+
+  /** Wrapper that lets [[Engine.apply]] take either one data source class or
+    * a map of named data source classes. The companion object supplies the
+    * implicit conversions, so this class rarely needs to be used directly.
+    *
+    * @param m Map of data source names to classes
+    * @tparam TD Training data class
+    * @tparam EI Evaluation information class
+    * @tparam Q Input query class
+    * @tparam A Actual result class
+    */
+  class DataSourceMap[TD, EI, Q, A](
+    val m: Map[String, Class[_ <: BaseDataSource[TD, EI, Q, A]]]) {
+
+    /** Wraps a single data source class, registered under the empty name. */
+    def this(c: Class[_ <: BaseDataSource[TD, EI, Q, A]]) = this(Map("" -> c))
+  }
+
+  /** Implicit conversions into [[DataSourceMap]]; using this object directly
+    * is not necessary.
+    */
+  object DataSourceMap {
+    /** Converts a single data source class. */
+    implicit def cToMap[TD, EI, Q, A](
+      c: Class[_ <: BaseDataSource[TD, EI, Q, A]]): DataSourceMap[TD, EI, Q, A] =
+      new DataSourceMap[TD, EI, Q, A](c)
+
+    /** Converts a map of named data source classes. */
+    implicit def mToMap[TD, EI, Q, A](
+      m: Map[String, Class[_ <: BaseDataSource[TD, EI, Q, A]]]): DataSourceMap[TD, EI, Q, A] =
+      new DataSourceMap[TD, EI, Q, A](m)
+  }
+
+  /** Wrapper that lets [[Engine.apply]] take either one preparator class or a
+    * map of named preparator classes. The companion object supplies the
+    * implicit conversions, so this class rarely needs to be used directly.
+    *
+    * @param m Map of preparator names to classes
+    * @tparam TD Training data class
+    * @tparam PD Prepared data class
+    */
+  class PreparatorMap[TD, PD](
+    val m: Map[String, Class[_ <: BasePreparator[TD, PD]]]) {
+
+    /** Wraps a single preparator class, registered under the empty name. */
+    def this(c: Class[_ <: BasePreparator[TD, PD]]) = this(Map("" -> c))
+  }
+
+  /** Implicit conversions into [[PreparatorMap]]; using this object directly
+    * is not necessary.
+    */
+  object PreparatorMap {
+    /** Converts a single preparator class. */
+    implicit def cToMap[TD, PD](
+      c: Class[_ <: BasePreparator[TD, PD]]): PreparatorMap[TD, PD] =
+      new PreparatorMap[TD, PD](c)
+
+    /** Converts a map of named preparator classes. */
+    implicit def mToMap[TD, PD](
+      m: Map[String, Class[_ <: BasePreparator[TD, PD]]]): PreparatorMap[TD, PD] =
+      new PreparatorMap[TD, PD](m)
+  }
+
+  /** Wrapper that lets [[Engine.apply]] take either one serving class or a map
+    * of named serving classes. The companion object supplies the implicit
+    * conversions, so this class rarely needs to be used directly.
+    *
+    * @param m Map of serving names to classes
+    * @tparam Q Input query class
+    * @tparam P Predicted result class
+    */
+  class ServingMap[Q, P](
+    val m: Map[String, Class[_ <: BaseServing[Q, P]]]) {
+
+    /** Wraps a single serving class, registered under the empty name. */
+    def this(c: Class[_ <: BaseServing[Q, P]]) = this(Map("" -> c))
+  }
+
+  /** Implicit conversions into [[ServingMap]]; using this object directly is
+    * not necessary.
+    */
+  object ServingMap {
+    /** Converts a single serving class. */
+    implicit def cToMap[Q, P](
+      c: Class[_ <: BaseServing[Q, P]]): ServingMap[Q, P] =
+      new ServingMap[Q, P](c)
+
+    /** Converts a map of named serving classes. */
+    implicit def mToMap[Q, P](
+      m: Map[String, Class[_ <: BaseServing[Q, P]]]): ServingMap[Q, P] =
+      new ServingMap[Q, P](m)
+  }
+
+  /** Convenience factory for [[Engine]].
+    *
+    * @param dataSourceMap Accepts either an instance of Class of the data
+    *                      source, or a Map of data source classes (implicitly
+    *                      converted to [[DataSourceMap]]).
+    * @param preparatorMap Accepts either an instance of Class of the
+    *                      preparator, or a Map of preparator classes
+    *                      (implicitly converted to [[PreparatorMap]]).
+    * @param algorithmClassMap Accepts a Map of algorithm classes.
+    * @param servingMap Accepts either an instance of Class of the serving, or
+    *                   a Map of serving classes (implicitly converted to
+    *                   [[ServingMap]]).
+    * @tparam TD Training data class
+    * @tparam EI Evaluation information class
+    * @tparam PD Prepared data class
+    * @tparam Q Input query class
+    * @tparam P Predicted result class
+    * @tparam A Actual result class
+    * @return An instance of [[Engine]]
+    */
+  def apply[TD, EI, PD, Q, P, A](
+    dataSourceMap: DataSourceMap[TD, EI, Q, A],
+    preparatorMap: PreparatorMap[TD, PD],
+    algorithmClassMap: Map[String, Class[_ <: BaseAlgorithm[PD, _, Q, P]]],
+    servingMap: ServingMap[Q, P]): Engine[TD, EI, PD, Q, P, A] =
+    // Unwrap the helper maps and hand them to the primary constructor.
+    new Engine[TD, EI, PD, Q, P, A](
+      dataSourceMap.m,
+      preparatorMap.m,
+      algorithmClassMap,
+      servingMap.m)
+
+  /** Concrete training routine behind [[Engine]]: reads the training data,
+    * prepares it and trains every algorithm on the prepared data.
+    *
+    * Unless `params.skipSanityCheck` is set, the training data, the prepared
+    * data and each trained model are sanity-checked when they implement
+    * [[SanityCheck]]. A `StorageClientException` while reading is logged and
+    * terminates the JVM with exit code 1.
+    *
+    * @param sc An instance of SparkContext
+    * @param dataSource An instance of data source
+    * @param preparator An instance of preparator
+    * @param algorithmList A list of algorithm instances
+    * @param params An instance of [[WorkflowParams]] that controls the training
+    *               process.
+    * @tparam TD Training data class
+    * @tparam PD Prepared data class
+    * @tparam Q Input query class
+    * @throws StopAfterReadInterruption when `params.stopAfterRead` is set
+    * @throws StopAfterPrepareInterruption when `params.stopAfterPrepare` is set
+    * @return A list of trained models
+    */
+  def train[TD, PD, Q](
+      sc: SparkContext,
+      dataSource: BaseDataSource[TD, _, Q, _],
+      preparator: BasePreparator[TD, PD],
+      algorithmList: Seq[BaseAlgorithm[PD, _, Q, _]],
+      params: WorkflowParams
+    ): Seq[Any] = {
+    // Runs the sanity check on one piece of data when its class supports it
+    // and logs which of the two cases applied.
+    def checkSanity(data: Any): Unit = data match {
+      case sanityCheckable: SanityCheck =>
+        logger.info(s"${data.getClass.getName} supports data sanity" +
+          " check. Performing check.")
+        sanityCheckable.sanityCheck()
+      case _ =>
+        logger.info(s"${data.getClass.getName} does not support" +
+          " data sanity check. Skipping check.")
+    }
+
+    logger.info("EngineWorkflow.train")
+    logger.info(s"DataSource: $dataSource")
+    logger.info(s"Preparator: $preparator")
+    logger.info(s"AlgorithmList: $algorithmList")
+
+    logger.info(
+      if (params.skipSanityCheck) "Data sanity check is off."
+      else "Data sanity check is on.")
+
+    // Stage 1: read the training data.
+    val td = try {
+      dataSource.readTrainingBase(sc)
+    } catch {
+      case e: StorageClientException =>
+        logger.error(s"Error occured reading from data source. (Reason: " +
+          e.getMessage + ") Please see the log for debugging details.", e)
+        sys.exit(1)
+    }
+
+    if (!params.skipSanityCheck) {
+      checkSanity(td)
+    }
+
+    if (params.stopAfterRead) {
+      logger.info("Stopping here because --stop-after-read is set.")
+      throw StopAfterReadInterruption()
+    }
+
+    // Stage 2: prepare the data.
+    val pd = preparator.prepareBase(sc, td)
+
+    if (!params.skipSanityCheck) {
+      checkSanity(pd)
+    }
+
+    if (params.stopAfterPrepare) {
+      logger.info("Stopping here because --stop-after-prepare is set.")
+      throw StopAfterPrepareInterruption()
+    }
+
+    // Stage 3: train one model per algorithm.
+    val models: Seq[Any] = algorithmList.map(_.trainBase(sc, pd))
+
+    if (!params.skipSanityCheck) {
+      models.foreach(model => checkSanity(model))
+    }
+
+    logger.info("EngineWorkflow.train completed")
+    models
+  }
+
+  /** Provides concrete implementation of evaluation for [[Engine]].
+    *
+    * For every evaluation set returned by the data source, the training data
+    * is prepared, every algorithm is trained and asked to batch-predict the
+    * (supplemented) queries, and the serving layer combines the predictions
+    * of all algorithms into one prediction per query.
+    *
+    * @param sc An instance of SparkContext
+    * @param dataSource An instance of data source
+    * @param preparator An instance of preparator
+    * @param algorithmList A list of algorithm instances
+    * @param serving An instance of serving
+    * @tparam TD Training data class
+    * @tparam PD Prepared data class
+    * @tparam Q Input query class
+    * @tparam P Predicted result class
+    * @tparam A Actual result class
+    * @tparam EI Evaluation information class
+    * @return A list of evaluation information and RDD of query, predicted
+    *         result, and actual result tuples, one entry per evaluation set.
+    */
+  def eval[TD, PD, Q, P, A, EI](
+      sc: SparkContext,
+      dataSource: BaseDataSource[TD, EI, Q, A],
+      preparator: BasePreparator[TD, PD],
+      algorithmList: Seq[BaseAlgorithm[PD, _, Q, P]],
+      serving: BaseServing[Q, P]): Seq[(EI, RDD[(Q, P, A)])] = {
+    logger.info(s"DataSource: $dataSource")
+    logger.info(s"Preparator: $preparator")
+    logger.info(s"AlgorithmList: $algorithmList")
+    logger.info(s"Serving: $serving")
+
+    // Index the algorithms by their position in algorithmList.
+    val algoMap: Map[AX, BaseAlgorithm[PD, _, Q, P]] = algorithmList
+      .zipWithIndex
+      .map(_.swap)
+      .toMap
+    val algoCount = algoMap.size
+
+    // Index the evaluation sets by their position in the data source result.
+    val evalTupleMap: Map[EX, (TD, EI, RDD[(Q, A)])] = dataSource
+      .readEvalBase(sc)
+      .zipWithIndex
+      .map(_.swap)
+      .toMap
+
+    val evalCount = evalTupleMap.size
+
+    // NOTE(review): on Scala versions where Map.mapValues returns a lazy
+    // view, the functions below run again on every lookup rather than once
+    // here — confirm before adding further lookups into these maps.
+    val evalTrainMap: Map[EX, TD] = evalTupleMap.mapValues(_._1)
+    val evalInfoMap: Map[EX, EI] = evalTupleMap.mapValues(_._2)
+    // Tag every (query, actual) pair with an id so that predictions can be
+    // joined back to it later.
+    val evalQAsMap: Map[EX, RDD[(QX, (Q, A))]] = evalTupleMap
+      .mapValues(_._3)
+      .mapValues{ _.zipWithUniqueId().map(_.swap) }
+
+    val preparedMap: Map[EX, PD] = evalTrainMap.mapValues { td => {
+      preparator.prepareBase(sc, td)
+    }}
+
+    // One model per (evaluation set, algorithm).
+    val algoModelsMap: Map[EX, Map[AX, Any]] = preparedMap.mapValues { pd => {
+      algoMap.mapValues(_.trainBase(sc,pd))
+    }}
+
+    // Queries are supplemented by the serving layer before prediction.
+    val suppQAsMap: Map[EX, RDD[(QX, (Q, A))]] = evalQAsMap.mapValues { qas =>
+      qas.map { case (qx, (q, a)) => (qx, (serving.supplementBase(q), a)) }
+    }
+
+    val algoPredictsMap: Map[EX, RDD[(QX, Seq[P])]] = (0 until evalCount)
+    .map { ex => {
+      val modelMap: Map[AX, Any] = algoModelsMap(ex)
+
+      val qs: RDD[(QX, Q)] = suppQAsMap(ex).mapValues(_._1)
+
+      // Each algorithm predicts the whole batch; predictions are tagged with
+      // the algorithm index so they can be ordered after the union.
+      val algoPredicts: Seq[RDD[(QX, (AX, P))]] = (0 until algoCount)
+      .map { ax => {
+        val algo = algoMap(ax)
+        val model = modelMap(ax)
+        val rawPredicts: RDD[(QX, P)] = algo.batchPredictBase(sc, model, qs)
+        val predicts: RDD[(QX, (AX, P))] = rawPredicts.map { case (qx, p) => {
+          (qx, (ax, p))
+        }}
+        predicts
+      }}
+
+      // Gather the predictions of all algorithms per query id, ordered by
+      // algorithm index.
+      val unionAlgoPredicts: RDD[(QX, Seq[P])] = sc.union(algoPredicts)
+      .groupByKey()
+      .mapValues { ps => {
+        assert (ps.size == algoCount, "Must have same length as algoCount")
+        // TODO. Check size == algoCount
+        ps.toSeq.sortBy(_._1).map(_._2)
+      }}
+
+      (ex, unionAlgoPredicts)
+    }}
+    .toMap
+
+    val servingQPAMap: Map[EX, RDD[(Q, P, A)]] = algoPredictsMap
+    .map { case (ex, psMap) => {
+      // The query passed to serving.serve is the original one, not
+      // supplemented.
+      val qasMap: RDD[(QX, (Q, A))] = evalQAsMap(ex)
+      val qpsaMap: RDD[(QX, Q, Seq[P], A)] = psMap.join(qasMap)
+      .map { case (qx, t) => (qx, t._2._1, t._1, t._2._2) }
+
+      // The serving layer reduces the per-algorithm predictions to one.
+      val qpaMap: RDD[(Q, P, A)] = qpsaMap.map {
+        case (qx, q, ps, a) => (q, serving.serveBase(q, ps), a)
+      }
+      (ex, qpaMap)
+    }}
+
+    // Emit the results in evaluation-set order.
+    (0 until evalCount).map { ex => {
+      (evalInfoMap(ex), servingQPAMap(ex))
+    }}
+    .toSeq
+  }
+}
+
+/** Mix in this trait for queries that contain prId (PredictedResultId).
+  * This is useful when your engine expects queries to also be associated with
+  * prId keys when feedback loop is enabled.
+  *
+  * Deprecated since 0.9.2 and to be removed in future releases.
+  *
+  * @group Helper
+  */
+@deprecated("To be removed in future releases.", "0.9.2")
+trait WithPrId {
+  /** Predicted result ID; defaults to the empty string. */
+  val prId: String = ""
+}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/main/scala/org/apache/predictionio/controller/EngineFactory.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/predictionio/controller/EngineFactory.scala b/core/src/main/scala/org/apache/predictionio/controller/EngineFactory.scala
new file mode 100644
index 0000000..e9db35b
--- /dev/null
+++ b/core/src/main/scala/org/apache/predictionio/controller/EngineFactory.scala
@@ -0,0 +1,41 @@
+/** Copyright 2015 TappingStone, Inc.
+  *
+  * Licensed under the Apache License, Version 2.0 (the "License");
+  * you may not use this file except in compliance with the License.
+  * You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package org.apache.predictionio.controller
+
+import org.apache.predictionio.core.BaseEngine
+
+import scala.language.implicitConversions
+
+/** If you intend to let PredictionIO create the workflow and deploy serving
+  * automatically, you will need to implement an object that extends this class
+  * and returns an [[Engine]] from `apply()`.
+  *
+  * @group Engine
+  */
+abstract class EngineFactory {
+  /** Creates an instance of an [[Engine]]. */
+  def apply(): BaseEngine[_, _, _, _]
+
+  /** Override to return engine parameters for `key`; default is `EngineParams()`. */
+  def engineParams(key: String): EngineParams = EngineParams()
+}
+
+/** DEPRECATED. Use [[EngineFactory]] instead; this trait adds no members.
+  *
+  * @group Engine
+  */
+@deprecated("Use EngineFactory instead.", "0.9.2")
+trait IEngineFactory extends EngineFactory

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/main/scala/org/apache/predictionio/controller/EngineParams.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/predictionio/controller/EngineParams.scala b/core/src/main/scala/org/apache/predictionio/controller/EngineParams.scala
new file mode 100644
index 0000000..b419255
--- /dev/null
+++ b/core/src/main/scala/org/apache/predictionio/controller/EngineParams.scala
@@ -0,0 +1,149 @@
+/** Copyright 2015 TappingStone, Inc.
+  *
+  * Licensed under the Apache License, Version 2.0 (the "License");
+  * you may not use this file except in compliance with the License.
+  * You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package org.apache.predictionio.controller
+
+import org.apache.predictionio.core.BaseDataSource
+import org.apache.predictionio.core.BaseAlgorithm
+
+import scala.collection.JavaConversions
+import scala.language.implicitConversions
+
+/** Logical grouping of all of an engine's required parameters.
+  *
+  * @param dataSourceParams Data Source name-parameters tuple.
+  * @param preparatorParams Preparator name-parameters tuple.
+  * @param algorithmParamsList List of algorithm name-parameter pairs.
+  * @param servingParams Serving name-parameters tuple.
+  * @group Engine
+  */
+class EngineParams(
+    val dataSourceParams: (String, Params) = ("", EmptyParams()),
+    val preparatorParams: (String, Params) = ("", EmptyParams()),
+    val algorithmParamsList: Seq[(String, Params)] = Seq(),
+    val servingParams: (String, Params) = ("", EmptyParams()))
+  extends Serializable {
+
+  /** Java-friendly constructor; pairs each name with its parameters.
+    *
+    * @param dataSourceName Data Source name
+    * @param dataSourceParams Data Source parameters
+    * @param preparatorName Preparator name
+    * @param preparatorParams Preparator parameters
+    * @param algorithmParamsList Map of algorithm name-parameters
+    * @param servingName Serving name
+    * @param servingParams Serving parameters
+    */
+  def this(
+    dataSourceName: String,
+    dataSourceParams: Params,
+    preparatorName: String,
+    preparatorParams: Params,
+    algorithmParamsList: _root_.java.util.Map[String, _ <: Params],
+    servingName: String,
+    servingParams: Params) = {
+
+    // To work around a json4s limitation, these parameter names cannot be changed.
+    this(
+      (dataSourceName, dataSourceParams),
+      (preparatorName, preparatorParams),
+      JavaConversions.mapAsScalaMap(algorithmParamsList).toSeq,
+      (servingName, servingParams)
+    )
+  }
+
+  /** Case-class-style copy: returns a new instance with the given fields replaced. */
+  def copy(
+    dataSourceParams: (String, Params) = dataSourceParams,
+    preparatorParams: (String, Params) = preparatorParams,
+    algorithmParamsList: Seq[(String, Params)] = algorithmParamsList,
+    servingParams: (String, Params) = servingParams): EngineParams = {
+
+    new EngineParams(
+      dataSourceParams,
+      preparatorParams,
+      algorithmParamsList,
+      servingParams)
+  }
+}
+
+/** Factory for [[EngineParams]] taking names and parameters as separate arguments.
+  *
+  * @group Engine
+  */
+object EngineParams {
+  /** Builds an [[EngineParams]], pairing each component name with its parameters.
+    *
+    * @param dataSourceName Data Source name
+    * @param dataSourceParams Data Source parameters
+    * @param preparatorName Preparator name
+    * @param preparatorParams Preparator parameters
+    * @param algorithmParamsList List of algorithm name-parameter pairs.
+    * @param servingName Serving name
+    * @param servingParams Serving parameters
+    */
+  def apply(
+    dataSourceName: String = "",
+    dataSourceParams: Params = EmptyParams(),
+    preparatorName: String = "",
+    preparatorParams: Params = EmptyParams(),
+    algorithmParamsList: Seq[(String, Params)] = Seq(),
+    servingName: String = "",
+    servingParams: Params = EmptyParams()): EngineParams = {
+    val dataSource = (dataSourceName, dataSourceParams)
+    val preparator = (preparatorName, preparatorParams)
+    val serving = (servingName, servingParams)
+    // Four positional arguments select the primary (tuple-based) constructor.
+    new EngineParams(
+      dataSource, preparator, algorithmParamsList, serving)
+  }
+}
+
+/** SimpleEngine has only one algorithm, and uses default preparator and serving
+  * layer. Current default preparator is `IdentityPreparator` and serving is
+  * `LFirstServing`.
+  *
+  * @tparam TD Training data class.
+  * @tparam EI Evaluation info class.
+  * @tparam Q Input query class.
+  * @tparam P Output prediction class.
+  * @tparam A Actual value class.
+  * @param dataSourceClass Data source class.
+  * @param algorithmClass Algorithm class, registered under the empty name "".
+  * @group Engine
+  */
+class SimpleEngine[TD, EI, Q, P, A](
+    dataSourceClass: Class[_ <: BaseDataSource[TD, EI, Q, A]],
+    algorithmClass: Class[_ <: BaseAlgorithm[TD, _, Q, P]])
+  extends Engine(
+    dataSourceClass,
+    IdentityPreparator(dataSourceClass),
+    Map("" -> algorithmClass),
+    LFirstServing(algorithmClass))
+
+/** Shorthand [[EngineParams]] for the `SimpleEngine` class.
+  *
+  * @param dataSourceParams Data source parameters.
+  * @param algorithmParams Parameters of the single algorithm (named "").
+  * @group Engine
+  */
+class SimpleEngineParams(
+    dataSourceParams: Params = EmptyParams(),
+    algorithmParams: Params = EmptyParams())
+  extends EngineParams(
+    dataSourceParams = ("", dataSourceParams),
+    algorithmParamsList = Seq(("", algorithmParams)))
+
+

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/main/scala/org/apache/predictionio/controller/EngineParamsGenerator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/predictionio/controller/EngineParamsGenerator.scala b/core/src/main/scala/org/apache/predictionio/controller/EngineParamsGenerator.scala
new file mode 100644
index 0000000..2e26b83
--- /dev/null
+++ b/core/src/main/scala/org/apache/predictionio/controller/EngineParamsGenerator.scala
@@ -0,0 +1,43 @@
+/** Copyright 2015 TappingStone, Inc.
+  *
+  * Licensed under the Apache License, Version 2.0 (the "License");
+  * you may not use this file except in compliance with the License.
+  * You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package org.apache.predictionio.controller
+
+import scala.language.implicitConversions
+
+/** Defines an engine parameters generator.
+  *
+  * Implementations of this trait can be supplied to "pio eval" as the second
+  * command line argument. The list may be assigned at most once.
+  *
+  * @group Evaluation
+  */
+trait EngineParamsGenerator {
+  protected[this] var epList: Seq[EngineParams] = _
+  protected[this] var epListSet: Boolean = false // true once epList is assigned
+
+  /** Returns the list of [[EngineParams]]; asserts that it has been set. */
+  def engineParamsList: Seq[EngineParams] = {
+    assert(epListSet, "EngineParamsList not set")
+    epList
+  }
+
+  /** Sets the list of [[EngineParams]] (copied into a new Seq); at most once. */
+  def engineParamsList_=(l: Seq[EngineParams]): Unit = {
+    assert(!epListSet, "EngineParamsList can be set at most once")
+    epList = Seq(l:_*)
+    epListSet = true
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/main/scala/org/apache/predictionio/controller/Evaluation.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/predictionio/controller/Evaluation.scala b/core/src/main/scala/org/apache/predictionio/controller/Evaluation.scala
new file mode 100644
index 0000000..c720c4f
--- /dev/null
+++ b/core/src/main/scala/org/apache/predictionio/controller/Evaluation.scala
@@ -0,0 +1,122 @@
+/** Copyright 2015 TappingStone, Inc.
+  *
+  * Licensed under the Apache License, Version 2.0 (the "License");
+  * you may not use this file except in compliance with the License.
+  * You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package org.apache.predictionio.controller
+
+import org.apache.predictionio.core.BaseEngine
+import org.apache.predictionio.core.BaseEvaluator
+import org.apache.predictionio.core.BaseEvaluatorResult
+
+import scala.language.implicitConversions
+
+/** Defines an evaluation that contains an engine and a metric.
+  *
+  * Implementations of this trait can be supplied to "pio eval" as the first
+  * argument.
+  *
+  * @group Evaluation
+  */
+trait Evaluation extends Deployment {
+  protected [this] var _evaluatorSet: Boolean = false
+  protected [this] var _evaluator: BaseEvaluator[_, _, _, _, _ <: BaseEvaluatorResult] = _
+
+  /** Returns the evaluator; asserts that one has been set. */
+  private [predictionio] def evaluator: BaseEvaluator[_, _, _, _, _ <: BaseEvaluatorResult] = {
+    assert(_evaluatorSet, "Evaluator not set")
+    _evaluator
+  }
+
+  /** Gets the tuple of the [[Engine]] and the implementation of
+    * [[org.apache.predictionio.core.BaseEvaluator]]
+    */
+  def engineEvaluator
+  : (BaseEngine[_, _, _, _], BaseEvaluator[_, _, _, _, _]) = {
+    assert(_evaluatorSet, "Evaluator not set")
+    (engine, _evaluator)
+  }
+
+  /** Sets both an [[Engine]] and an implementation of
+    * [[org.apache.predictionio.core.BaseEvaluator]] for this [[Evaluation]]
+    *
+    * @param engineEvaluator A tuple of an [[Engine]] and an implementation of
+    *                        [[org.apache.predictionio.core.BaseEvaluator]]
+    * @tparam EI Evaluation information class
+    * @tparam Q Query class
+    * @tparam P Predicted result class
+    * @tparam A Actual result class
+    * @tparam R Metric result class
+    */
+  def engineEvaluator_=[EI, Q, P, A, R <: BaseEvaluatorResult](
+    engineEvaluator: (
+      BaseEngine[EI, Q, P, A],
+      BaseEvaluator[EI, Q, P, A, R])): Unit = {
+    assert(!_evaluatorSet, "Evaluator can be set at most once")
+    engine = engineEvaluator._1
+    _evaluator = engineEvaluator._2
+    _evaluatorSet = true
+  }
+
+  /** Returns both the [[Engine]] and the implementation of [[Metric]] for this
+    * [[Evaluation]]
+    */
+  def engineMetric: (BaseEngine[_, _, _, _], Metric[_, _, _, _, _]) = {
+    throw new NotImplementedError("This method is to keep the compiler happy")
+  }
+
+  /** Sets both an [[Engine]] and an implementation of [[Metric]] for this
+    * [[Evaluation]]
+    *
+    * @param engineMetric A tuple of [[Engine]] and an implementation of
+    *                     [[Metric]]
+    * @tparam EI Evaluation information class
+    * @tparam Q Query class
+    * @tparam P Predicted result class
+    * @tparam A Actual result class
+    */
+  def engineMetric_=[EI, Q, P, A](
+    engineMetric: (BaseEngine[EI, Q, P, A], Metric[EI, Q, P, A, _])): Unit = {
+    engineEvaluator = (
+      engineMetric._1,
+      MetricEvaluator(
+        metric = engineMetric._2,
+        otherMetrics = Seq[Metric[EI, Q, P, A, _]](),
+        outputPath = "best.json"))
+  }
+
+  /** Getter stub paired with `engineMetrics_=`; always throws NotImplementedError. */
+  private [predictionio] def engineMetrics: (BaseEngine[_, _, _, _], Metric[_, _, _, _, _]) = {
+    throw new NotImplementedError("This method is to keep the compiler happy")
+  }
+
+  /** Sets an [[Engine]], an implementation of [[Metric]], and sequence of
+    * implementations of [[Metric]] for this [[Evaluation]]
+    *
+    * @param engineMetrics A tuple of [[Engine]], an implementation of
+    *                      [[Metric]] and sequence of implementations of [[Metric]]
+    * @tparam EI Evaluation information class
+    * @tparam Q Query class
+    * @tparam P Predicted result class
+    * @tparam A Actual result class
+    */
+  def engineMetrics_=[EI, Q, P, A](
+    engineMetrics: (
+      BaseEngine[EI, Q, P, A],
+      Metric[EI, Q, P, A, _],
+      Seq[Metric[EI, Q, P, A, _]])): Unit = {
+    engineEvaluator = (
+      engineMetrics._1,
+      MetricEvaluator(engineMetrics._2, engineMetrics._3))
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/main/scala/org/apache/predictionio/controller/FastEvalEngine.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/predictionio/controller/FastEvalEngine.scala b/core/src/main/scala/org/apache/predictionio/controller/FastEvalEngine.scala
new file mode 100644
index 0000000..868d818
--- /dev/null
+++ b/core/src/main/scala/org/apache/predictionio/controller/FastEvalEngine.scala
@@ -0,0 +1,343 @@
+/** Copyright 2015 TappingStone, Inc.
+  *
+  * Licensed under the Apache License, Version 2.0 (the "License");
+  * you may not use this file except in compliance with the License.
+  * You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package org.apache.predictionio.controller
+
+import org.apache.predictionio.core.BaseDataSource
+import org.apache.predictionio.core.BasePreparator
+import org.apache.predictionio.core.BaseAlgorithm
+import org.apache.predictionio.core.BaseServing
+import org.apache.predictionio.core.Doer
+import org.apache.predictionio.annotation.Experimental
+
+import grizzled.slf4j.Logger
+import org.apache.predictionio.workflow.WorkflowParams
+import org.apache.spark.SparkContext
+import org.apache.spark.SparkContext._
+import org.apache.spark.rdd.RDD
+
+import scala.language.implicitConversions
+
+import _root_.java.util.NoSuchElementException
+
+import scala.collection.mutable.{ HashMap => MutableHashMap }
+
+/** :: Experimental ::
+  * Workflow based on [[FastEvalEngine]]
+  *
+  * @group Evaluation
+  */
+@Experimental
+object FastEvalEngineWorkflow  {
+  @transient lazy val logger = Logger[this.type]
+
+  type EX = Int
+  type AX = Int
+  type QX = Long
+
+  case class DataSourcePrefix(dataSourceParams: (String, Params)) { // dataSourceCache key
+    def this(pp: PreparatorPrefix) = this(pp.dataSourceParams) // keep the data source part
+    def this(ap: AlgorithmsPrefix) = this(ap.dataSourceParams)
+    def this(sp: ServingPrefix) = this(sp.dataSourceParams)
+  }
+  // Key of preparatorCache: data source and preparator parameters.
+  case class PreparatorPrefix(
+    dataSourceParams: (String, Params),
+    preparatorParams: (String, Params)) {
+    def this(ap: AlgorithmsPrefix) = {
+      this(ap.dataSourceParams, ap.preparatorParams)
+    }
+  }
+  // Key of algorithmsCache: everything up to and including the algorithms.
+  case class AlgorithmsPrefix(
+    dataSourceParams: (String, Params),
+    preparatorParams: (String, Params),
+    algorithmParamsList: Seq[(String, Params)]) {
+    def this(sp: ServingPrefix) = {
+      this(sp.dataSourceParams, sp.preparatorParams, sp.algorithmParamsList)
+    }
+  }
+  // Key of servingCache: the complete parameter set of one EngineParams.
+  case class ServingPrefix(
+    dataSourceParams: (String, Params),
+    preparatorParams: (String, Params),
+    algorithmParamsList: Seq[(String, Params)],
+    servingParams: (String, Params)) {
+    def this(ep: EngineParams) = this(
+      ep.dataSourceParams,
+      ep.preparatorParams,
+      ep.algorithmParamsList,
+      ep.servingParams)
+  }
+
+  def getDataSourceResult[TD, EI, PD, Q, P, A](
+    workflow: FastEvalEngineWorkflow[TD, EI, PD, Q, P, A],
+    prefix: DataSourcePrefix)
+  : Map[EX, (TD, EI, RDD[(QX, (Q, A))])] = {
+    // Memoized per data source prefix: a prefix already in the cache is not re-read.
+    val cache = workflow.dataSourceCache
+
+    if (!cache.contains(prefix)) {
+      val (dsName, dsParams) = prefix.dataSourceParams
+      val dataSource = Doer(workflow.engine.dataSourceClassMap(dsName), dsParams)
+
+      // Tag every (query, actual) pair with a unique QX id so that later
+      // stages can join predictions back to the original pairs.
+      val evalSets = dataSource.readEvalBase(workflow.sc).map {
+        case (td, ei, qaRDD) =>
+          (td, ei, qaRDD.zipWithUniqueId().map { case (qa, qx) => (qx, qa) })
+      }
+      // Key each evaluation set by its position (EX) in the returned sequence.
+      val result = evalSets.zipWithIndex.map { case (set, ex) => (ex, set) }.toMap
+
+      cache += Tuple2(prefix, result)
+    }
+    cache(prefix)
+  }
+
+  /** Returns the prepared data of every evaluation set, memoized per prefix. */
+  def getPreparatorResult[TD, EI, PD, Q, P, A](
+    workflow: FastEvalEngineWorkflow[TD, EI, PD, Q, P, A],
+    prefix: PreparatorPrefix): Map[EX, PD] = {
+    val cache = workflow.preparatorCache
+    if (!cache.contains(prefix)) {
+      val preparator = Doer(
+        workflow.engine.preparatorClassMap(prefix.preparatorParams._1),
+        prefix.preparatorParams._2)
+
+      // Use a strict `map` rather than `mapValues`: `mapValues` returns a lazy
+      // view that would re-run `prepareBase` on every lookup of the cached map.
+      val result: Map[EX, PD] =
+        getDataSourceResult(workflow, new DataSourcePrefix(prefix))
+        .map { case (ex, (td, _, _)) => (ex, preparator.prepareBase(workflow.sc, td)) }
+      cache += Tuple2(prefix, result)
+    }
+    cache(prefix)
+  }
+
+  def computeAlgorithmsResult[TD, EI, PD, Q, P, A](
+    workflow: FastEvalEngineWorkflow[TD, EI, PD, Q, P, A],
+    prefix: AlgorithmsPrefix): Map[EX, RDD[(QX, Seq[P])]] = {
+    // Trains all algorithms in `prefix` and predicts every evaluation set's queries.
+    val algoMap: Map[AX, BaseAlgorithm[PD, _, Q, P]] = prefix.algorithmParamsList
+      .map { case (algoName, algoParams) => {
+        try {
+          Doer(workflow.engine.algorithmClassMap(algoName), algoParams)
+        } catch {
+          case e: NoSuchElementException => {
+            val algorithmClassMap = workflow.engine.algorithmClassMap
+            if (algoName == "") {
+              logger.error("Empty algorithm name supplied but it could not " +
+                "match with any algorithm in the engine's definition. " +
+                "Existing algorithm name(s) are: " +
+                s"${algorithmClassMap.keys.mkString(", ")}. Aborting.")
+            } else {
+              logger.error(s"${algoName} cannot be found in the engine's " +
+                "definition. Existing algorithm name(s) are: " +
+                s"${algorithmClassMap.keys.mkString(", ")}. Aborting.")
+            }
+            sys.exit(1) // terminates the JVM; nothing is thrown to the caller
+          }
+        }
+      }}
+      .zipWithIndex
+      .map(_.swap)
+      .toMap
+
+    val algoCount = algoMap.size
+    // Model Train. NOTE(review): `mapValues` yields lazy views, so `trainBase`
+    // presumably runs when a model is looked up below, not here -- confirm.
+    val algoModelsMap: Map[EX, Map[AX, Any]] = getPreparatorResult(
+      workflow,
+      new PreparatorPrefix(prefix))
+    .mapValues {
+      pd => algoMap.mapValues(_.trainBase(workflow.sc,pd))
+    }
+    // Predict. Evaluation sets are processed through a parallel collection
+    // (`.par`); within each set every algorithm scores the same keyed queries.
+    val dataSourceResult =
+      FastEvalEngineWorkflow.getDataSourceResult(
+        workflow = workflow,
+        prefix = new DataSourcePrefix(prefix))
+
+    val algoResult: Map[EX, RDD[(QX, Seq[P])]] = dataSourceResult
+    .par
+    .map { case (ex, (td, ei, iqaRDD)) => {
+      val modelsMap: Map[AX, Any] = algoModelsMap(ex)
+      val qs: RDD[(QX, Q)] = iqaRDD.mapValues(_._1)
+
+      val algoPredicts: Seq[RDD[(QX, (AX, P))]] = (0 until algoCount)
+      .map { ax => {
+        val algo = algoMap(ax)
+        val model = modelsMap(ax)
+        val rawPredicts: RDD[(QX, P)] = algo.batchPredictBase(
+          workflow.sc,
+          model,
+          qs)
+
+        val predicts: RDD[(QX, (AX, P))] = rawPredicts.map {
+          case (qx, p) => (qx, (ax, p))
+        }
+        predicts
+      }}
+
+      val unionAlgoPredicts: RDD[(QX, Seq[P])] = workflow.sc
+      .union(algoPredicts)
+      .groupByKey
+      .mapValues { ps => {
+        assert (ps.size == algoCount, "Must have same length as algoCount")
+        // Order each query's predictions by algorithm index (AX), then drop it.
+        ps.toSeq.sortBy(_._1).map(_._2)
+      }}
+      (ex, unionAlgoPredicts)
+    }}
+    .seq
+    .toMap
+
+    algoResult
+  }
+
+  def getAlgorithmsResult[TD, EI, PD, Q, P, A](
+    workflow: FastEvalEngineWorkflow[TD, EI, PD, Q, P, A],
+    prefix: AlgorithmsPrefix): Map[EX, RDD[(QX, Seq[P])]] = {
+    // Memoizing front end of computeAlgorithmsResult: the predictions of a
+    // given algorithms prefix are computed once and then served from the
+    // workflow's algorithmsCache.
+    workflow.algorithmsCache.getOrElseUpdate(
+      prefix,
+      computeAlgorithmsResult(workflow, prefix))
+  }
+
+  /** Returns, for every evaluation set in order, its evaluation info together
+    * with the served (query, prediction, actual) triples. Results are memoized
+    * in the workflow's serving cache under `prefix`.
+    */
+  def getServingResult[TD, EI, PD, Q, P, A](
+    workflow: FastEvalEngineWorkflow[TD, EI, PD, Q, P, A],
+    prefix: ServingPrefix)
+  : Seq[(EI, RDD[(Q, P, A)])] = {
+    val cache = workflow.servingCache
+    if (!cache.contains(prefix)) {
+      val (servingName, servingParams) = prefix.servingParams
+      val serving = Doer(
+        workflow.engine.servingClassMap(servingName), servingParams)
+
+      // Upstream stages; each call is itself memoized on its own prefix.
+      val predictionsByEval =
+        getAlgorithmsResult(workflow, new AlgorithmsPrefix(prefix))
+      val evalSets =
+        getDataSourceResult(workflow, new DataSourcePrefix(prefix))
+
+      val qasByEval = evalSets.mapValues(_._3)
+      val infoByEval = evalSets.mapValues(_._2)
+
+      val servedByEval: Map[EX, RDD[(Q, P, A)]] = predictionsByEval
+      .map { case (ex, predictions) => {
+        val qas: RDD[(QX, (Q, A))] = qasByEval(ex)
+        // Join on the query id to pair each query and actual value with the
+        // predictions of all algorithms, then combine them through serving.
+        val served: RDD[(Q, P, A)] = predictions.join(qas)
+        .map { case (_, (ps, (q, a))) => (q, serving.serveBase(q, ps), a) }
+        (ex, served)
+      }}
+
+      // Emit results in evaluation-set order (EX = 0, 1, ...).
+      val servingResult = (0 until qasByEval.size)
+      .map { ex => (infoByEval(ex), servedByEval(ex)) }
+      .toSeq
+
+      cache += Tuple2(prefix, servingResult)
+    }
+    cache(prefix)
+  }
+
+  def get[TD, EI, PD, Q, P, A](
+    workflow: FastEvalEngineWorkflow[TD, EI, PD, Q, P, A],
+    engineParamsList: Seq[EngineParams])
+  : Seq[(EngineParams, Seq[(EI, RDD[(Q, P, A)])])] = {
+    // Evaluate each parameter set in order, reusing cached stage results.
+    for (engineParams <- engineParamsList) yield {
+      (engineParams, getServingResult(workflow, new ServingPrefix(engineParams)))
+    }
+  }
+}
+
+/** :: Experimental ::
+  * State of one [[FastEvalEngine]] evaluation: the engine, the SparkContext,
+  * the workflow parameters and one mutable result cache per pipeline stage.
+  * @group Evaluation
+  */
+@Experimental
+class FastEvalEngineWorkflow[TD, EI, PD, Q, P, A](
+  val engine: FastEvalEngine[TD, EI, PD, Q, P, A],
+  val sc: SparkContext,
+  val workflowParams: WorkflowParams) extends Serializable {
+
+  import org.apache.predictionio.controller.FastEvalEngineWorkflow._
+
+  type DataSourceResult = Map[EX, (TD, EI, RDD[(QX, (Q, A))])]
+  type PreparatorResult = Map[EX, PD]
+  type AlgorithmsResult = Map[EX, RDD[(QX, Seq[P])]]
+  type ServingResult = Seq[(EI, RDD[(Q, P, A)])]
+  // Stage caches, keyed by the parameter prefix that determines each result.
+  val dataSourceCache = MutableHashMap[DataSourcePrefix, DataSourceResult]()
+  val preparatorCache = MutableHashMap[PreparatorPrefix, PreparatorResult]()
+  val algorithmsCache = MutableHashMap[AlgorithmsPrefix, AlgorithmsResult]()
+  val servingCache = MutableHashMap[ServingPrefix, ServingResult]()
+}
+
+
+
+/** :: Experimental ::
+  * FastEvalEngine is a subclass of [[Engine]] that exploits the immutability of
+  * controllers to optimize the evaluation process.
+  *
+  * @group Evaluation
+  */
+@Experimental
+class FastEvalEngine[TD, EI, PD, Q, P, A](
+    dataSourceClassMap: Map[String, Class[_ <: BaseDataSource[TD, EI, Q, A]]],
+    preparatorClassMap: Map[String, Class[_ <: BasePreparator[TD, PD]]],
+    algorithmClassMap: Map[String, Class[_ <: BaseAlgorithm[PD, _, Q, P]]],
+    servingClassMap: Map[String, Class[_ <: BaseServing[Q, P]]])
+  extends Engine[TD, EI, PD, Q, P, A](
+    dataSourceClassMap,
+    preparatorClassMap,
+    algorithmClassMap,
+    servingClassMap) {
+  @transient override lazy val logger = Logger[this.type]
+
+  /** Evaluates a single parameter set by delegating to [[batchEval]]. */
+  override def eval(
+    sc: SparkContext,
+    engineParams: EngineParams,
+    params: WorkflowParams): Seq[(EI, RDD[(Q, P, A)])] = {
+    logger.info("FastEvalEngine.eval")
+    val (_, evalResult) = batchEval(sc, Seq(engineParams), params).head
+    evalResult
+  }
+
+  /** Evaluates all parameter sets through one [[FastEvalEngineWorkflow]], so
+    * that stage results are shared between sets with a common prefix. */
+  override def batchEval(
+    sc: SparkContext,
+    engineParamsList: Seq[EngineParams],
+    params: WorkflowParams)
+  : Seq[(EngineParams, Seq[(EI, RDD[(Q, P, A)])])] = {
+    // A fresh workflow (and so fresh caches) is created for every batch.
+    val workflow = new FastEvalEngineWorkflow(this, sc, params)
+    FastEvalEngineWorkflow.get(workflow, engineParamsList)
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/main/scala/org/apache/predictionio/controller/IdentityPreparator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/predictionio/controller/IdentityPreparator.scala b/core/src/main/scala/org/apache/predictionio/controller/IdentityPreparator.scala
new file mode 100644
index 0000000..c7669ba
--- /dev/null
+++ b/core/src/main/scala/org/apache/predictionio/controller/IdentityPreparator.scala
@@ -0,0 +1,92 @@
+/** Copyright 2015 TappingStone, Inc.
+  *
+  * Licensed under the Apache License, Version 2.0 (the "License");
+  * you may not use this file except in compliance with the License.
+  * You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package org.apache.predictionio.controller
+
+import org.apache.predictionio.core.BaseDataSource
+import org.apache.predictionio.core.BasePreparator
+import org.apache.spark.SparkContext
+
+import scala.reflect._
+
+/** A helper concrete implementation of [[org.apache.predictionio.core.BasePreparator]]
+  * that passes training data through without any special preparation. This can
+  * be used in place of both [[PPreparator]] and [[LPreparator]].
+  *
+  * @tparam TD Training data class.
+  * @group Preparator
+  */
+class IdentityPreparator[TD] extends BasePreparator[TD, TD] {
+  def prepareBase(sc: SparkContext, td: TD): TD = td // identity; sc is unused
+}
+
+/** Companion object of [[IdentityPreparator]] that conveniently returns the
+  * `Class` object of [[IdentityPreparator]] for use with
+  * [[EngineFactory]].
+  *
+  * @group Preparator
+  */
+object IdentityPreparator {
+  /** Returns the `Class` object of [[IdentityPreparator]].
+    *
+    * @param ds Class of the data source; unused in the body, it only fixes `TD`.
+    */
+  def apply[TD](ds: Class[_ <: BaseDataSource[TD, _, _, _]]): Class[IdentityPreparator[TD]] =
+    classOf[IdentityPreparator[TD]]
+}
+
+/** DEPRECATED. Use [[IdentityPreparator]] instead; this subclass adds nothing.
+  *
+  * @tparam TD Training data class.
+  * @group Preparator
+  */
+@deprecated("Use IdentityPreparator instead.", "0.9.2")
+class PIdentityPreparator[TD] extends IdentityPreparator[TD]
+
+/** DEPRECATED. Use [[IdentityPreparator]] instead.
+  *
+  * @group Preparator
+  */
+@deprecated("Use IdentityPreparator instead.", "0.9.2")
+object PIdentityPreparator {
+  /** Returns the `Class` object of [[IdentityPreparator]] (not of this class).
+    *
+    * @param ds Class of the data source; unused in the body, it only fixes `TD`.
+    */
+  def apply[TD](ds: Class[_ <: BaseDataSource[TD, _, _, _]]): Class[IdentityPreparator[TD]] =
+    classOf[IdentityPreparator[TD]]
+}
+
+/** DEPRECATED. Use [[IdentityPreparator]] instead; this subclass adds nothing.
+  *
+  * @tparam TD Training data class.
+  * @group Preparator
+  */
+@deprecated("Use IdentityPreparator instead.", "0.9.2")
+class LIdentityPreparator[TD] extends IdentityPreparator[TD]
+
+/** DEPRECATED. Use [[IdentityPreparator]] instead.
+  *
+  * @group Preparator
+  */
+@deprecated("Use IdentityPreparator instead.", "0.9.2")
+object LIdentityPreparator {
+  /** Returns the `Class` object of [[IdentityPreparator]] (not of this class).
+    *
+    * @param ds Class of the data source; unused in the body, it only fixes `TD`.
+    */
+  def apply[TD](ds: Class[_ <: BaseDataSource[TD, _, _, _]]): Class[IdentityPreparator[TD]] =
+    classOf[IdentityPreparator[TD]]
+}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/main/scala/org/apache/predictionio/controller/LAlgorithm.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/predictionio/controller/LAlgorithm.scala b/core/src/main/scala/org/apache/predictionio/controller/LAlgorithm.scala
new file mode 100644
index 0000000..664ebb7
--- /dev/null
+++ b/core/src/main/scala/org/apache/predictionio/controller/LAlgorithm.scala
@@ -0,0 +1,130 @@
+/** Copyright 2015 TappingStone, Inc.
+  *
+  * Licensed under the Apache License, Version 2.0 (the "License");
+  * you may not use this file except in compliance with the License.
+  * You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package org.apache.predictionio.controller
+
+import _root_.org.apache.predictionio.annotation.DeveloperApi
+import org.apache.predictionio.core.BaseAlgorithm
+import org.apache.predictionio.workflow.PersistentModelManifest
+import org.apache.spark.SparkContext
+import org.apache.spark.rdd.RDD
+
+import scala.reflect._
+
+/** Base class of a local algorithm.
+  *
+  * A local algorithm runs locally within a single machine and produces a model
+  * that can fit within a single machine.
+  *
+  * If your input query class requires custom JSON4S serialization, the most
+  * idiomatic way is to implement a trait that extends [[CustomQuerySerializer]],
+  * and mix that into your algorithm class, instead of overriding
+  * [[querySerializer]] directly.
+  *
+  * @tparam PD Prepared data class.
+  * @tparam M Trained model class.
+  * @tparam Q Input query class.
+  * @tparam P Output prediction class.
+  * @group Algorithm
+  */
+abstract class LAlgorithm[PD, M : ClassTag, Q, P]
+  extends BaseAlgorithm[RDD[PD], RDD[M], Q, P] {
+
+  /** Trains by applying [[train]] to every element of the prepared-data RDD.
+    *
+    * @param sc Spark context (not used by this implementation).
+    * @param pd RDD of prepared data.
+    * @return RDD holding one trained model per element of `pd`.
+    */
+  def trainBase(sc: SparkContext, pd: RDD[PD]): RDD[M] = {
+    pd.map(preparedData => train(preparedData))
+  }
+
+  /** Implement this method to produce a model from prepared data.
+    *
+    * @param pd Prepared data for model training.
+    * @return Trained model.
+    */
+  def train(pd: PD): M
+
+  /** Casts the untyped model to `RDD[M]` and delegates to [[batchPredict]].
+    *
+    * @param sc Spark context (not used by this implementation).
+    * @param bm Model object, expected to be an `RDD[M]`.
+    * @param qs An RDD of index-query tuples.
+    * @return Whatever [[batchPredict]] returns for the cast model and `qs`.
+    */
+  def batchPredictBase(sc: SparkContext, bm: Any, qs: RDD[(Long, Q)]): RDD[(Long, P)] = {
+    batchPredict(bm.asInstanceOf[RDD[M]], qs)
+  }
+
+  /** Default batch prediction. Override this method for a custom
+    * implementation.
+    *
+    * The queries of each partition are gathered into one array with `glom()`,
+    * every model in `mRDD` is paired with every such array via `cartesian`,
+    * and [[predict]] is applied to each query of each pair.
+    *
+    * @param mRDD A single model wrapped inside an RDD
+    * @param qs An RDD of index-query tuples. The index is used to keep track of
+    *           predicted results with corresponding queries.
+    * @return Batch of predicted results
+    */
+  def batchPredict(mRDD: RDD[M], qs: RDD[(Long, Q)]): RDD[(Long, P)] = {
+    val modelsWithQueries: RDD[(M, Array[(Long, Q)])] = mRDD.cartesian(qs.glom())
+    modelsWithQueries.flatMap { pair =>
+      val model = pair._1
+      pair._2.map { indexedQuery => (indexedQuery._1, predict(model, indexedQuery._2)) }
+    }
+  }
+
+  /** Casts the untyped model to `M` and delegates to [[predict]].
+    *
+    * @param localBaseModel Model object, expected to be an `M`.
+    * @param q An input query.
+    * @return The prediction produced by [[predict]].
+    */
+  def predictBase(localBaseModel: Any, q: Q): P = {
+    val model = localBaseModel.asInstanceOf[M]
+    predict(model, q)
+  }
+
+  /** Implement this method to produce a prediction from a query and trained
+    * model.
+    *
+    * @param m Trained model produced by [[train]].
+    * @param q An input query.
+    * @return A prediction.
+    */
+  def predict(m: M, q: Q): P
+
+  /** :: DeveloperApi ::
+    * Engine developers should not use this directly (read on to see how local
+    * algorithm models are persisted).
+    *
+    * Local algorithms produce local models. By default, models will be
+    * serialized and stored automatically. Engine developers can override this
+    * behavior by mixing the [[PersistentModel]] trait into the model class, and
+    * PredictionIO will call [[PersistentModel.save]] instead. If it returns
+    * true, a [[org.apache.predictionio.workflow.PersistentModelManifest]] will be
+    * returned so that during deployment, PredictionIO will use
+    * [[PersistentModelLoader]] to retrieve the model. Otherwise, Unit will be
+    * returned and the model will be re-trained on-the-fly.
+    *
+    * Only the first element of the model RDD is inspected.
+    *
+    * @param sc Spark context
+    * @param modelId Model ID
+    * @param algoParams Algorithm parameters that trained this model
+    * @param bm Model, expected to be an `RDD[M]`
+    * @return The model itself for automatic persistence, an instance of
+    *         [[org.apache.predictionio.workflow.PersistentModelManifest]] for manual
+    *         persistence, or Unit for re-training on deployment
+    */
+  @DeveloperApi
+  override def makePersistentModel(
+    sc: SparkContext,
+    modelId: String,
+    algoParams: Params,
+    bm: Any): Any = {
+    // Only the first element is taken; the size of the RDD is not verified here.
+    val model = bm.asInstanceOf[RDD[M]].first()
+    model match {
+      case _: PersistentModel[_] =>
+        val saved = model.asInstanceOf[PersistentModel[Params]].save(modelId, algoParams, sc)
+        // NOTE(review): `Unit` below is the companion object of scala.Unit, not
+        // the unit value `()`. Kept as is; callers presumably test for it - confirm.
+        if (saved) PersistentModelManifest(className = model.getClass.getName) else Unit
+      case _ =>
+        model
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/main/scala/org/apache/predictionio/controller/LAverageServing.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/predictionio/controller/LAverageServing.scala b/core/src/main/scala/org/apache/predictionio/controller/LAverageServing.scala
new file mode 100644
index 0000000..7fbe7ac
--- /dev/null
+++ b/core/src/main/scala/org/apache/predictionio/controller/LAverageServing.scala
@@ -0,0 +1,41 @@
+/** Copyright 2015 TappingStone, Inc.
+  *
+  * Licensed under the Apache License, Version 2.0 (the "License");
+  * you may not use this file except in compliance with the License.
+  * You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package org.apache.predictionio.controller
+
+import org.apache.predictionio.core.BaseAlgorithm
+
+/** A concrete implementation of [[LServing]] whose predictions are all
+  * `Double` and whose result is the arithmetic mean of those predictions.
+  *
+  * @tparam Q Input query class.
+  * @group Serving
+  */
+class LAverageServing[Q] extends LServing[Q, Double] {
+  /** Returns the average of all algorithms' predictions.
+    *
+    * @param query Input query (not used in the computation).
+    * @param predictions Predictions to average. An empty sequence yields `NaN`,
+    *                    because the sum 0.0 is divided by a length of 0.
+    * @return Sum of `predictions` divided by their count.
+    */
+  def serve(query: Q, predictions: Seq[Double]): Double = {
+    val total = predictions.sum
+    val count = predictions.length
+    total / count
+  }
+}
+
+/** Companion of [[LAverageServing]], offering a helper that returns its class.
+  *
+  * @group Serving
+  */
+object LAverageServing {
+  /** Returns the class of [[LAverageServing]].
+    *
+    * @tparam Q Input query class, tied to the algorithm class through `a`.
+    * @param a Class of an algorithm; the method body never reads it.
+    * @return Class object of [[LAverageServing]].
+    */
+  def apply[Q](a: Class[_ <: BaseAlgorithm[_, _, Q, _]]): Class[LAverageServing[Q]] = {
+    val servingClass = classOf[LAverageServing[Q]]
+    servingClass
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/main/scala/org/apache/predictionio/controller/LDataSource.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/predictionio/controller/LDataSource.scala b/core/src/main/scala/org/apache/predictionio/controller/LDataSource.scala
new file mode 100644
index 0000000..adb8e20
--- /dev/null
+++ b/core/src/main/scala/org/apache/predictionio/controller/LDataSource.scala
@@ -0,0 +1,67 @@
+/** Copyright 2015 TappingStone, Inc.
+  *
+  * Licensed under the Apache License, Version 2.0 (the "License");
+  * you may not use this file except in compliance with the License.
+  * You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package org.apache.predictionio.controller
+
+import org.apache.predictionio.core.BaseDataSource
+import org.apache.spark.SparkContext
+import org.apache.spark.rdd.RDD
+
+import scala.reflect._
+
+/** Base class of a local data source.
+  *
+  * A local data source runs locally within a single machine and returns data
+  * that can fit within a single machine.
+  *
+  * @tparam TD Training data class.
+  * @tparam EI Evaluation Info class.
+  * @tparam Q Input query class.
+  * @tparam A Actual value class.
+  * @group Data Source
+  */
+abstract class LDataSource[TD: ClassTag, EI, Q, A]
+  extends BaseDataSource[RDD[TD], EI, Q, A] {
+
+  /** Wraps the result of [[readTraining]] in a single-element RDD.
+    *
+    * [[readTraining]] is called from inside the RDD's `map` function rather
+    * than eagerly in this method.
+    *
+    * @param sc Spark context used to create the RDD.
+    */
+  def readTrainingBase(sc: SparkContext): RDD[TD] = {
+    val placeholder = sc.parallelize(Seq(None))
+    placeholder.map(_ => readTraining())
+  }
+
+  /** Implement this method to only return training data from a data source */
+  def readTraining(): TD
+
+  /** Converts every evaluation set returned by [[readEval]] into its RDD form.
+    *
+    * The training data of each set is placed in a single-element RDD, the
+    * evaluation info is passed through unchanged, and the query/actual pairs
+    * are parallelized into an RDD.
+    *
+    * @param sc Spark context used to create the RDDs.
+    */
+  def readEvalBase(sc: SparkContext): Seq[(RDD[TD], EI, RDD[(Q, A)])] = {
+    readEval().map { evalSet =>
+      // Destructure into locals first so the closure below captures `td` only.
+      val (td, ei, qaSeq) = evalSet
+      val tdRDD = sc.parallelize(Seq(None)).map(_ => td)
+      val qaRDD = sc.parallelize(qaSeq)
+      (tdRDD, ei, qaRDD)
+    }
+  }
+
+  /** To provide evaluation feature for your engine, you must override this
+    * method to return data for evaluation from a data source. Returned data can
+    * optionally include a sequence of query and actual value pairs for
+    * evaluation purpose.
+    *
+    * The default implementation returns an empty sequence as a stub, so that
+    * an engine can be compiled without implementing evaluation.
+    */
+  def readEval(): Seq[(TD, EI, Seq[(Q, A)])] = Seq.empty[(TD, EI, Seq[(Q, A)])]
+
+  /** Deprecated alias that simply forwards to [[readEval]]. */
+  @deprecated("Use readEval() instead.", "0.9.0")
+  def read(): Seq[(TD, EI, Seq[(Q, A)])] = {
+    readEval()
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/main/scala/org/apache/predictionio/controller/LFirstServing.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/predictionio/controller/LFirstServing.scala b/core/src/main/scala/org/apache/predictionio/controller/LFirstServing.scala
new file mode 100644
index 0000000..e677743
--- /dev/null
+++ b/core/src/main/scala/org/apache/predictionio/controller/LFirstServing.scala
@@ -0,0 +1,39 @@
+/** Copyright 2015 TappingStone, Inc.
+  *
+  * Licensed under the Apache License, Version 2.0 (the "License");
+  * you may not use this file except in compliance with the License.
+  * You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package org.apache.predictionio.controller
+
+import org.apache.predictionio.core.BaseAlgorithm
+
+/** A concrete implementation of [[LServing]] that passes the first algorithm's
+  * prediction through as the final result, without modifying it.
+  *
+  * @tparam Q Input query class.
+  * @tparam P Output prediction class.
+  * @group Serving
+  */
+class LFirstServing[Q, P] extends LServing[Q, P] {
+  /** Returns the first algorithm's prediction.
+    *
+    * @param query Input query (not used).
+    * @param predictions Algorithms' predictions. `head` is taken from it, so
+    *                    it must contain at least one element.
+    * @return The first element of `predictions`.
+    */
+  def serve(query: Q, predictions: Seq[P]): P = {
+    predictions.head
+  }
+}
+
+/** Companion of [[LFirstServing]], offering a helper that returns its class.
+  *
+  * @group Serving
+  */
+object LFirstServing {
+  /** Returns the class of [[LFirstServing]].
+    *
+    * @tparam Q Input query class, tied to the algorithm class through `a`.
+    * @tparam P Output prediction class, tied to the algorithm class through `a`.
+    * @param a Class of an algorithm; the method body never reads it.
+    * @return Class object of [[LFirstServing]].
+    */
+  def apply[Q, P](a: Class[_ <: BaseAlgorithm[_, _, Q, P]]): Class[LFirstServing[Q, P]] = {
+    val servingClass = classOf[LFirstServing[Q, P]]
+    servingClass
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/main/scala/org/apache/predictionio/controller/LPreparator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/predictionio/controller/LPreparator.scala b/core/src/main/scala/org/apache/predictionio/controller/LPreparator.scala
new file mode 100644
index 0000000..32ffd5d
--- /dev/null
+++ b/core/src/main/scala/org/apache/predictionio/controller/LPreparator.scala
@@ -0,0 +1,46 @@
+/** Copyright 2015 TappingStone, Inc.
+  *
+  * Licensed under the Apache License, Version 2.0 (the "License");
+  * you may not use this file except in compliance with the License.
+  * You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package org.apache.predictionio.controller
+
+import org.apache.predictionio.core.BasePreparator
+import org.apache.spark.SparkContext
+import org.apache.spark.rdd.RDD
+
+import scala.reflect._
+
+/** Base class of a local preparator.
+  *
+  * A local preparator runs locally within a single machine and produces
+  * prepared data that can fit within a single machine.
+  *
+  * @tparam TD Training data class.
+  * @tparam PD Prepared data class.
+  * @group Preparator
+  */
+abstract class LPreparator[TD, PD : ClassTag]
+  extends BasePreparator[RDD[TD], RDD[PD]] {
+
+  /** Applies [[prepare]] to every element of the training-data RDD.
+    *
+    * @param sc Spark context (not used by this implementation).
+    * @param rddTd RDD of training data.
+    * @return RDD of prepared data, one element per element of `rddTd`.
+    */
+  def prepareBase(sc: SparkContext, rddTd: RDD[TD]): RDD[PD] =
+    rddTd.map(trainingData => prepare(trainingData))
+
+  /** Implement this method to turn training data into prepared data that is
+    * ready for model training.
+    *
+    * @param trainingData Training data to be prepared.
+    * @return Prepared data.
+    */
+  def prepare(trainingData: TD): PD
+}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/main/scala/org/apache/predictionio/controller/LServing.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/predictionio/controller/LServing.scala b/core/src/main/scala/org/apache/predictionio/controller/LServing.scala
new file mode 100644
index 0000000..653b998
--- /dev/null
+++ b/core/src/main/scala/org/apache/predictionio/controller/LServing.scala
@@ -0,0 +1,52 @@
+/** Copyright 2015 TappingStone, Inc.
+  *
+  * Licensed under the Apache License, Version 2.0 (the "License");
+  * you may not use this file except in compliance with the License.
+  * You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package org.apache.predictionio.controller
+
+import org.apache.predictionio.annotation.Experimental
+import org.apache.predictionio.core.BaseServing
+
+/** Base class of serving.
+  *
+  * @tparam Q Input query class.
+  * @tparam P Output prediction class.
+  * @group Serving
+  */
+abstract class LServing[Q, P] extends BaseServing[Q, P] {
+  /** Forwards the query to [[supplement]] and returns its result.
+    *
+    * @param q Query
+    */
+  def supplementBase(q: Q): Q = {
+    supplement(q)
+  }
+
+  /** :: Experimental ::
+    * Implement this method to supplement the query before sending it to
+    * algorithms. The default implementation returns the query unchanged.
+    *
+    * @param q Query
+    * @return A supplemented Query
+    */
+  @Experimental
+  def supplement(q: Q): Q = {
+    q
+  }
+
+  /** Forwards the query and the predictions to [[serve]] and returns its result.
+    *
+    * @param q Query
+    * @param ps A list of algorithms' predictions.
+    */
+  def serveBase(q: Q, ps: Seq[P]): P = serve(q, ps)
+
+  /** Implement this method to combine multiple algorithms' predictions to
+    * produce a single final prediction. The query is the original query sent to
+    * the engine, not the supplemented query produced by [[LServing.supplement]].
+    *
+    * @param query Original input query.
+    * @param predictions A list of algorithms' predictions.
+    * @return The final prediction.
+    */
+  def serve(query: Q, predictions: Seq[P]): P
+}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/main/scala/org/apache/predictionio/controller/LocalFileSystemPersistentModel.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/predictionio/controller/LocalFileSystemPersistentModel.scala b/core/src/main/scala/org/apache/predictionio/controller/LocalFileSystemPersistentModel.scala
new file mode 100644
index 0000000..f90e28d
--- /dev/null
+++ b/core/src/main/scala/org/apache/predictionio/controller/LocalFileSystemPersistentModel.scala
@@ -0,0 +1,74 @@
+/** Copyright 2015 TappingStone, Inc.
+  *
+  * Licensed under the Apache License, Version 2.0 (the "License");
+  * you may not use this file except in compliance with the License.
+  * You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package org.apache.predictionio.controller
+
+import org.apache.spark.SparkContext
+
+/** This trait is a convenience helper for persisting your model to the local
+  * filesystem. This trait and [[LocalFileSystemPersistentModelLoader]] contain
+  * concrete implementation and need not be implemented.
+  *
+  * The underlying implementation is [[Utils.save]]: the provided `save` method
+  * passes `id` and this model to [[Utils.save]] and then returns `true`. Its
+  * `params` and `sc` arguments are not used.
+  *
+  * {{{
+  * class MyModel extends LocalFileSystemPersistentModel[MyParams] {
+  *   ...
+  * }
+  *
+  * object MyModel extends LocalFileSystemPersistentModelLoader[MyParams, MyModel] {
+  *   ...
+  * }
+  * }}}
+  *
+  * @tparam AP Algorithm parameters class.
+  * @see [[LocalFileSystemPersistentModelLoader]]
+  * @group Algorithm
+  */
+trait LocalFileSystemPersistentModel[AP <: Params] extends PersistentModel[AP] {
+  def save(id: String, params: AP, sc: SparkContext): Boolean = {
+    Utils.save(id, this)
+    true
+  }
+}
+
+/** Implement an object that extends this trait for PredictionIO to support
+  * loading a persisted model from local filesystem during serving deployment.
+  *
+  * The underlying implementation is [[Utils.load]]: the provided `apply` method
+  * passes `id` to [[Utils.load]] and casts the result to `M`. Its `params` and
+  * `sc` arguments are not used.
+  *
+  * @tparam AP Algorithm parameters class.
+  * @tparam M Model class.
+  * @see [[LocalFileSystemPersistentModel]]
+  * @group Algorithm
+  */
+trait LocalFileSystemPersistentModelLoader[AP <: Params, M]
+  extends PersistentModelLoader[AP, M] {
+  def apply(id: String, params: AP, sc: Option[SparkContext]): M = {
+    Utils.load(id).asInstanceOf[M]
+  }
+}
+
+/** DEPRECATED. Use [[LocalFileSystemPersistentModel]] instead.
+  *
+  * This trait adds no members of its own; it only extends
+  * [[LocalFileSystemPersistentModel]] with the same type parameter.
+  *
+  * @tparam AP Algorithm parameters class.
+  * @group Algorithm */
+@deprecated("Use LocalFileSystemPersistentModel instead.", "0.9.2")
+trait IFSPersistentModel[AP <: Params] extends LocalFileSystemPersistentModel[AP]
+
+/** DEPRECATED. Use [[LocalFileSystemPersistentModelLoader]] instead.
+  *
+  * This trait adds no members of its own; it only extends
+  * [[LocalFileSystemPersistentModelLoader]] with the same type parameters.
+  *
+  * @tparam AP Algorithm parameters class.
+  * @tparam M Model class.
+  * @group Algorithm */
+@deprecated("Use LocalFileSystemPersistentModelLoader instead.", "0.9.2")
+trait IFSPersistentModelLoader[AP <: Params, M] extends LocalFileSystemPersistentModelLoader[AP, M]