You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@predictionio.apache.org by do...@apache.org on 2016/07/18 20:17:53 UTC
[22/34] incubator-predictionio git commit: rename all except examples
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/main/scala/org/apache/predictionio/controller/Engine.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/predictionio/controller/Engine.scala b/core/src/main/scala/org/apache/predictionio/controller/Engine.scala
new file mode 100644
index 0000000..c875a9f
--- /dev/null
+++ b/core/src/main/scala/org/apache/predictionio/controller/Engine.scala
@@ -0,0 +1,829 @@
+/** Copyright 2015 TappingStone, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.predictionio.controller
+
+import grizzled.slf4j.Logger
+import org.apache.predictionio.core.BaseAlgorithm
+import org.apache.predictionio.core.BaseDataSource
+import org.apache.predictionio.core.BaseEngine
+import org.apache.predictionio.core.BasePreparator
+import org.apache.predictionio.core.BaseServing
+import org.apache.predictionio.core.Doer
+import org.apache.predictionio.data.storage.EngineInstance
+import org.apache.predictionio.data.storage.StorageClientException
+import org.apache.predictionio.workflow.CreateWorkflow
+import org.apache.predictionio.workflow.EngineLanguage
+import org.apache.predictionio.workflow.JsonExtractorOption.JsonExtractorOption
+import org.apache.predictionio.workflow.NameParamsSerializer
+import org.apache.predictionio.workflow.PersistentModelManifest
+import org.apache.predictionio.workflow.SparkWorkflowUtils
+import org.apache.predictionio.workflow.StopAfterPrepareInterruption
+import org.apache.predictionio.workflow.StopAfterReadInterruption
+import org.apache.predictionio.workflow.WorkflowParams
+import org.apache.predictionio.workflow.WorkflowUtils
+import org.apache.spark.SparkContext
+import org.apache.spark.SparkContext._
+import org.apache.spark.rdd.RDD
+import org.json4s._
+import org.json4s.native.JsonMethods._
+import org.json4s.native.Serialization.read
+
+import scala.collection.JavaConversions
+import scala.language.implicitConversions
+
+/** This class chains up the entire data process. PredictionIO uses this
+ * information to create workflows and deployments. In Scala, you should
+ * implement an object that extends the [[EngineFactory]] abstract class,
+ * similar to the following example.
+ *
+ * {{{
+ * object ItemRankEngine extends EngineFactory {
+ *   def apply() = {
+ *     new Engine(
+ *       classOf[ItemRankDataSource],
+ *       classOf[ItemRankPreparator],
+ *       Map(
+ *         "knn" -> classOf[KNNAlgorithm],
+ *         "rand" -> classOf[RandomAlgorithm],
+ *         "mahoutItemBased" -> classOf[MahoutItemBasedAlgorithm]),
+ *       classOf[ItemRankServing])
+ *   }
+ * }
+ * }}}
+ *
+ * @see [[EngineFactory]]
+ * @tparam TD Training data class.
+ * @tparam EI Evaluation info class.
+ * @tparam PD Prepared data class.
+ * @tparam Q Input query class.
+ * @tparam P Output prediction class.
+ * @tparam A Actual value class.
+ * @param dataSourceClassMap Map of data source names to class.
+ * @param preparatorClassMap Map of preparator names to class.
+ * @param algorithmClassMap Map of algorithm names to classes.
+ * @param servingClassMap Map of serving names to class.
+ * @group Engine
+ */
+class Engine[TD, EI, PD, Q, P, A](
+    val dataSourceClassMap: Map[String,
+      Class[_ <: BaseDataSource[TD, EI, Q, A]]],
+    val preparatorClassMap: Map[String, Class[_ <: BasePreparator[TD, PD]]],
+    val algorithmClassMap: Map[String, Class[_ <: BaseAlgorithm[PD, _, Q, P]]],
+    val servingClassMap: Map[String, Class[_ <: BaseServing[Q, P]]])
+  extends BaseEngine[EI, Q, P, A] {
+
+  // The access qualifier must name an enclosing package; after the move to
+  // org.apache.predictionio that package is `predictionio`.
+  private[predictionio]
+  implicit lazy val formats = Utils.json4sDefaultFormats +
+    new NameParamsSerializer
+
+  @transient lazy protected val logger = Logger[this.type]
+
+  /** This auxiliary constructor is provided for backward compatibility.
+   * Each single component class is registered under the empty name "".
+   *
+   * @param dataSourceClass Data source class.
+   * @param preparatorClass Preparator class.
+   * @param algorithmClassMap Map of algorithm names to classes.
+   * @param servingClass Serving class.
+   */
+  def this(
+      dataSourceClass: Class[_ <: BaseDataSource[TD, EI, Q, A]],
+      preparatorClass: Class[_ <: BasePreparator[TD, PD]],
+      algorithmClassMap: Map[String, Class[_ <: BaseAlgorithm[PD, _, Q, P]]],
+      servingClass: Class[_ <: BaseServing[Q, P]]) = this(
+    Map("" -> dataSourceClass),
+    Map("" -> preparatorClass),
+    algorithmClassMap,
+    Map("" -> servingClass)
+  )
+
+  /** Java-friendly constructor. Each single component class is registered
+   * under the empty name "", and the Java algorithm map is copied into an
+   * immutable Scala map.
+   *
+   * @param dataSourceClass Data source class.
+   * @param preparatorClass Preparator class.
+   * @param algorithmClassMap Map of algorithm names to classes.
+   * @param servingClass Serving class.
+   */
+  def this(dataSourceClass: Class[_ <: BaseDataSource[TD, EI, Q, A]],
+      preparatorClass: Class[_ <: BasePreparator[TD, PD]],
+      algorithmClassMap: _root_.java.util.Map[String, Class[_ <: BaseAlgorithm[PD, _, Q, P]]],
+      servingClass: Class[_ <: BaseServing[Q, P]]) = this(
+    Map("" -> dataSourceClass),
+    Map("" -> preparatorClass),
+    JavaConversions.mapAsScalaMap(algorithmClassMap).toMap,
+    Map("" -> servingClass)
+  )
+
+  /** Returns a new Engine instance, mimicking case class's copy method behavior.
+   * Any argument not supplied defaults to this engine's corresponding map.
+   */
+  def copy(
+      dataSourceClassMap: Map[String, Class[_ <: BaseDataSource[TD, EI, Q, A]]]
+        = dataSourceClassMap,
+      preparatorClassMap: Map[String, Class[_ <: BasePreparator[TD, PD]]]
+        = preparatorClassMap,
+      algorithmClassMap: Map[String, Class[_ <: BaseAlgorithm[PD, _, Q, P]]]
+        = algorithmClassMap,
+      servingClassMap: Map[String, Class[_ <: BaseServing[Q, P]]]
+        = servingClassMap): Engine[TD, EI, PD, Q, P, A] = {
+    new Engine(
+      dataSourceClassMap,
+      preparatorClassMap,
+      algorithmClassMap,
+      servingClassMap)
+  }
+
+  /** Trains this engine and returns one model per configured algorithm.
+   *
+   * @param sc An instance of SparkContext.
+   * @param engineParams An instance of [[EngineParams]] for running a single training.
+   * @param engineInstanceId ID of the engine instance being trained; it is used
+   *                         to build each algorithm's model ID.
+   * @param params An instance of [[WorkflowParams]] that controls the workflow.
+   * @return A list of models, in the order of
+   *         `engineParams.algorithmParamsList`, as returned by each
+   *         algorithm's `makePersistentModel`.
+   * @throws IllegalArgumentException if no algorithm parameters are given.
+   */
+  def train(
+      sc: SparkContext,
+      engineParams: EngineParams,
+      engineInstanceId: String,
+      params: WorkflowParams): Seq[Any] = {
+    val (dataSourceName, dataSourceParams) = engineParams.dataSourceParams
+    val dataSource = Doer(dataSourceClassMap(dataSourceName), dataSourceParams)
+
+    val (preparatorName, preparatorParams) = engineParams.preparatorParams
+    val preparator = Doer(preparatorClassMap(preparatorName), preparatorParams)
+
+    val algoParamsList = engineParams.algorithmParamsList
+    require(
+      algoParamsList.size > 0,
+      "EngineParams.algorithmParamsList must have at least 1 element.")
+
+    val algorithms = algoParamsList.map { case (algoName, algoParams) =>
+      Doer(algorithmClassMap(algoName), algoParams)
+    }
+
+    val models = Engine.train(
+      sc, dataSource, preparator, algorithms, params)
+
+    // Engine.train yields exactly one model per algorithm, in the same order.
+    val algoTuples: Seq[(String, Params, BaseAlgorithm[_, _, _, _], Any)] =
+      algoParamsList.zip(algorithms).zip(models).map {
+        case (((name, algoParams), algo), model) =>
+          (name, algoParams, algo, model)
+      }
+
+    makeSerializableModels(
+      sc,
+      engineInstanceId = engineInstanceId,
+      algoTuples = algoTuples)
+  }
+
+  /** Algorithm models can be persisted before deploy. However, it is also
+   * possible that models are not persisted. This method retrains non-persisted
+   * models and returns a list of models that can be used directly in deploy.
+   *
+   * A persisted entry equal to `Unit` marks a model that was not persisted.
+   * If any such entry exists, training data is read and prepared once, and
+   * only those algorithms are retrained. Entries that are
+   * [[PersistentModelManifest]]s are then loaded through
+   * `SparkWorkflowUtils.getPersistentModel`.
+   *
+   * @param sc An instance of SparkContext.
+   * @param engineParams Parameters used to instantiate the engine components.
+   * @param engineInstanceId ID of the engine instance; used to build model IDs.
+   * @param persistedModels Models as persisted at training time, one per algorithm.
+   * @param params An instance of [[WorkflowParams]] (not used by this method).
+   * @return Models ready for deployment, one per algorithm.
+   */
+  private[predictionio]
+  def prepareDeploy(
+      sc: SparkContext,
+      engineParams: EngineParams,
+      engineInstanceId: String,
+      persistedModels: Seq[Any],
+      params: WorkflowParams): Seq[Any] = {
+
+    val algoParamsList = engineParams.algorithmParamsList
+    val algorithms = algoParamsList.map { case (algoName, algoParams) =>
+      Doer(algorithmClassMap(algoName), algoParams)
+    }
+
+    val models = if (persistedModels.exists(m => m.isInstanceOf[Unit.type])) {
+      // If any of persistedModels is Unit, we need to re-train the model.
+      logger.info("Some persisted models are Unit, need to re-train.")
+      val (dataSourceName, dataSourceParams) = engineParams.dataSourceParams
+      val dataSource = Doer(dataSourceClassMap(dataSourceName), dataSourceParams)
+
+      val (preparatorName, preparatorParams) = engineParams.preparatorParams
+      val preparator = Doer(preparatorClassMap(preparatorName), preparatorParams)
+
+      val td = dataSource.readTrainingBase(sc)
+      val pd = preparator.prepareBase(sc, td)
+
+      algorithms.zip(persistedModels).map { case (algo, m) =>
+        m match {
+          case Unit => algo.trainBase(sc, pd)
+          case _ => m
+        }
+      }
+    } else {
+      logger.info("Using persisted model")
+      persistedModels
+    }
+
+    models
+      .zip(algorithms)
+      .zip(algoParamsList)
+      .zipWithIndex
+      .map {
+        case (((model, algo), (algoName, algoParams)), ax) =>
+          model match {
+            case modelManifest: PersistentModelManifest =>
+              logger.info("Custom-persisted model detected for algorithm " +
+                algo.getClass.getName)
+              SparkWorkflowUtils.getPersistentModel(
+                modelManifest,
+                Seq(engineInstanceId, ax, algoName).mkString("-"),
+                algoParams,
+                Some(sc),
+                getClass.getClassLoader)
+            case m =>
+              try {
+                logger.info(
+                  s"Loaded model ${m.getClass.getName} for algorithm " +
+                  s"${algo.getClass.getName}")
+                // NOTE(review): `sc` is stopped as soon as any non-manifest
+                // model is reached, so a PersistentModelManifest handled later
+                // in this same map receives a stopped SparkContext. Confirm
+                // this is intended.
+                sc.stop
+                m
+              } catch {
+                // `m.getClass` throws for a null model; it is passed through as is.
+                case e: NullPointerException =>
+                  logger.warn(
+                    s"Null model detected for algorithm ${algo.getClass.getName}")
+                  m
+              }
+          }
+      }
+  }
+
+  /** Extract model for persistent layer.
+   *
+   * PredictionIO persists models for future use. It allows custom
+   * implementation for persisting models. You need to implement the
+   * [[org.apache.predictionio.controller.PersistentModel]] interface. This method
+   * traverses all models in the workflow. If the model is a
+   * [[org.apache.predictionio.controller.PersistentModel]], it calls the save method
+   * for custom persistence logic.
+   *
+   * For a model that doesn't support custom logic, PredictionIO serializes the
+   * whole model if the corresponding algorithm is local. On the other hand, if
+   * the model is parallel (i.e. a model associated with a number of huge RDDs),
+   * this method returns Unit, in which case PredictionIO will retrain the whole
+   * model from scratch next time it is used.
+   *
+   * Each algorithm's model ID is `engineInstanceId-index-algorithmName`.
+   */
+  private def makeSerializableModels(
+      sc: SparkContext,
+      engineInstanceId: String,
+      // (algorithm name, algorithm params, algorithm, trained model)
+      algoTuples: Seq[(String, Params, BaseAlgorithm[_, _, _, _], Any)]
+  ): Seq[Any] = {
+
+    logger.info(s"engineInstanceId=$engineInstanceId")
+
+    algoTuples
+      .zipWithIndex
+      .map { case ((name, params, algo, model), ax) =>
+        algo.makePersistentModel(
+          sc = sc,
+          modelId = Seq(engineInstanceId, ax, name).mkString("-"),
+          algoParams = params,
+          bm = model)
+      }
+  }
+
+  /** This is implemented such that [[org.apache.predictionio.controller.Evaluation]] can
+   * use this method to generate inputs for [[org.apache.predictionio.controller.Metric]].
+   *
+   * Exits the JVM with status 1 if an algorithm name in `engineParams` is not
+   * defined in this engine.
+   *
+   * @param sc An instance of SparkContext.
+   * @param engineParams An instance of [[EngineParams]] for running a single evaluation.
+   * @param params An instance of [[WorkflowParams]] (not used by this method).
+   * @return A list of evaluation information and RDD of (query, predicted
+   *         result, actual result) tuples.
+   */
+  def eval(
+      sc: SparkContext,
+      engineParams: EngineParams,
+      params: WorkflowParams): Seq[(EI, RDD[(Q, P, A)])] = {
+    val (dataSourceName, dataSourceParams) = engineParams.dataSourceParams
+    val dataSource = Doer(dataSourceClassMap(dataSourceName), dataSourceParams)
+
+    val (preparatorName, preparatorParams) = engineParams.preparatorParams
+    val preparator = Doer(preparatorClassMap(preparatorName), preparatorParams)
+
+    val algoParamsList = engineParams.algorithmParamsList
+    require(
+      algoParamsList.size > 0,
+      "EngineParams.algorithmParamsList must have at least 1 element.")
+
+    val algorithms = algoParamsList.map { case (algoName, algoParams) =>
+      try {
+        Doer(algorithmClassMap(algoName), algoParams)
+      } catch {
+        case e: NoSuchElementException =>
+          if (algoName == "") {
+            logger.error("Empty algorithm name supplied but it could not " +
+              "match with any algorithm in the engine's definition. " +
+              "Existing algorithm name(s) are: " +
+              s"${algorithmClassMap.keys.mkString(", ")}. Aborting.")
+          } else {
+            logger.error(s"$algoName cannot be found in the engine's " +
+              "definition. Existing algorithm name(s) are: " +
+              s"${algorithmClassMap.keys.mkString(", ")}. Aborting.")
+          }
+          sys.exit(1)
+      }
+    }
+
+    val (servingName, servingParams) = engineParams.servingParams
+    val serving = Doer(servingClassMap(servingName), servingParams)
+
+    Engine.eval(sc, dataSource, preparator, algorithms, serving)
+  }
+
+  /** Builds [[EngineParams]] from an engine variant JSON.
+   *
+   * Reads the `datasource`, `preparator`, `algorithms` and `serving` fields.
+   * If `algorithms` is absent, a single `("", EmptyParams())` entry is used.
+   * If it is present but not a JSON array, the algorithm list is empty.
+   *
+   * @param variantJson The engine variant JSON.
+   * @param jsonExtractor Which JSON extractor to use for parameter classes.
+   * @return The extracted engine parameters.
+   */
+  override def jValueToEngineParams(
+      variantJson: JValue,
+      jsonExtractor: JsonExtractorOption): EngineParams = {
+
+    val engineLanguage = EngineLanguage.Scala
+    // Extract EngineParams
+    logger.info(s"Extracting datasource params...")
+    val dataSourceParams: (String, Params) =
+      WorkflowUtils.getParamsFromJsonByFieldAndClass(
+        variantJson,
+        "datasource",
+        dataSourceClassMap,
+        engineLanguage,
+        jsonExtractor)
+    logger.info(s"Datasource params: $dataSourceParams")
+
+    logger.info(s"Extracting preparator params...")
+    val preparatorParams: (String, Params) =
+      WorkflowUtils.getParamsFromJsonByFieldAndClass(
+        variantJson,
+        "preparator",
+        preparatorClassMap,
+        engineLanguage,
+        jsonExtractor)
+    logger.info(s"Preparator params: $preparatorParams")
+
+    val algorithmsParams: Seq[(String, Params)] =
+      variantJson.findField {
+        case JField("algorithms", _) => true
+        case _ => false
+      }.map { jv =>
+        val algorithmsParamsJson = jv._2
+        algorithmsParamsJson match {
+          case JArray(s) => s.map { algorithmParamsJValue =>
+            val eap = algorithmParamsJValue.extract[CreateWorkflow.AlgorithmParams]
+            (
+              eap.name,
+              WorkflowUtils.extractParams(
+                engineLanguage,
+                compact(render(eap.params)),
+                algorithmClassMap(eap.name),
+                jsonExtractor)
+            )
+          }
+          case _ => Nil
+        }
+      }.getOrElse(Seq(("", EmptyParams())))
+
+    logger.info(s"Extracting serving params...")
+    val servingParams: (String, Params) =
+      WorkflowUtils.getParamsFromJsonByFieldAndClass(
+        variantJson,
+        "serving",
+        servingClassMap,
+        engineLanguage,
+        jsonExtractor)
+    logger.info(s"Serving params: $servingParams")
+
+    new EngineParams(
+      dataSourceParams = dataSourceParams,
+      preparatorParams = preparatorParams,
+      algorithmParamsList = algorithmsParams,
+      servingParams = servingParams)
+  }
+
+  /** Reconstructs [[EngineParams]] from the JSON-serialized (name, params)
+   * values stored in an [[EngineInstance]].
+   *
+   * Exits the JVM with status 1 if the stored data source, preparator or
+   * serving name is not defined in this engine. An unknown algorithm name
+   * surfaces as a `NoSuchElementException` from the `algorithmClassMap` lookup.
+   *
+   * @param engineInstance The stored engine instance.
+   * @param jsonExtractor Which JSON extractor to use for parameter classes.
+   * @return The extracted engine parameters.
+   */
+  private[predictionio] def engineInstanceToEngineParams(
+      engineInstance: EngineInstance,
+      jsonExtractor: JsonExtractorOption): EngineParams = {
+
+    implicit val formats = DefaultFormats
+    val engineLanguage = EngineLanguage.Scala
+
+    val dataSourceParamsWithName: (String, Params) = {
+      val (name, params) =
+        read[(String, JValue)](engineInstance.dataSourceParams)
+      if (!dataSourceClassMap.contains(name)) {
+        logger.error(s"Unable to find datasource class with name '$name'" +
+          " defined in Engine.")
+        sys.exit(1)
+      }
+      val extractedParams = WorkflowUtils.extractParams(
+        engineLanguage,
+        compact(render(params)),
+        dataSourceClassMap(name),
+        jsonExtractor)
+      (name, extractedParams)
+    }
+
+    val preparatorParamsWithName: (String, Params) = {
+      val (name, params) =
+        read[(String, JValue)](engineInstance.preparatorParams)
+      if (!preparatorClassMap.contains(name)) {
+        logger.error(s"Unable to find preparator class with name '$name'" +
+          " defined in Engine.")
+        sys.exit(1)
+      }
+      val extractedParams = WorkflowUtils.extractParams(
+        engineLanguage,
+        compact(render(params)),
+        preparatorClassMap(name),
+        jsonExtractor)
+      (name, extractedParams)
+    }
+
+    val algorithmsParamsWithNames =
+      read[Seq[(String, JValue)]](engineInstance.algorithmsParams).map {
+        case (algoName, params) =>
+          val extractedParams = WorkflowUtils.extractParams(
+            engineLanguage,
+            compact(render(params)),
+            algorithmClassMap(algoName),
+            jsonExtractor)
+          (algoName, extractedParams)
+      }
+
+    val servingParamsWithName: (String, Params) = {
+      val (name, params) = read[(String, JValue)](engineInstance.servingParams)
+      if (!servingClassMap.contains(name)) {
+        logger.error(s"Unable to find serving class with name '$name'" +
+          " defined in Engine.")
+        sys.exit(1)
+      }
+      val extractedParams = WorkflowUtils.extractParams(
+        engineLanguage,
+        compact(render(params)),
+        servingClassMap(name),
+        jsonExtractor)
+      (name, extractedParams)
+    }
+
+    new EngineParams(
+      dataSourceParams = dataSourceParamsWithName,
+      preparatorParams = preparatorParamsWithName,
+      algorithmParamsList = algorithmsParamsWithNames,
+      servingParams = servingParamsWithName)
+  }
+}
+
+/** This object contains concrete implementation for some methods of the
+ * [[Engine]] class.
+ *
+ * @group Engine
+ */
+object Engine {
+  private type EX = Int
+  private type AX = Int
+  private type QX = Long
+
+  @transient lazy private val logger = Logger[this.type]
+
+  /** Helper class to accept either a single data source, or a map of data
+   * sources, with a companion object providing implicit conversions, so
+   * using this class directly is not necessary. A single class is registered
+   * under the empty name "".
+   *
+   * @tparam TD Training data class
+   * @tparam EI Evaluation information class
+   * @tparam Q Input query class
+   * @tparam A Actual result class
+   */
+  class DataSourceMap[TD, EI, Q, A](
+      val m: Map[String, Class[_ <: BaseDataSource[TD, EI, Q, A]]]) {
+    def this(c: Class[_ <: BaseDataSource[TD, EI, Q, A]]) = this(Map("" -> c))
+  }
+
+  /** Companion object providing implicit conversions, so using this directly
+   * is not necessary.
+   */
+  object DataSourceMap {
+    implicit def cToMap[TD, EI, Q, A](
+        c: Class[_ <: BaseDataSource[TD, EI, Q, A]]):
+      DataSourceMap[TD, EI, Q, A] = new DataSourceMap(c)
+    implicit def mToMap[TD, EI, Q, A](
+        m: Map[String, Class[_ <: BaseDataSource[TD, EI, Q, A]]]):
+      DataSourceMap[TD, EI, Q, A] = new DataSourceMap(m)
+  }
+
+  /** Helper class to accept either a single preparator, or a map of
+   * preparators, with a companion object providing implicit conversions, so
+   * using this class directly is not necessary. A single class is registered
+   * under the empty name "".
+   *
+   * @tparam TD Training data class
+   * @tparam PD Prepared data class
+   */
+  class PreparatorMap[TD, PD](
+      val m: Map[String, Class[_ <: BasePreparator[TD, PD]]]) {
+    def this(c: Class[_ <: BasePreparator[TD, PD]]) = this(Map("" -> c))
+  }
+
+  /** Companion object providing implicit conversions, so using this directly
+   * is not necessary.
+   */
+  object PreparatorMap {
+    implicit def cToMap[TD, PD](
+        c: Class[_ <: BasePreparator[TD, PD]]):
+      PreparatorMap[TD, PD] = new PreparatorMap(c)
+    implicit def mToMap[TD, PD](
+        m: Map[String, Class[_ <: BasePreparator[TD, PD]]]):
+      PreparatorMap[TD, PD] = new PreparatorMap(m)
+  }
+
+  /** Helper class to accept either a single serving, or a map of serving, with
+   * a companion object providing implicit conversions, so using this class
+   * directly is not necessary. A single class is registered under the empty
+   * name "".
+   *
+   * @tparam Q Input query class
+   * @tparam P Predicted result class
+   */
+  class ServingMap[Q, P](
+      val m: Map[String, Class[_ <: BaseServing[Q, P]]]) {
+    def this(c: Class[_ <: BaseServing[Q, P]]) = this(Map("" -> c))
+  }
+
+  /** Companion object providing implicit conversions, so using this directly
+   * is not necessary.
+   */
+  object ServingMap {
+    implicit def cToMap[Q, P](
+        c: Class[_ <: BaseServing[Q, P]]): ServingMap[Q, P] =
+      new ServingMap(c)
+    implicit def mToMap[Q, P](
+        m: Map[String, Class[_ <: BaseServing[Q, P]]]): ServingMap[Q, P] =
+      new ServingMap(m)
+  }
+
+  /** Convenient method for returning an instance of [[Engine]].
+   *
+   * @param dataSourceMap Accepts either an instance of Class of the data
+   *                      source, or a Map of data source classes (implicitly
+   *                      converted to [[DataSourceMap]]).
+   * @param preparatorMap Accepts either an instance of Class of the
+   *                      preparator, or a Map of preparator classes
+   *                      (implicitly converted to [[PreparatorMap]]).
+   * @param algorithmClassMap Accepts a Map of algorithm classes.
+   * @param servingMap Accepts either an instance of Class of the serving, or
+   *                   a Map of serving classes (implicitly converted to
+   *                   [[ServingMap]]).
+   * @tparam TD Training data class
+   * @tparam EI Evaluation information class
+   * @tparam PD Prepared data class
+   * @tparam Q Input query class
+   * @tparam P Predicted result class
+   * @tparam A Actual result class
+   * @return An instance of [[Engine]]
+   */
+  def apply[TD, EI, PD, Q, P, A](
+      dataSourceMap: DataSourceMap[TD, EI, Q, A],
+      preparatorMap: PreparatorMap[TD, PD],
+      algorithmClassMap: Map[String, Class[_ <: BaseAlgorithm[PD, _, Q, P]]],
+      servingMap: ServingMap[Q, P]): Engine[TD, EI, PD, Q, P, A] = new Engine(
+    dataSourceMap.m,
+    preparatorMap.m,
+    algorithmClassMap,
+    servingMap.m
+  )
+
+  /** Runs `sanityCheck()` on `data` if it mixes in [[SanityCheck]], and logs
+   * whether the check was performed or skipped. */
+  private def runSanityCheck(data: Any): Unit = data match {
+    case sanityCheckable: SanityCheck =>
+      logger.info(s"${data.getClass.getName} supports data sanity" +
+        " check. Performing check.")
+      sanityCheckable.sanityCheck()
+    case _ =>
+      logger.info(s"${data.getClass.getName} does not support" +
+        " data sanity check. Skipping check.")
+  }
+
+  /** Provides concrete implementation of training for [[Engine]].
+   *
+   * Reads training data, prepares it, and trains every algorithm on the same
+   * prepared data. Unless `params.skipSanityCheck` is set, the training data,
+   * prepared data and each model are sanity-checked when they support it.
+   * Exits the JVM with status 1 if reading the data source raises a
+   * `StorageClientException`.
+   *
+   * @param sc An instance of SparkContext
+   * @param dataSource An instance of data source
+   * @param preparator An instance of preparator
+   * @param algorithmList A list of algorithm instances
+   * @param params An instance of [[WorkflowParams]] that controls the training
+   *               process.
+   * @tparam TD Training data class
+   * @tparam PD Prepared data class
+   * @tparam Q Input query class
+   * @return A list of trained models, one per algorithm, in order
+   * @throws StopAfterReadInterruption if `params.stopAfterRead` is set
+   * @throws StopAfterPrepareInterruption if `params.stopAfterPrepare` is set
+   */
+  def train[TD, PD, Q](
+      sc: SparkContext,
+      dataSource: BaseDataSource[TD, _, Q, _],
+      preparator: BasePreparator[TD, PD],
+      algorithmList: Seq[BaseAlgorithm[PD, _, Q, _]],
+      params: WorkflowParams
+  ): Seq[Any] = {
+    logger.info("EngineWorkflow.train")
+    logger.info(s"DataSource: $dataSource")
+    logger.info(s"Preparator: $preparator")
+    logger.info(s"AlgorithmList: $algorithmList")
+
+    if (params.skipSanityCheck) {
+      logger.info("Data sanity check is off.")
+    } else {
+      logger.info("Data sanity check is on.")
+    }
+
+    val td = try {
+      dataSource.readTrainingBase(sc)
+    } catch {
+      case e: StorageClientException =>
+        logger.error(s"Error occured reading from data source. (Reason: " +
+          e.getMessage + ") Please see the log for debugging details.", e)
+        sys.exit(1)
+    }
+
+    if (!params.skipSanityCheck) {
+      runSanityCheck(td)
+    }
+
+    if (params.stopAfterRead) {
+      logger.info("Stopping here because --stop-after-read is set.")
+      throw StopAfterReadInterruption()
+    }
+
+    val pd = preparator.prepareBase(sc, td)
+
+    if (!params.skipSanityCheck) {
+      runSanityCheck(pd)
+    }
+
+    if (params.stopAfterPrepare) {
+      logger.info("Stopping here because --stop-after-prepare is set.")
+      throw StopAfterPrepareInterruption()
+    }
+
+    val models: Seq[Any] = algorithmList.map(_.trainBase(sc, pd))
+
+    if (!params.skipSanityCheck) {
+      models.foreach(runSanityCheck)
+    }
+
+    logger.info("EngineWorkflow.train completed")
+    models
+  }
+
+  /** Provides concrete implementation of evaluation for [[Engine]].
+   *
+   * For every evaluation set returned by the data source, the training part
+   * is prepared and every algorithm is trained on it. Each query is then
+   * supplemented by serving, predicted by every algorithm, and the
+   * per-algorithm predictions (ordered by algorithm index) are combined by
+   * serving using the original, unsupplemented query.
+   *
+   * @param sc An instance of SparkContext
+   * @param dataSource An instance of data source
+   * @param preparator An instance of preparator
+   * @param algorithmList A list of algorithm instances
+   * @param serving An instance of serving
+   * @tparam TD Training data class
+   * @tparam PD Prepared data class
+   * @tparam Q Input query class
+   * @tparam P Predicted result class
+   * @tparam A Actual result class
+   * @tparam EI Evaluation information class
+   * @return A list of evaluation information and RDD of (query, predicted
+   *         result, actual result) tuples, one per evaluation set.
+   */
+  def eval[TD, PD, Q, P, A, EI](
+      sc: SparkContext,
+      dataSource: BaseDataSource[TD, EI, Q, A],
+      preparator: BasePreparator[TD, PD],
+      algorithmList: Seq[BaseAlgorithm[PD, _, Q, P]],
+      serving: BaseServing[Q, P]): Seq[(EI, RDD[(Q, P, A)])] = {
+    logger.info(s"DataSource: $dataSource")
+    logger.info(s"Preparator: $preparator")
+    logger.info(s"AlgorithmList: $algorithmList")
+    logger.info(s"Serving: $serving")
+
+    val algoMap: Map[AX, BaseAlgorithm[PD, _, Q, P]] = algorithmList
+      .zipWithIndex
+      .map(_.swap)
+      .toMap
+    val algoCount = algoMap.size
+
+    val evalTupleMap: Map[EX, (TD, EI, RDD[(Q, A)])] = dataSource
+      .readEvalBase(sc)
+      .zipWithIndex
+      .map(_.swap)
+      .toMap
+
+    val evalCount = evalTupleMap.size
+
+    // These maps are built strictly. `Map.mapValues` would return lazy views
+    // that re-run their function on every lookup. For evalQAsMap that would
+    // build a fresh zipWithUniqueId RDD for supplementation and another for
+    // the final join.
+    val evalInfoMap: Map[EX, EI] = evalTupleMap.map { case (ex, t) =>
+      (ex, t._2)
+    }
+    val evalQAsMap: Map[EX, RDD[(QX, (Q, A))]] = evalTupleMap.map {
+      case (ex, t) => (ex, t._3.zipWithUniqueId().map(_.swap))
+    }
+
+    val suppQAsMap: Map[EX, RDD[(QX, (Q, A))]] = evalQAsMap.map {
+      case (ex, qas) =>
+        (ex, qas.map { case (qx, (q, a)) => (qx, (serving.supplementBase(q), a)) })
+    }
+
+    val algoPredictsMap: Map[EX, RDD[(QX, Seq[P])]] = (0 until evalCount)
+      .map { ex =>
+        // Prepared data is only needed while this evaluation set's models
+        // are trained.
+        val pd: PD = preparator.prepareBase(sc, evalTupleMap(ex)._1)
+
+        val qs: RDD[(QX, Q)] = suppQAsMap(ex).mapValues(_._1)
+
+        val algoPredicts: Seq[RDD[(QX, (AX, P))]] = (0 until algoCount)
+          .map { ax =>
+            val algo = algoMap(ax)
+            val model: Any = algo.trainBase(sc, pd)
+            val rawPredicts: RDD[(QX, P)] = algo.batchPredictBase(sc, model, qs)
+            rawPredicts.map { case (qx, p) => (qx, (ax, p)) }
+          }
+
+        val unionAlgoPredicts: RDD[(QX, Seq[P])] = sc.union(algoPredicts)
+          .groupByKey()
+          .mapValues { ps =>
+            // Every algorithm must have produced a prediction for the query.
+            assert (ps.size == algoCount, "Must have same length as algoCount")
+            ps.toSeq.sortBy(_._1).map(_._2)
+          }
+
+        (ex, unionAlgoPredicts)
+      }
+      .toMap
+
+    val servingQPAMap: Map[EX, RDD[(Q, P, A)]] = algoPredictsMap
+      .map { case (ex, psMap) =>
+        // The query passed to serving.serve is the original one, not
+        // supplemented.
+        val qasMap: RDD[(QX, (Q, A))] = evalQAsMap(ex)
+        val qpsaMap: RDD[(QX, Q, Seq[P], A)] = psMap.join(qasMap)
+          .map { case (qx, t) => (qx, t._2._1, t._1, t._2._2) }
+
+        val qpaMap: RDD[(Q, P, A)] = qpsaMap.map {
+          case (qx, q, ps, a) => (q, serving.serveBase(q, ps), a)
+        }
+        (ex, qpaMap)
+      }
+
+    (0 until evalCount).map { ex =>
+      (evalInfoMap(ex), servingQPAMap(ex))
+    }
+  }
+}
+
+/** Mix in this trait for queries that carry a `prId` (predicted result ID).
+ * This is useful when your engine expects queries to also be associated with
+ * prId keys when the feedback loop is enabled. `prId` defaults to the empty
+ * string.
+ *
+ * @group Helper
+ */
+@deprecated("To be removed in future releases.", "0.9.2")
+trait WithPrId {
+ val prId: String = ""
+}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/main/scala/org/apache/predictionio/controller/EngineFactory.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/predictionio/controller/EngineFactory.scala b/core/src/main/scala/org/apache/predictionio/controller/EngineFactory.scala
new file mode 100644
index 0000000..e9db35b
--- /dev/null
+++ b/core/src/main/scala/org/apache/predictionio/controller/EngineFactory.scala
@@ -0,0 +1,41 @@
+/** Copyright 2015 TappingStone, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.predictionio.controller
+
+import org.apache.predictionio.core.BaseEngine
+
+import scala.language.implicitConversions
+
+/** If you intend to let PredictionIO create workflows and deploy serving
+ * automatically, you will need to implement an object that extends this class
+ * and whose `apply` returns an [[Engine]].
+ *
+ * @group Engine
+ */
+abstract class EngineFactory {
+ /** Creates an instance of an [[Engine]].
+  *
+  * @return The engine. The declared type is `BaseEngine`, so any
+  *         `BaseEngine` implementation may be returned.
+  */
+ def apply(): BaseEngine[_, _, _, _]
+
+ /** Override this method to programmatically return engine parameters.
+  *
+  * @param key Identifies which set of engine parameters to return.
+  * @return Engine parameters. The default implementation ignores `key` and
+  *         returns `EngineParams()`.
+  */
+ def engineParams(key: String): EngineParams = EngineParams()
+}
+
+/** DEPRECATED. Use [[EngineFactory]] instead.
+ *
+ * Kept only for backward compatibility; it adds nothing to [[EngineFactory]].
+ *
+ * @group Engine
+ */
+@deprecated("Use EngineFactory instead.", "0.9.2")
+trait IEngineFactory extends EngineFactory
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/main/scala/org/apache/predictionio/controller/EngineParams.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/predictionio/controller/EngineParams.scala b/core/src/main/scala/org/apache/predictionio/controller/EngineParams.scala
new file mode 100644
index 0000000..b419255
--- /dev/null
+++ b/core/src/main/scala/org/apache/predictionio/controller/EngineParams.scala
@@ -0,0 +1,149 @@
+/** Copyright 2015 TappingStone, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.predictionio.controller
+
+import org.apache.predictionio.core.BaseDataSource
+import org.apache.predictionio.core.BaseAlgorithm
+
+import scala.collection.JavaConversions
+import scala.language.implicitConversions
+
+/** This class serves as a logical grouping of all required engine's parameters.
+ *
+ * @param dataSourceParams Data Source name-parameters tuple.
+ * @param preparatorParams Preparator name-parameters tuple.
+ * @param algorithmParamsList List of algorithm name-parameter pairs.
+ * @param servingParams Serving name-parameters tuple.
+ * @group Engine
+ */
+class EngineParams(
+    val dataSourceParams: (String, Params) = ("", EmptyParams()),
+    val preparatorParams: (String, Params) = ("", EmptyParams()),
+    val algorithmParamsList: Seq[(String, Params)] = Seq(),
+    val servingParams: (String, Params) = ("", EmptyParams()))
+  extends Serializable {
+
+  /** Java-friendly constructor
+   *
+   * @param dataSourceName Data Source name
+   * @param dataSourceParams Data Source parameters
+   * @param preparatorName Preparator name
+   * @param preparatorParams Preparator parameters
+   * @param algorithmParamsList Map of algorithm name-parameters. Algorithms
+   *   are ordered by the map's iteration order, so pass an ordered map (for
+   *   example a `java.util.LinkedHashMap`) if algorithm order matters.
+   * @param servingName Serving name
+   * @param servingParams Serving parameters
+   */
+  def this(
+      dataSourceName: String,
+      dataSourceParams: Params,
+      preparatorName: String,
+      preparatorParams: Params,
+      algorithmParamsList: _root_.java.util.Map[String, _ <: Params],
+      servingName: String,
+      servingParams: Params) = {
+
+    // To work around a json4s weird limitation, the parameter names can not be changed
+    this(
+      (dataSourceName, dataSourceParams),
+      (preparatorName, preparatorParams),
+      // Take a strict, immutable snapshot of the Java map through the explicit
+      // JavaConverters API instead of keeping a view over a live wrapper
+      // produced by the deprecated JavaConversions.
+      scala.collection.JavaConverters
+        .mapAsScalaMapConverter(algorithmParamsList).asScala.toList,
+      (servingName, servingParams)
+    )
+  }
+
+  /** A case class style copy method: returns a new [[EngineParams]] with the
+   * given components replaced and the others kept from this instance.
+   */
+  def copy(
+      dataSourceParams: (String, Params) = dataSourceParams,
+      preparatorParams: (String, Params) = preparatorParams,
+      algorithmParamsList: Seq[(String, Params)] = algorithmParamsList,
+      servingParams: (String, Params) = servingParams): EngineParams = {
+
+    new EngineParams(
+      dataSourceParams,
+      preparatorParams,
+      algorithmParamsList,
+      servingParams)
+  }
+}
+
+/** Companion object for creating [[EngineParams]] instances.
+ *
+ * @group Engine
+ */
+object EngineParams {
+  /** Create EngineParams from separate component names and parameters.
+   *
+   * Each name is paired with its parameters to form the corresponding
+   * name-parameters tuple of [[EngineParams]]. Every argument has a default,
+   * so `EngineParams()` yields empty names and [[EmptyParams]] everywhere.
+   *
+   * @param dataSourceName Data Source name
+   * @param dataSourceParams Data Source parameters
+   * @param preparatorName Preparator name
+   * @param preparatorParams Preparator parameters
+   * @param algorithmParamsList List of algorithm name-parameter pairs.
+   * @param servingName Serving name
+   * @param servingParams Serving parameters
+   */
+  def apply(
+      dataSourceName: String = "",
+      dataSourceParams: Params = EmptyParams(),
+      preparatorName: String = "",
+      preparatorParams: Params = EmptyParams(),
+      algorithmParamsList: Seq[(String, Params)] = Seq(),
+      servingName: String = "",
+      servingParams: Params = EmptyParams()): EngineParams =
+    new EngineParams(
+      (dataSourceName, dataSourceParams),
+      (preparatorName, preparatorParams),
+      algorithmParamsList,
+      (servingName, servingParams))
+}
+
+/** SimpleEngine has only one algorithm, and uses default preparator and serving
+ * layer. The preparator is `IdentityPreparator`, which passes the training
+ * data through unchanged, and the serving is `LFirstServing`.
+ *
+ * Because the preparator is an identity, the algorithm consumes the training
+ * data class `TD` directly as its prepared data. The single algorithm is
+ * registered under the empty name `""`.
+ *
+ * @tparam TD Training data class.
+ * @tparam EI Evaluation info class.
+ * @tparam Q Input query class.
+ * @tparam P Output prediction class.
+ * @tparam A Actual value class.
+ * @param dataSourceClass Data source class.
+ * @param algorithmClass Class of the engine's single algorithm.
+ * @group Engine
+ */
+class SimpleEngine[TD, EI, Q, P, A](
+ dataSourceClass: Class[_ <: BaseDataSource[TD, EI, Q, A]],
+ algorithmClass: Class[_ <: BaseAlgorithm[TD, _, Q, P]])
+ extends Engine(
+ dataSourceClass,
+ IdentityPreparator(dataSourceClass),
+ Map("" -> algorithmClass),
+ LFirstServing(algorithmClass))
+
+/** Shorthand [[EngineParams]] for a `SimpleEngine`.
+ *
+ * Both the data source and the single algorithm are registered under the
+ * empty name `""`. The preparator and serving keep their default (empty)
+ * parameters.
+ *
+ * @param dataSourceParams Data source parameters.
+ * @param algorithmParams Parameters of the single algorithm.
+ * @group Engine
+ */
+class SimpleEngineParams(
+    dataSourceParams: Params = EmptyParams(),
+    algorithmParams: Params = EmptyParams())
+  extends EngineParams(
+    dataSourceParams = "" -> dataSourceParams,
+    algorithmParamsList = Seq("" -> algorithmParams))
+
+
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/main/scala/org/apache/predictionio/controller/EngineParamsGenerator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/predictionio/controller/EngineParamsGenerator.scala b/core/src/main/scala/org/apache/predictionio/controller/EngineParamsGenerator.scala
new file mode 100644
index 0000000..2e26b83
--- /dev/null
+++ b/core/src/main/scala/org/apache/predictionio/controller/EngineParamsGenerator.scala
@@ -0,0 +1,43 @@
+/** Copyright 2015 TappingStone, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.predictionio.controller
+
+import scala.language.implicitConversions
+
+/** Defines an engine parameters generator.
+ *
+ * Implementations of this trait can be supplied to "pio eval" as the second
+ * command line argument.
+ *
+ * @group Evaluation
+ */
+trait EngineParamsGenerator {
+  protected[this] var epList: Seq[EngineParams] = _
+  protected[this] var epListSet: Boolean = false
+
+  /** Returns the list of [[EngineParams]] of this [[EngineParamsGenerator]].
+   *
+   * Asserts that the list has already been set.
+   */
+  def engineParamsList: Seq[EngineParams] = {
+    assert(epListSet, "EngineParamsList not set")
+    epList
+  }
+
+  /** Sets the list of [[EngineParams]] of this [[EngineParamsGenerator]].
+   *
+   * May be called at most once (asserted). A copy of `l` is stored.
+   *
+   * @param l Engine parameters to evaluate.
+   */
+  def engineParamsList_=(l: Seq[EngineParams]): Unit = {
+    assert(!epListSet, "EngineParamsList can be set at most once")
+    epList = Seq(l: _*)
+    epListSet = true
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/main/scala/org/apache/predictionio/controller/Evaluation.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/predictionio/controller/Evaluation.scala b/core/src/main/scala/org/apache/predictionio/controller/Evaluation.scala
new file mode 100644
index 0000000..c720c4f
--- /dev/null
+++ b/core/src/main/scala/org/apache/predictionio/controller/Evaluation.scala
@@ -0,0 +1,122 @@
+/** Copyright 2015 TappingStone, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.predictionio.controller
+
+import org.apache.predictionio.core.BaseEngine
+import org.apache.predictionio.core.BaseEvaluator
+import org.apache.predictionio.core.BaseEvaluatorResult
+
+import scala.language.implicitConversions
+
+/** Defines an evaluation that contains an engine and a metric.
+ *
+ * Implementations of this trait can be supplied to "pio eval" as the first
+ * argument.
+ *
+ * The engine and evaluator may be set at most once, through any one of the
+ * setters `engineEvaluator_=`, `engineMetric_=` or `engineMetrics_=`. The
+ * latter two wrap the metrics in a [[MetricEvaluator]].
+ *
+ * @group Evaluation
+ */
+trait Evaluation extends Deployment {
+  protected [this] var _evaluatorSet: Boolean = false
+  protected [this] var _evaluator: BaseEvaluator[_, _, _, _, _ <: BaseEvaluatorResult] = _
+
+  // An access qualifier must name a package enclosing this trait. There is no
+  // `prediction` package enclosing org.apache.predictionio.controller, so the
+  // qualifier is `predictionio`.
+  private [predictionio]
+  def evaluator: BaseEvaluator[_, _, _, _, _ <: BaseEvaluatorResult] = {
+    assert(_evaluatorSet, "Evaluator not set")
+    _evaluator
+  }
+
+  /** Gets the tuple of the [[Engine]] and the implementation of
+   * [[org.apache.predictionio.core.BaseEvaluator]]
+   */
+  def engineEvaluator
+    : (BaseEngine[_, _, _, _], BaseEvaluator[_, _, _, _, _]) = {
+    assert(_evaluatorSet, "Evaluator not set")
+    (engine, _evaluator)
+  }
+
+  /** Sets both an [[Engine]] and an implementation of
+   * [[org.apache.predictionio.core.BaseEvaluator]] for this [[Evaluation]]
+   *
+   * @param engineEvaluator A tuple of an [[Engine]] and an implementation of
+   *   [[org.apache.predictionio.core.BaseEvaluator]]
+   * @tparam EI Evaluation information class
+   * @tparam Q Query class
+   * @tparam P Predicted result class
+   * @tparam A Actual result class
+   * @tparam R Metric result class
+   */
+  def engineEvaluator_=[EI, Q, P, A, R <: BaseEvaluatorResult](
+      engineEvaluator: (
+        BaseEngine[EI, Q, P, A],
+        BaseEvaluator[EI, Q, P, A, R])): Unit = {
+    assert(!_evaluatorSet, "Evaluator can be set at most once")
+    engine = engineEvaluator._1
+    _evaluator = engineEvaluator._2
+    _evaluatorSet = true
+  }
+
+  /** Returns both the [[Engine]] and the implementation of [[Metric]] for this
+   * [[Evaluation]]
+   *
+   * Exists only so that the `engineMetric = ...` setter syntax compiles; it
+   * always throws `NotImplementedError`.
+   */
+  def engineMetric: (BaseEngine[_, _, _, _], Metric[_, _, _, _, _]) = {
+    throw new NotImplementedError("This method is to keep the compiler happy")
+  }
+
+  /** Sets both an [[Engine]] and an implementation of [[Metric]] for this
+   * [[Evaluation]]
+   *
+   * The metric is wrapped in a [[MetricEvaluator]] with no other metrics and
+   * output path "best.json".
+   *
+   * @param engineMetric A tuple of [[Engine]] and an implementation of
+   *   [[Metric]]
+   * @tparam EI Evaluation information class
+   * @tparam Q Query class
+   * @tparam P Predicted result class
+   * @tparam A Actual result class
+   */
+  def engineMetric_=[EI, Q, P, A](
+      engineMetric: (BaseEngine[EI, Q, P, A], Metric[EI, Q, P, A, _])): Unit = {
+    engineEvaluator = (
+      engineMetric._1,
+      MetricEvaluator(
+        metric = engineMetric._2,
+        otherMetrics = Seq[Metric[EI, Q, P, A, _]](),
+        outputPath = "best.json"))
+  }
+
+  // Exists only so that the `engineMetrics = ...` setter syntax compiles;
+  // always throws.
+  private [predictionio]
+  def engineMetrics: (BaseEngine[_, _, _, _], Metric[_, _, _, _, _]) = {
+    throw new NotImplementedError("This method is to keep the compiler happy")
+  }
+
+  /** Sets an [[Engine]], an implementation of [[Metric]], and sequence of
+   * implementations of [[Metric]] for this [[Evaluation]]
+   *
+   * @param engineMetrics A tuple of [[Engine]], an implementation of
+   *   [[Metric]] and sequence of implementations of [[Metric]]
+   * @tparam EI Evaluation information class
+   * @tparam Q Query class
+   * @tparam P Predicted result class
+   * @tparam A Actual result class
+   */
+  def engineMetrics_=[EI, Q, P, A](
+      engineMetrics: (
+        BaseEngine[EI, Q, P, A],
+        Metric[EI, Q, P, A, _],
+        Seq[Metric[EI, Q, P, A, _]])): Unit = {
+    engineEvaluator = (
+      engineMetrics._1,
+      MetricEvaluator(engineMetrics._2, engineMetrics._3))
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/main/scala/org/apache/predictionio/controller/FastEvalEngine.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/predictionio/controller/FastEvalEngine.scala b/core/src/main/scala/org/apache/predictionio/controller/FastEvalEngine.scala
new file mode 100644
index 0000000..868d818
--- /dev/null
+++ b/core/src/main/scala/org/apache/predictionio/controller/FastEvalEngine.scala
@@ -0,0 +1,343 @@
+/** Copyright 2015 TappingStone, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.predictionio.controller
+
+import org.apache.predictionio.core.BaseDataSource
+import org.apache.predictionio.core.BasePreparator
+import org.apache.predictionio.core.BaseAlgorithm
+import org.apache.predictionio.core.BaseServing
+import org.apache.predictionio.core.Doer
+import org.apache.predictionio.annotation.Experimental
+
+import grizzled.slf4j.Logger
+import org.apache.predictionio.workflow.WorkflowParams
+import org.apache.spark.SparkContext
+import org.apache.spark.SparkContext._
+import org.apache.spark.rdd.RDD
+
+import scala.language.implicitConversions
+
+import _root_.java.util.NoSuchElementException
+
+import scala.collection.mutable.{ HashMap => MutableHashMap }
+
+/** :: Experimental ::
+ * Workflow based on [[FastEvalEngine]]
+ *
+ * Each pipeline stage (data source, preparator, algorithms, serving) is keyed
+ * by the prefix of [[EngineParams]] that determines its output. Stage results
+ * are memoized in the caches of a [[FastEvalEngineWorkflow]] instance, so
+ * engine parameter sets sharing a prefix reuse the earlier stages' results.
+ *
+ * @group Evaluation
+ */
+@Experimental
+object FastEvalEngineWorkflow {
+  @transient lazy val logger = Logger[this.type]
+
+  /** Index of an evaluation set, in the order returned by `readEvalBase`. */
+  type EX = Int
+  /** Index of an algorithm within `algorithmParamsList`. */
+  type AX = Int
+  /** Id given to each query of an evaluation set by `zipWithUniqueId`. */
+  type QX = Long
+
+  /** Cache key of the data source stage. */
+  case class DataSourcePrefix(dataSourceParams: (String, Params)) {
+    def this(pp: PreparatorPrefix) = this(pp.dataSourceParams)
+    def this(ap: AlgorithmsPrefix) = this(ap.dataSourceParams)
+    def this(sp: ServingPrefix) = this(sp.dataSourceParams)
+  }
+
+  /** Cache key of the preparator stage. */
+  case class PreparatorPrefix(
+      dataSourceParams: (String, Params),
+      preparatorParams: (String, Params)) {
+    def this(ap: AlgorithmsPrefix) = {
+      this(ap.dataSourceParams, ap.preparatorParams)
+    }
+  }
+
+  /** Cache key of the algorithms stage. */
+  case class AlgorithmsPrefix(
+      dataSourceParams: (String, Params),
+      preparatorParams: (String, Params),
+      algorithmParamsList: Seq[(String, Params)]) {
+    def this(sp: ServingPrefix) = {
+      this(sp.dataSourceParams, sp.preparatorParams, sp.algorithmParamsList)
+    }
+  }
+
+  /** Cache key of the serving stage; covers all components of [[EngineParams]]. */
+  case class ServingPrefix(
+      dataSourceParams: (String, Params),
+      preparatorParams: (String, Params),
+      algorithmParamsList: Seq[(String, Params)],
+      servingParams: (String, Params)) {
+    def this(ep: EngineParams) = this(
+      ep.dataSourceParams,
+      ep.preparatorParams,
+      ep.algorithmParamsList,
+      ep.servingParams)
+  }
+
+  /** Returns, per evaluation set, the training data, the evaluation info and
+   * the query/actual pairs keyed by a unique query id. Computed once per
+   * prefix and cached in `workflow.dataSourceCache`.
+   */
+  def getDataSourceResult[TD, EI, PD, Q, P, A](
+      workflow: FastEvalEngineWorkflow[TD, EI, PD, Q, P, A],
+      prefix: DataSourcePrefix)
+    : Map[EX, (TD, EI, RDD[(QX, (Q, A))])] = {
+    val cache = workflow.dataSourceCache
+
+    if (!cache.contains(prefix)) {
+      val dataSource = Doer(
+        workflow.engine.dataSourceClassMap(prefix.dataSourceParams._1),
+        prefix.dataSourceParams._2)
+
+      val result = dataSource
+        .readEvalBase(workflow.sc)
+        .map { case (td, ei, qaRDD) => {
+          (td, ei, qaRDD.zipWithUniqueId().map(_.swap))
+        }}
+        .zipWithIndex
+        .map(_.swap)
+        .toMap
+
+      cache += Tuple2(prefix, result)
+    }
+    cache(prefix)
+  }
+
+  /** Returns, per evaluation set, the prepared data produced by the
+   * preparator from that set's training data. Computed once per prefix and
+   * cached in `workflow.preparatorCache`.
+   */
+  def getPreparatorResult[TD, EI, PD, Q, P, A](
+      workflow: FastEvalEngineWorkflow[TD, EI, PD, Q, P, A],
+      prefix: PreparatorPrefix): Map[EX, PD] = {
+    val cache = workflow.preparatorCache
+
+    if (!cache.contains(prefix)) {
+      val preparator = Doer(
+        workflow.engine.preparatorClassMap(prefix.preparatorParams._1),
+        prefix.preparatorParams._2)
+
+      // Build a strict Map. `mapValues` returns a lazy view that re-runs
+      // `prepareBase` on every lookup, so caching it would not memoize the
+      // preparation at all.
+      val result: Map[EX, PD] = getDataSourceResult(
+        workflow = workflow,
+        prefix = new DataSourcePrefix(prefix))
+        .map { case (ex, (td, _, _)) =>
+          (ex, preparator.prepareBase(workflow.sc, td))
+        }
+
+      cache += Tuple2(prefix, result)
+    }
+    cache(prefix)
+  }
+
+  /** Trains every algorithm of `prefix` on each evaluation set's prepared data
+   * and predicts that set's queries. Returns, per evaluation set, each query
+   * id with the predictions of all algorithms ordered by algorithm index.
+   * Logs an error and calls `sys.exit(1)` if an algorithm name is not defined
+   * by the engine.
+   */
+  def computeAlgorithmsResult[TD, EI, PD, Q, P, A](
+      workflow: FastEvalEngineWorkflow[TD, EI, PD, Q, P, A],
+      prefix: AlgorithmsPrefix): Map[EX, RDD[(QX, Seq[P])]] = {
+
+    val algoMap: Map[AX, BaseAlgorithm[PD, _, Q, P]] = prefix.algorithmParamsList
+      .map { case (algoName, algoParams) => {
+        try {
+          Doer(workflow.engine.algorithmClassMap(algoName), algoParams)
+        } catch {
+          case e: NoSuchElementException => {
+            val algorithmClassMap = workflow.engine.algorithmClassMap
+            if (algoName == "") {
+              logger.error("Empty algorithm name supplied but it could not " +
+                "match with any algorithm in the engine's definition. " +
+                "Existing algorithm name(s) are: " +
+                s"${algorithmClassMap.keys.mkString(", ")}. Aborting.")
+            } else {
+              logger.error(s"${algoName} cannot be found in the engine's " +
+                "definition. Existing algorithm name(s) are: " +
+                s"${algorithmClassMap.keys.mkString(", ")}. Aborting.")
+            }
+            sys.exit(1)
+          }
+        }
+      }}
+      .zipWithIndex
+      .map(_.swap)
+      .toMap
+
+    val algoCount = algoMap.size
+
+    // Model Train
+    // `mapValues` is lazy here: each model is trained when it is first looked
+    // up inside the parallel prediction loop below, where every (ex, ax) pair
+    // is looked up exactly once.
+    val algoModelsMap: Map[EX, Map[AX, Any]] = getPreparatorResult(
+      workflow,
+      new PreparatorPrefix(prefix))
+      .mapValues {
+        pd => algoMap.mapValues(_.trainBase(workflow.sc, pd))
+      }
+
+    // Predict
+    val dataSourceResult =
+      FastEvalEngineWorkflow.getDataSourceResult(
+        workflow = workflow,
+        prefix = new DataSourcePrefix(prefix))
+
+    val algoResult: Map[EX, RDD[(QX, Seq[P])]] = dataSourceResult
+      .par
+      .map { case (ex, (td, ei, iqaRDD)) => {
+        val modelsMap: Map[AX, Any] = algoModelsMap(ex)
+        val qs: RDD[(QX, Q)] = iqaRDD.mapValues(_._1)
+
+        val algoPredicts: Seq[RDD[(QX, (AX, P))]] = (0 until algoCount)
+          .map { ax => {
+            val algo = algoMap(ax)
+            val model = modelsMap(ax)
+            val rawPredicts: RDD[(QX, P)] = algo.batchPredictBase(
+              workflow.sc,
+              model,
+              qs)
+
+            val predicts: RDD[(QX, (AX, P))] = rawPredicts.map {
+              case (qx, p) => (qx, (ax, p))
+            }
+            predicts
+          }}
+
+        val unionAlgoPredicts: RDD[(QX, Seq[P])] = workflow.sc
+          .union(algoPredicts)
+          .groupByKey
+          .mapValues { ps => {
+            // Every algorithm must have produced exactly one prediction per query.
+            assert (ps.size == algoCount, "Must have same length as algoCount")
+            ps.toSeq.sortBy(_._1).map(_._2)
+          }}
+        (ex, unionAlgoPredicts)
+      }}
+      .seq
+      .toMap
+
+    algoResult
+  }
+
+  /** Cached wrapper around [[computeAlgorithmsResult]], memoized in
+   * `workflow.algorithmsCache`.
+   */
+  def getAlgorithmsResult[TD, EI, PD, Q, P, A](
+      workflow: FastEvalEngineWorkflow[TD, EI, PD, Q, P, A],
+      prefix: AlgorithmsPrefix): Map[EX, RDD[(QX, Seq[P])]] = {
+    val cache = workflow.algorithmsCache
+    if (!cache.contains(prefix)) {
+      val result = computeAlgorithmsResult(workflow, prefix)
+      cache += Tuple2(prefix, result)
+    }
+    cache(prefix)
+  }
+
+  /** Applies the serving to each query's algorithm predictions and returns, in
+   * evaluation-set order, each set's evaluation info paired with its
+   * (query, served prediction, actual) triples. Memoized in
+   * `workflow.servingCache`.
+   */
+  def getServingResult[TD, EI, PD, Q, P, A](
+      workflow: FastEvalEngineWorkflow[TD, EI, PD, Q, P, A],
+      prefix: ServingPrefix)
+    : Seq[(EI, RDD[(Q, P, A)])] = {
+    val cache = workflow.servingCache
+    if (!cache.contains(prefix)) {
+      val serving = Doer(
+        workflow.engine.servingClassMap(prefix.servingParams._1),
+        prefix.servingParams._2)
+
+      val algoPredictsMap = getAlgorithmsResult(
+        workflow = workflow,
+        prefix = new AlgorithmsPrefix(prefix))
+
+      val dataSourceResult = getDataSourceResult(
+        workflow = workflow,
+        prefix = new DataSourcePrefix(prefix))
+
+      val evalQAsMap = dataSourceResult.mapValues(_._3)
+      val evalInfoMap = dataSourceResult.mapValues(_._2)
+
+      val servingQPAMap: Map[EX, RDD[(Q, P, A)]] = algoPredictsMap
+        .map { case (ex, psMap) => {
+          val qasMap: RDD[(QX, (Q, A))] = evalQAsMap(ex)
+          val qpsaMap: RDD[(QX, Q, Seq[P], A)] = psMap.join(qasMap)
+            .map { case (qx, t) => (qx, t._2._1, t._1, t._2._2) }
+
+          val qpaMap: RDD[(Q, P, A)] = qpsaMap.map {
+            case (qx, q, ps, a) => (q, serving.serveBase(q, ps), a)
+          }
+          (ex, qpaMap)
+        }}
+
+      val servingResult = (0 until evalQAsMap.size).map { ex => {
+        (evalInfoMap(ex), servingQPAMap(ex))
+      }}
+      .toSeq
+
+      cache += Tuple2(prefix, servingResult)
+    }
+    cache(prefix)
+  }
+
+  /** Evaluates every [[EngineParams]] of `engineParamsList`, reusing cached
+   * stage results between them, and pairs each with its serving result.
+   */
+  def get[TD, EI, PD, Q, P, A](
+      workflow: FastEvalEngineWorkflow[TD, EI, PD, Q, P, A],
+      engineParamsList: Seq[EngineParams])
+    : Seq[(EngineParams, Seq[(EI, RDD[(Q, P, A)])])] = {
+    engineParamsList.map { engineParams => {
+      (engineParams,
+        getServingResult(workflow, new ServingPrefix(engineParams)))
+    }}
+  }
+}
+
+/** :: Experimental ::
+ * Workflow based on [[FastEvalEngine]]
+ *
+ * Holds the engine, Spark context and workflow parameters of one batch
+ * evaluation, together with one memoization cache per pipeline stage. The
+ * caches are filled by the helper functions of the companion object.
+ *
+ * @group Evaluation
+ */
+@Experimental
+class FastEvalEngineWorkflow[TD, EI, PD, Q, P, A](
+    val engine: FastEvalEngine[TD, EI, PD, Q, P, A],
+    val sc: SparkContext,
+    val workflowParams: WorkflowParams)
+  extends Serializable {
+
+  import org.apache.predictionio.controller.FastEvalEngineWorkflow._
+
+  /** Per evaluation set: training data, evaluation info, id-keyed query/actual pairs. */
+  type DataSourceResult = Map[EX, (TD, EI, RDD[(QX, (Q, A))])]
+  /** Per evaluation set: prepared data. */
+  type PreparatorResult = Map[EX, PD]
+  /** Per evaluation set: id-keyed predictions of all algorithms. */
+  type AlgorithmsResult = Map[EX, RDD[(QX, Seq[P])]]
+  /** Evaluation info paired with (query, prediction, actual) triples. */
+  type ServingResult = Seq[(EI, RDD[(Q, P, A)])]
+
+  val dataSourceCache: MutableHashMap[DataSourcePrefix, DataSourceResult] =
+    MutableHashMap.empty
+  val preparatorCache: MutableHashMap[PreparatorPrefix, PreparatorResult] =
+    MutableHashMap.empty
+  val algorithmsCache: MutableHashMap[AlgorithmsPrefix, AlgorithmsResult] =
+    MutableHashMap.empty
+  val servingCache: MutableHashMap[ServingPrefix, ServingResult] =
+    MutableHashMap.empty
+}
+
+
+
+/** :: Experimental ::
+ * FastEvalEngine is a subclass of [[Engine]] that exploits the immutability of
+ * controllers to optimize the evaluation process
+ *
+ * Evaluation is delegated to [[FastEvalEngineWorkflow]], which memoizes each
+ * pipeline stage so that engine parameter sets sharing a prefix do not
+ * recompute it.
+ *
+ * @group Evaluation
+ */
+@Experimental
+class FastEvalEngine[TD, EI, PD, Q, P, A](
+    dataSourceClassMap: Map[String, Class[_ <: BaseDataSource[TD, EI, Q, A]]],
+    preparatorClassMap: Map[String, Class[_ <: BasePreparator[TD, PD]]],
+    algorithmClassMap: Map[String, Class[_ <: BaseAlgorithm[PD, _, Q, P]]],
+    servingClassMap: Map[String, Class[_ <: BaseServing[Q, P]]])
+  extends Engine[TD, EI, PD, Q, P, A](
+    dataSourceClassMap,
+    preparatorClassMap,
+    algorithmClassMap,
+    servingClassMap) {
+  @transient override lazy val logger = Logger[this.type]
+
+  /** Evaluates a single set of engine parameters as a one-element batch. */
+  override def eval(
+      sc: SparkContext,
+      engineParams: EngineParams,
+      params: WorkflowParams): Seq[(EI, RDD[(Q, P, A)])] = {
+    logger.info("FastEvalEngine.eval")
+    val batchResult = batchEval(sc, Seq(engineParams), params)
+    val (_, evalResult) = batchResult.head
+    evalResult
+  }
+
+  /** Evaluates all engine parameter sets through one shared
+   * [[FastEvalEngineWorkflow]], so their common stages are computed once.
+   */
+  override def batchEval(
+      sc: SparkContext,
+      engineParamsList: Seq[EngineParams],
+      params: WorkflowParams)
+    : Seq[(EngineParams, Seq[(EI, RDD[(Q, P, A)])])] = {
+    val sharedWorkflow = new FastEvalEngineWorkflow(this, sc, params)
+    FastEvalEngineWorkflow.get(sharedWorkflow, engineParamsList)
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/main/scala/org/apache/predictionio/controller/IdentityPreparator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/predictionio/controller/IdentityPreparator.scala b/core/src/main/scala/org/apache/predictionio/controller/IdentityPreparator.scala
new file mode 100644
index 0000000..c7669ba
--- /dev/null
+++ b/core/src/main/scala/org/apache/predictionio/controller/IdentityPreparator.scala
@@ -0,0 +1,92 @@
+/** Copyright 2015 TappingStone, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.predictionio.controller
+
+import org.apache.predictionio.core.BaseDataSource
+import org.apache.predictionio.core.BasePreparator
+import org.apache.spark.SparkContext
+
+import scala.reflect._
+
+/** A helper concrete implementation of [[org.apache.predictionio.core.BasePreparator]]
+ * whose prepared data is the training data itself, returned unchanged. It can
+ * be used in place of both [[PPreparator]] and [[LPreparator]].
+ *
+ * @tparam TD Training data class.
+ * @group Preparator
+ */
+class IdentityPreparator[TD] extends BasePreparator[TD, TD] {
+  /** Returns `td` as is; `sc` is not used. */
+  override def prepareBase(sc: SparkContext, td: TD): TD = {
+    td
+  }
+}
+
+/** Companion object of [[IdentityPreparator]] that conveniently returns the
+ * class of [[IdentityPreparator]] for use with [[EngineFactory]].
+ *
+ * @group Preparator
+ */
+object IdentityPreparator {
+  /** Returns the class of [[IdentityPreparator]] whose training data type
+   * matches that of the given data source.
+   *
+   * @param ds Class of the data source feeding this preparator. It is used
+   *   only to infer `TD`; its value is ignored.
+   * @tparam TD Training data class.
+   */
+  def apply[TD](ds: Class[_ <: BaseDataSource[TD, _, _, _]]): Class[IdentityPreparator[TD]] = {
+    val preparatorClass = classOf[IdentityPreparator[TD]]
+    preparatorClass
+  }
+}
+
+/** DEPRECATED. Use [[IdentityPreparator]] instead.
+ *
+ * Adds nothing to [[IdentityPreparator]] and behaves exactly like it.
+ *
+ * @tparam TD Training data class.
+ * @group Preparator
+ */
+@deprecated(message = "Use IdentityPreparator instead.", since = "0.9.2")
+class PIdentityPreparator[TD] extends IdentityPreparator[TD]
+
+/** DEPRECATED. Use [[IdentityPreparator]] instead.
+ *
+ * @group Preparator
+ */
+@deprecated(message = "Use IdentityPreparator instead.", since = "0.9.2")
+object PIdentityPreparator {
+  /** Returns the class of [[IdentityPreparator]]; same as
+   * `IdentityPreparator(ds)`.
+   *
+   * @param ds Class of the data source for this preparator, used only to
+   *   infer `TD`.
+   */
+  def apply[TD](ds: Class[_ <: BaseDataSource[TD, _, _, _]]): Class[IdentityPreparator[TD]] =
+    IdentityPreparator.apply[TD](ds)
+}
+
+/** DEPRECATED. Use [[IdentityPreparator]] instead.
+ *
+ * Adds nothing to [[IdentityPreparator]] and behaves exactly like it.
+ *
+ * @tparam TD Training data class.
+ * @group Preparator
+ */
+@deprecated(message = "Use IdentityPreparator instead.", since = "0.9.2")
+class LIdentityPreparator[TD] extends IdentityPreparator[TD]
+
+/** DEPRECATED. Use [[IdentityPreparator]] instead.
+ *
+ * @group Preparator
+ */
+@deprecated(message = "Use IdentityPreparator instead.", since = "0.9.2")
+object LIdentityPreparator {
+  /** Returns the class of [[IdentityPreparator]]; same as
+   * `IdentityPreparator(ds)`.
+   *
+   * @param ds Class of the data source for this preparator, used only to
+   *   infer `TD`.
+   */
+  def apply[TD](ds: Class[_ <: BaseDataSource[TD, _, _, _]]): Class[IdentityPreparator[TD]] =
+    IdentityPreparator.apply[TD](ds)
+}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/main/scala/org/apache/predictionio/controller/LAlgorithm.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/predictionio/controller/LAlgorithm.scala b/core/src/main/scala/org/apache/predictionio/controller/LAlgorithm.scala
new file mode 100644
index 0000000..664ebb7
--- /dev/null
+++ b/core/src/main/scala/org/apache/predictionio/controller/LAlgorithm.scala
@@ -0,0 +1,130 @@
+/** Copyright 2015 TappingStone, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.predictionio.controller
+
+import _root_.org.apache.predictionio.annotation.DeveloperApi
+import org.apache.predictionio.core.BaseAlgorithm
+import org.apache.predictionio.workflow.PersistentModelManifest
+import org.apache.spark.SparkContext
+import org.apache.spark.rdd.RDD
+
+import scala.reflect._
+
+/** Base class of a local algorithm.
+ *
+ * A local algorithm runs locally within a single machine and produces a model
+ * that can fit within a single machine.
+ *
+ * Internally the model travels as an `RDD[M]` that is expected to hold a
+ * single model, obtained by applying [[train]] to the prepared data.
+ *
+ * If your input query class requires custom JSON4S serialization, the most
+ * idiomatic way is to implement a trait that extends [[CustomQuerySerializer]],
+ * and mix that into your algorithm class, instead of overriding
+ * [[querySerializer]] directly.
+ *
+ * @tparam PD Prepared data class.
+ * @tparam M Trained model class.
+ * @tparam Q Input query class.
+ * @tparam P Output prediction class.
+ * @group Algorithm
+ */
+abstract class LAlgorithm[PD, M : ClassTag, Q, P]
+  extends BaseAlgorithm[RDD[PD], RDD[M], Q, P] {
+
+  /** Applies [[train]] to every element of the prepared-data RDD. */
+  def trainBase(sc: SparkContext, pd: RDD[PD]): RDD[M] =
+    pd.map(preparedData => train(preparedData))
+
+  /** Implement this method to produce a model from prepared data.
+   *
+   * @param pd Prepared data for model training.
+   * @return Trained model.
+   */
+  def train(pd: PD): M
+
+  /** Casts `bm` to `RDD[M]` and delegates to [[batchPredict]]. */
+  def batchPredictBase(sc: SparkContext, bm: Any, qs: RDD[(Long, Q)])
+    : RDD[(Long, P)] =
+    batchPredict(bm.asInstanceOf[RDD[M]], qs)
+
+  /** Default batch prediction; override for a custom implementation.
+   *
+   * The queries of each partition are gathered into one array, every such
+   * array is paired with the model(s) in `mRDD`, and [[predict]] is applied
+   * to each query of the array.
+   *
+   * @param mRDD A single model wrapped inside an RDD
+   * @param qs An RDD of index-query tuples. The index is used to keep track of
+   *   predicted results with corresponding queries.
+   * @return Batch of predicted results
+   */
+  def batchPredict(mRDD: RDD[M], qs: RDD[(Long, Q)]): RDD[(Long, P)] = {
+    val modelWithQueryBatches: RDD[(M, Array[(Long, Q)])] =
+      mRDD.cartesian(qs.glom())
+    modelWithQueryBatches.flatMap { case (model, batch) =>
+      batch.map { case (idx, query) => (idx, predict(model, query)) }
+    }
+  }
+
+  /** Casts the model to `M` and delegates to [[predict]]. */
+  def predictBase(localBaseModel: Any, q: Q): P =
+    predict(localBaseModel.asInstanceOf[M], q)
+
+  /** Implement this method to produce a prediction from a query and trained
+   * model.
+   *
+   * @param m Trained model produced by [[train]].
+   * @param q An input query.
+   * @return A prediction.
+   */
+  def predict(m: M, q: Q): P
+
+  /** :: DeveloperApi ::
+   * Engine developers should not use this directly (read on to see how local
+   * algorithm models are persisted).
+   *
+   * Local algorithms produce local models. By default, models will be
+   * serialized and stored automatically. Engine developers can override this behavior by
+   * mixing the [[PersistentModel]] trait into the model class, and
+   * PredictionIO will call [[PersistentModel.save]] instead. If it returns
+   * true, a [[org.apache.predictionio.workflow.PersistentModelManifest]] will be
+   * returned so that during deployment, PredictionIO will use
+   * [[PersistentModelLoader]] to retrieve the model. Otherwise, Unit will be
+   * returned and the model will be re-trained on-the-fly.
+   *
+   * Only the first element of the model RDD is considered; its count is not
+   * checked.
+   *
+   * @param sc Spark context
+   * @param modelId Model ID
+   * @param algoParams Algorithm parameters that trained this model
+   * @param bm Model
+   * @return The model itself for automatic persistence, an instance of
+   *   [[org.apache.predictionio.workflow.PersistentModelManifest]] for manual
+   *   persistence, or Unit for re-training on deployment
+   */
+  @DeveloperApi
+  override
+  def makePersistentModel(
+      sc: SparkContext,
+      modelId: String,
+      algoParams: Params,
+      bm: Any): Any = {
+    val model = bm.asInstanceOf[RDD[M]].first()
+    model match {
+      case persistent: PersistentModel[_] =>
+        val saved = persistent.asInstanceOf[PersistentModel[Params]].save(
+          modelId, algoParams, sc)
+        // The `Unit` companion object is the sentinel meaning "re-train on deploy".
+        if (saved) PersistentModelManifest(className = model.getClass.getName)
+        else Unit
+      case _ =>
+        model
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/main/scala/org/apache/predictionio/controller/LAverageServing.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/predictionio/controller/LAverageServing.scala b/core/src/main/scala/org/apache/predictionio/controller/LAverageServing.scala
new file mode 100644
index 0000000..7fbe7ac
--- /dev/null
+++ b/core/src/main/scala/org/apache/predictionio/controller/LAverageServing.scala
@@ -0,0 +1,41 @@
+/** Copyright 2015 TappingStone, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.predictionio.controller
+
+import org.apache.predictionio.core.BaseAlgorithm
+
+/** A concrete implementation of [[LServing]] that returns the arithmetic
+  * mean of all algorithms' predictions. Every prediction must be a Double.
+  *
+  * @tparam Q Input query class.
+  * @group Serving
+  */
+class LAverageServing[Q] extends LServing[Q, Double] {
+  /** Returns the average of all algorithms' predictions.
+    *
+    * @param query Original input query; not used by this implementation.
+    * @param predictions One prediction per algorithm.
+    * @return The mean of `predictions`, or NaN when `predictions` is empty
+    *         (0.0 divided by 0).
+    */
+  def serve(query: Q, predictions: Seq[Double]): Double = {
+    val total = predictions.sum
+    val count = predictions.length
+    total / count
+  }
+}
+
+/** Companion object of [[LAverageServing]].
+  *
+  * Its `apply` returns the serving ''class'' rather than an instance. The
+  * algorithm-class argument exists only so that the query type `Q` can be
+  * inferred instead of written out explicitly.
+  *
+  * @group Serving
+  */
+object LAverageServing {
+  /** Returns the class of [[LAverageServing]] specialized to the query type
+    * of the given algorithm class.
+    *
+    * @param a An algorithm class. Only its static type is used (to infer `Q`);
+    *          the value itself is ignored.
+    * @tparam Q Input query class.
+    * @return `classOf[LAverageServing[Q]]`, a `Class` object, not an instance.
+    */
+  def apply[Q](a: Class[_ <: BaseAlgorithm[_, _, Q, _]]): Class[LAverageServing[Q]] =
+    classOf[LAverageServing[Q]]
+}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/main/scala/org/apache/predictionio/controller/LDataSource.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/predictionio/controller/LDataSource.scala b/core/src/main/scala/org/apache/predictionio/controller/LDataSource.scala
new file mode 100644
index 0000000..adb8e20
--- /dev/null
+++ b/core/src/main/scala/org/apache/predictionio/controller/LDataSource.scala
@@ -0,0 +1,67 @@
+/** Copyright 2015 TappingStone, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.predictionio.controller
+
+import org.apache.predictionio.core.BaseDataSource
+import org.apache.spark.SparkContext
+import org.apache.spark.rdd.RDD
+
+import scala.reflect._
+
+/** Base class of a local data source.
+  *
+  * A local data source runs locally within a single machine and returns data
+  * that can fit within a single machine.
+  *
+  * @tparam TD Training data class.
+  * @tparam EI Evaluation Info class.
+  * @tparam Q Input query class.
+  * @tparam A Actual value class.
+  * @group Data Source
+  */
+abstract class LDataSource[TD: ClassTag, EI, Q, A]
+  extends BaseDataSource[RDD[TD], EI, Q, A] {
+
+  /** Wraps [[readTraining]] in a single-element RDD.
+    *
+    * `readTraining()` is called inside the RDD's `map` transformation, so it
+    * runs when the RDD is computed rather than when this method is called.
+    */
+  def readTrainingBase(sc: SparkContext): RDD[TD] = {
+    val singleton = sc.parallelize(Seq(None))
+    singleton.map { _ => readTraining() }
+  }
+
+  /** Implement this method to only return training data from a data source */
+  def readTraining(): TD
+
+  /** Converts each local evaluation set returned by [[readEval]] into RDDs.
+    *
+    * The training data becomes a single-element RDD, the query/actual pairs
+    * are parallelized as-is, and the evaluation info is passed through
+    * unchanged.
+    */
+  def readEvalBase(sc: SparkContext): Seq[(RDD[TD], EI, RDD[(Q, A)])] =
+    readEval().map { evalSet =>
+      val (td, ei, qaSeq) = evalSet
+      val trainingRdd = sc.parallelize(Seq(None)).map(_ => td)
+      (trainingRdd, ei, sc.parallelize(qaSeq))
+    }
+
+  /** To provide the evaluation feature for your engine, you must override
+    * this method to return data for evaluation from a data source. Returned
+    * data can optionally include a sequence of query and actual value pairs
+    * for evaluation purposes.
+    *
+    * The default implementation returns an empty sequence as a stub, so that
+    * an engine can be compiled without implementing evaluation.
+    */
+  def readEval(): Seq[(TD, EI, Seq[(Q, A)])] = Seq.empty[(TD, EI, Seq[(Q, A)])]
+
+  /** Deprecated alias of [[readEval]]. */
+  @deprecated("Use readEval() instead.", "0.9.0")
+  def read(): Seq[(TD, EI, Seq[(Q, A)])] = readEval()
+}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/main/scala/org/apache/predictionio/controller/LFirstServing.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/predictionio/controller/LFirstServing.scala b/core/src/main/scala/org/apache/predictionio/controller/LFirstServing.scala
new file mode 100644
index 0000000..e677743
--- /dev/null
+++ b/core/src/main/scala/org/apache/predictionio/controller/LFirstServing.scala
@@ -0,0 +1,39 @@
+/** Copyright 2015 TappingStone, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.predictionio.controller
+
+import org.apache.predictionio.core.BaseAlgorithm
+
+/** A concrete implementation of [[LServing]] that returns the first
+  * algorithm's prediction unchanged and ignores all others.
+  *
+  * @tparam Q Input query class.
+  * @tparam P Output prediction class.
+  * @group Serving
+  */
+class LFirstServing[Q, P] extends LServing[Q, P] {
+  /** Returns the first algorithm's prediction.
+    *
+    * @param query Original input query; not used by this implementation.
+    * @param predictions One prediction per algorithm.
+    * @return The head of `predictions`. Throws `NoSuchElementException` when
+    *         `predictions` is empty.
+    */
+  def serve(query: Q, predictions: Seq[P]): P = {
+    val first = predictions.head
+    first
+  }
+}
+
+/** Companion object of [[LFirstServing]].
+  *
+  * Its `apply` returns the serving ''class'' rather than an instance. The
+  * algorithm-class argument exists only so that the query type `Q` and
+  * prediction type `P` can be inferred instead of written out explicitly.
+  *
+  * @group Serving
+  */
+object LFirstServing {
+  /** Returns the class of [[LFirstServing]] specialized to the query and
+    * prediction types of the given algorithm class.
+    *
+    * @param a An algorithm class. Only its static type is used (to infer `Q`
+    *          and `P`); the value itself is ignored.
+    * @tparam Q Input query class.
+    * @tparam P Output prediction class.
+    * @return `classOf[LFirstServing[Q, P]]`, a `Class` object, not an instance.
+    */
+  def apply[Q, P](a: Class[_ <: BaseAlgorithm[_, _, Q, P]]): Class[LFirstServing[Q, P]] =
+    classOf[LFirstServing[Q, P]]
+}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/main/scala/org/apache/predictionio/controller/LPreparator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/predictionio/controller/LPreparator.scala b/core/src/main/scala/org/apache/predictionio/controller/LPreparator.scala
new file mode 100644
index 0000000..32ffd5d
--- /dev/null
+++ b/core/src/main/scala/org/apache/predictionio/controller/LPreparator.scala
@@ -0,0 +1,46 @@
+/** Copyright 2015 TappingStone, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.predictionio.controller
+
+import org.apache.predictionio.core.BasePreparator
+import org.apache.spark.SparkContext
+import org.apache.spark.rdd.RDD
+
+import scala.reflect._
+
+/** Base class of a local preparator.
+  *
+  * A local preparator runs locally within a single machine and produces
+  * prepared data that can fit within a single machine.
+  *
+  * @tparam TD Training data class.
+  * @tparam PD Prepared data class.
+  * @group Preparator
+  */
+abstract class LPreparator[TD, PD : ClassTag]
+  extends BasePreparator[RDD[TD], RDD[PD]] {
+
+  /** Applies [[prepare]] to each element of the training-data RDD.
+    *
+    * @param sc Spark context; not used by this implementation.
+    * @param rddTd RDD of training data.
+    * @return A (lazily computed) RDD of prepared data.
+    */
+  def prepareBase(sc: SparkContext, rddTd: RDD[TD]): RDD[PD] =
+    rddTd.map { td => prepare(td) }
+
+  /** Implement this method to produce prepared data that is ready for model
+    * training.
+    *
+    * @param trainingData Training data to be prepared.
+    * @return Prepared data.
+    */
+  def prepare(trainingData: TD): PD
+}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/main/scala/org/apache/predictionio/controller/LServing.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/predictionio/controller/LServing.scala b/core/src/main/scala/org/apache/predictionio/controller/LServing.scala
new file mode 100644
index 0000000..653b998
--- /dev/null
+++ b/core/src/main/scala/org/apache/predictionio/controller/LServing.scala
@@ -0,0 +1,52 @@
+/** Copyright 2015 TappingStone, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.predictionio.controller
+
+import org.apache.predictionio.annotation.Experimental
+import org.apache.predictionio.core.BaseServing
+
+/** Base class of serving.
+  *
+  * @tparam Q Input query class.
+  * @tparam P Output prediction class.
+  * @group Serving
+  */
+abstract class LServing[Q, P] extends BaseServing[Q, P] {
+  /** Delegates to [[supplement]]. */
+  def supplementBase(q: Q): Q = {
+    supplement(q)
+  }
+
+  /** :: Experimental ::
+    * Implement this method to supplement the query before sending it to
+    * algorithms. The default implementation returns the query unchanged.
+    *
+    * @param q Query
+    * @return A supplemented Query
+    */
+  @Experimental
+  def supplement(q: Q): Q = q
+
+  /** Delegates to [[serve]]. */
+  def serveBase(q: Q, ps: Seq[P]): P = serve(q, ps)
+
+  /** Implement this method to combine multiple algorithms' predictions to
+    * produce a single final prediction. The query is the original query sent
+    * to the engine, not the supplemented query produced by
+    * [[LServing.supplement]].
+    *
+    * @param query Original input query.
+    * @param predictions A list of algorithms' predictions.
+    * @return The final prediction.
+    */
+  def serve(query: Q, predictions: Seq[P]): P
+}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/main/scala/org/apache/predictionio/controller/LocalFileSystemPersistentModel.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/predictionio/controller/LocalFileSystemPersistentModel.scala b/core/src/main/scala/org/apache/predictionio/controller/LocalFileSystemPersistentModel.scala
new file mode 100644
index 0000000..f90e28d
--- /dev/null
+++ b/core/src/main/scala/org/apache/predictionio/controller/LocalFileSystemPersistentModel.scala
@@ -0,0 +1,74 @@
+/** Copyright 2015 TappingStone, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.predictionio.controller
+
+import org.apache.spark.SparkContext
+
+/** This trait is a convenience helper for persisting your model to the local
+  * filesystem. This trait and [[LocalFileSystemPersistentModelLoader]] contain
+  * concrete implementations and need not be implemented.
+  *
+  * The underlying implementation is [[Utils.save]].
+  *
+  * {{{
+  * class MyModel extends LocalFileSystemPersistentModel[MyParams] {
+  *   ...
+  * }
+  *
+  * object MyModel extends LocalFileSystemPersistentModelLoader[MyParams, MyModel] {
+  *   ...
+  * }
+  * }}}
+  *
+  * @tparam AP Algorithm parameters class.
+  * @see [[LocalFileSystemPersistentModelLoader]]
+  * @group Algorithm
+  */
+trait LocalFileSystemPersistentModel[AP <: Params] extends PersistentModel[AP] {
+  /** Saves this model through [[Utils.save]] under the given model ID.
+    *
+    * @param id Model ID identifying the saved model.
+    * @param params Algorithm parameters; not used by this implementation.
+    * @param sc Spark context; not used by this implementation.
+    * @return Always `true` (any failure inside `Utils.save` is not caught here).
+    */
+  def save(id: String, params: AP, sc: SparkContext): Boolean = {
+    val model = this
+    Utils.save(id, model)
+    true
+  }
+}
+
+/** Implement an object that extends this trait for PredictionIO to support
+  * loading a persisted model from the local filesystem during serving
+  * deployment.
+  *
+  * The underlying implementation is [[Utils.load]].
+  *
+  * @tparam AP Algorithm parameters class.
+  * @tparam M Model class.
+  * @see [[LocalFileSystemPersistentModel]]
+  * @group Algorithm
+  */
+trait LocalFileSystemPersistentModelLoader[AP <: Params, M]
+  extends PersistentModelLoader[AP, M] {
+  /** Loads the model saved under `id` through [[Utils.load]].
+    *
+    * @param id Model ID the model was saved under.
+    * @param params Algorithm parameters; not used by this implementation.
+    * @param sc Optional Spark context; not used by this implementation.
+    * @return The loaded model, cast to `M`.
+    */
+  def apply(id: String, params: AP, sc: Option[SparkContext]): M = {
+    val restored = Utils.load(id)
+    // M is an erased type parameter, so this cast is unchecked here; a
+    // mismatched type would only surface later, where the value is used.
+    restored.asInstanceOf[M]
+  }
+}
+
+/** DEPRECATED. Use [[LocalFileSystemPersistentModel]] instead.
+  *
+  * Declares no members of its own. It only extends
+  * [[LocalFileSystemPersistentModel]], so code written against this name
+  * keeps compiling (with a deprecation warning).
+  *
+  * @tparam AP Algorithm parameters class.
+  * @group Algorithm
+  */
+@deprecated("Use LocalFileSystemPersistentModel instead.", "0.9.2")
+trait IFSPersistentModel[AP <: Params]
+  extends LocalFileSystemPersistentModel[AP]
+
+/** DEPRECATED. Use [[LocalFileSystemPersistentModelLoader]] instead.
+  *
+  * Declares no members of its own. It only extends
+  * [[LocalFileSystemPersistentModelLoader]], so code written against this
+  * name keeps compiling (with a deprecation warning).
+  *
+  * @tparam AP Algorithm parameters class.
+  * @tparam M Model class.
+  * @group Algorithm
+  */
+@deprecated("Use LocalFileSystemPersistentModelLoader instead.", "0.9.2")
+trait IFSPersistentModelLoader[AP <: Params, M]
+  extends LocalFileSystemPersistentModelLoader[AP, M]