You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@predictionio.apache.org by sh...@apache.org on 2017/07/10 04:10:09 UTC

[06/11] incubator-predictionio git commit: [PIO-97] Fixes examples of the official templates for v0.11.0-incubating.

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-recommendation/customize-serving/src/main/scala/ALSAlgorithm.scala
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-recommendation/customize-serving/src/main/scala/ALSAlgorithm.scala b/examples/scala-parallel-recommendation/customize-serving/src/main/scala/ALSAlgorithm.scala
new file mode 100644
index 0000000..b0f874d
--- /dev/null
+++ b/examples/scala-parallel-recommendation/customize-serving/src/main/scala/ALSAlgorithm.scala
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.predictionio.examples.recommendation
+
+import org.apache.predictionio.controller.PAlgorithm
+import org.apache.predictionio.controller.Params
+import org.apache.predictionio.data.storage.BiMap
+
+import org.apache.spark.SparkContext
+import org.apache.spark.SparkContext._
+import org.apache.spark.rdd.RDD
+import org.apache.spark.mllib.recommendation.ALS
+import org.apache.spark.mllib.recommendation.{Rating => MLlibRating}
+import org.apache.spark.mllib.recommendation.ALSModel
+
+import grizzled.slf4j.Logger
+
+/**
+ * Hyper-parameters for [[ALSAlgorithm]].
+ *
+ * @param rank          rank of the factorization, passed to `ALS.setRank`
+ * @param numIterations number of ALS iterations, passed to `ALS.setIterations`
+ * @param lambda        regularization value, passed to `ALS.setLambda`
+ * @param seed          optional random seed; `System.nanoTime` is used when absent
+ */
+case class ALSAlgorithmParams(
+  rank: Int,
+  numIterations: Int,
+  lambda: Double,
+  seed: Option[Long]
+) extends Params
+
+/**
+ * Collaborative-filtering recommender that trains Spark MLlib's ALS on
+ * explicit ratings.
+ *
+ * MLlib works on Int indices, so String user/item IDs are mapped through
+ * [[BiMap]]s that are stored in the resulting [[ALSModel]] and used to
+ * translate predictions back to String IDs.
+ *
+ * @param ap algorithm hyper-parameters
+ */
+class ALSAlgorithm(val ap: ALSAlgorithmParams)
+  extends PAlgorithm[PreparedData, ALSModel, Query, PredictedResult] {
+
+  @transient lazy val logger = Logger[this.type]
+
+  if (ap.numIterations > 30) {
+    logger.warn(
+      s"ALSAlgorithmParams.numIterations > 30, current: ${ap.numIterations}. " +
+      "There is a chance of running into StackOverflowError. " +
+      "To remedy it, set lower numIterations or checkpoint parameters.")
+  }
+
+  /**
+   * Trains an [[ALSModel]] from the prepared ratings.
+   *
+   * @throws IllegalArgumentException if `data.ratings` is empty
+   */
+  def train(sc: SparkContext, data: PreparedData): ALSModel = {
+    // MLLib ALS cannot handle empty training data.
+    require(!data.ratings.take(1).isEmpty,
+      s"RDD[Rating] in PreparedData cannot be empty." +
+      " Please check if DataSource generates TrainingData" +
+      " and Preparator generates PreparedData correctly.")
+
+    // Convert user and item String IDs to Int index for MLlib
+    val userStringIntMap = BiMap.stringInt(data.ratings.map(_.user))
+    val itemStringIntMap = BiMap.stringInt(data.ratings.map(_.item))
+    val mllibRatings = data.ratings.map( r =>
+      // MLlibRating requires integer index for user and item
+      MLlibRating(userStringIntMap(r.user), itemStringIntMap(r.item), r.rating)
+    )
+
+    // seed for MLlib ALS
+    val seed = ap.seed.getOrElse(System.nanoTime)
+
+    // Set checkpoint directory
+    // sc.setCheckpointDir("checkpoint")
+
+    // If you only have one type of implicit event (Eg. "view" event only),
+    // set implicitPrefs to true
+    val implicitPrefs = false
+    val als = new ALS()
+    als.setUserBlocks(-1)
+    als.setProductBlocks(-1)
+    als.setRank(ap.rank)
+    als.setIterations(ap.numIterations)
+    als.setLambda(ap.lambda)
+    als.setImplicitPrefs(implicitPrefs)
+    als.setAlpha(1.0)
+    als.setSeed(seed)
+    als.setCheckpointInterval(10)
+    val m = als.run(mllibRatings)
+
+    new ALSModel(
+      rank = m.rank,
+      userFeatures = m.userFeatures,
+      productFeatures = m.productFeatures,
+      userStringIntMap = userStringIntMap,
+      itemStringIntMap = itemStringIntMap)
+  }
+
+  /**
+   * Recommends up to `query.num` items for `query.user`.
+   * A user absent from the training data gets an empty result.
+   */
+  def predict(model: ALSModel, query: Query): PredictedResult = {
+    // Convert String ID to Int index for Mllib
+    model.userStringIntMap.get(query.user).map { userInt =>
+      // create inverse view of itemStringIntMap
+      val itemIntStringMap = model.itemStringIntMap.inverse
+      // recommendProducts() returns Array[MLlibRating], which uses item Int
+      // index. Convert it to String ID for returning PredictedResult
+      val itemScores = model.recommendProducts(userInt, query.num)
+        .map(r => ItemScore(itemIntStringMap(r.product), r.rating))
+      PredictedResult(itemScores)
+    }.getOrElse {
+      logger.info(s"No prediction for unknown user ${query.user}.")
+      PredictedResult(Array.empty)
+    }
+  }
+
+  /**
+   * Batch prediction used by the evaluation module, where a batch of queries
+   * is sent to this engine.
+   *
+   * Every (distinct known user, product) pair is scored. For each query,
+   * the top `query.num` items of its user are kept. Queries for unknown users
+   * get an empty result.
+   */
+  override def batchPredict(model: ALSModel, queries: RDD[(Long, Query)]): RDD[(Long, PredictedResult)] = {
+    // Pull the lookup tables into locals so the closures below serialize only
+    // the BiMaps (not the whole model, which also holds RDDs), and so the
+    // inverse item map is built once rather than once per rating.
+    val userStringIntMap = model.userStringIntMap
+    val itemIntStringMap = model.itemStringIntMap.inverse
+
+    val userIxQueries: RDD[(Int, (Long, Query))] = queries.map { case (ix, query) =>
+      // If user not found, then the index is -1
+      val userIx = userStringIntMap.get(query.user).getOrElse(-1)
+      (userIx, (ix, query))
+    }
+
+    // Cross product of the distinct valid users from the queries and the
+    // products in the model. Without `distinct`, a user queried more than once
+    // would have every product scored repeatedly, producing duplicate items in
+    // that user's top-N list.
+    val usersProducts: RDD[(Int, Int)] = userIxQueries
+      .keys
+      .filter(_ != -1)
+      .distinct()
+      .cartesian(model.productFeatures.map(_._1))
+
+    // Call mllib ALS's predict function.
+    val ratings: RDD[MLlibRating] = model.predict(usersProducts)
+
+    // Not optimal implementation. Instead of groupBy, should use combineByKey with a PriorityQueue
+    val userRatings: RDD[(Int, Iterable[MLlibRating])] = ratings.groupBy(_.user)
+
+    userIxQueries.leftOuterJoin(userRatings).map {
+      // When there are ratings
+      case (_, ((ix, query), Some(scoredProducts))) =>
+        val topItemScores: Array[ItemScore] = scoredProducts
+          .toArray
+          .sortBy(_.rating)(Ordering.Double.reverse) // from large to small
+          .take(query.num)
+          .map(r => ItemScore(itemIntStringMap(r.product), r.rating))
+        (ix, PredictedResult(itemScores = topItemScores))
+      // When user doesn't exist in training data
+      case (userIx, ((ix, _), None)) =>
+        require(userIx == -1)
+        (ix, PredictedResult(itemScores = Array.empty))
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-recommendation/customize-serving/src/main/scala/ALSModel.scala
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-recommendation/customize-serving/src/main/scala/ALSModel.scala b/examples/scala-parallel-recommendation/customize-serving/src/main/scala/ALSModel.scala
new file mode 100644
index 0000000..898858d
--- /dev/null
+++ b/examples/scala-parallel-recommendation/customize-serving/src/main/scala/ALSModel.scala
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.recommendation
+// This must be the same package as Spark's MatrixFactorizationModel because
+// MatrixFactorizationModel's constructor is private and we are using
+// its constructor in order to save and load the model
+
+import org.apache.predictionio.examples.recommendation.ALSAlgorithmParams
+
+import org.apache.predictionio.controller.PersistentModel
+import org.apache.predictionio.controller.PersistentModelLoader
+import org.apache.predictionio.data.storage.BiMap
+
+import org.apache.spark.SparkContext
+import org.apache.spark.SparkContext._
+import org.apache.spark.rdd.RDD
+
+/**
+ * MLlib `MatrixFactorizationModel` extended with the String <-> Int ID maps
+ * needed to translate PredictionIO IDs, plus custom persistence.
+ *
+ * @param rank             rank of the factorization
+ * @param userFeatures     (user index, feature vector) pairs
+ * @param productFeatures  (item index, feature vector) pairs
+ * @param userStringIntMap user String ID -> Int index used by MLlib
+ * @param itemStringIntMap item String ID -> Int index used by MLlib
+ */
+class ALSModel(
+    override val rank: Int,
+    override val userFeatures: RDD[(Int, Array[Double])],
+    override val productFeatures: RDD[(Int, Array[Double])],
+    val userStringIntMap: BiMap[String, Int],
+    val itemStringIntMap: BiMap[String, Int])
+  extends MatrixFactorizationModel(rank, userFeatures, productFeatures)
+  with PersistentModel[ALSAlgorithmParams] {
+
+  /**
+   * Writes each model component as a Spark object file under `/tmp/<id>/`.
+   * `params` is not used.
+   *
+   * NOTE(review): /tmp may be cleared by the OS; consider a durable location.
+   *
+   * @return always `true`
+   */
+  def save(id: String, params: ALSAlgorithmParams,
+    sc: SparkContext): Boolean = {
+    val dir = s"/tmp/${id}"
+    sc.parallelize(Seq(rank)).saveAsObjectFile(s"$dir/rank")
+    userFeatures.saveAsObjectFile(s"$dir/userFeatures")
+    productFeatures.saveAsObjectFile(s"$dir/productFeatures")
+    sc.parallelize(Seq(userStringIntMap)).saveAsObjectFile(s"$dir/userStringIntMap")
+    sc.parallelize(Seq(itemStringIntMap)).saveAsObjectFile(s"$dir/itemStringIntMap")
+    true
+  }
+
+  /** Sizes and first two entries of each component; runs Spark count/take jobs. */
+  override def toString = {
+    val sections = Seq(
+      s"userFeatures: [${userFeatures.count()}](${userFeatures.take(2).toList}...)",
+      s"productFeatures: [${productFeatures.count()}](${productFeatures.take(2).toList}...)",
+      s"userStringIntMap: [${userStringIntMap.size}](${userStringIntMap.take(2)}...)",
+      s"itemStringIntMap: [${itemStringIntMap.size}](${itemStringIntMap.take(2)}...)")
+    sections.mkString(" ")
+  }
+}
+
+/** Rebuilds an [[ALSModel]] from the object files written by `ALSModel.save`. */
+object ALSModel
+  extends PersistentModelLoader[ALSAlgorithmParams, ALSModel] {
+
+  /**
+   * Loads the model with the given ID from `/tmp/<id>/`.
+   *
+   * @param id     model instance ID used in the storage path
+   * @param params algorithm parameters (not used when loading)
+   * @param sc     SparkContext used to read the object files; required
+   * @throws IllegalArgumentException if `sc` is `None`
+   */
+  def apply(id: String, params: ALSAlgorithmParams,
+    sc: Option[SparkContext]): ALSModel = {
+    // Resolve the context once with a descriptive error instead of calling
+    // `.get` on the Option for every component.
+    val ctx = sc.getOrElse(throw new IllegalArgumentException(
+      s"A SparkContext is required to load ALSModel ${id}."))
+    new ALSModel(
+      rank = ctx.objectFile[Int](s"/tmp/${id}/rank").first,
+      userFeatures = ctx.objectFile[(Int, Array[Double])](s"/tmp/${id}/userFeatures"),
+      productFeatures = ctx.objectFile[(Int, Array[Double])](s"/tmp/${id}/productFeatures"),
+      userStringIntMap = ctx
+        .objectFile[BiMap[String, Int]](s"/tmp/${id}/userStringIntMap").first,
+      itemStringIntMap = ctx
+        .objectFile[BiMap[String, Int]](s"/tmp/${id}/itemStringIntMap").first)
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-recommendation/customize-serving/src/main/scala/DataSource.scala
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-recommendation/customize-serving/src/main/scala/DataSource.scala b/examples/scala-parallel-recommendation/customize-serving/src/main/scala/DataSource.scala
new file mode 100644
index 0000000..d606ad3
--- /dev/null
+++ b/examples/scala-parallel-recommendation/customize-serving/src/main/scala/DataSource.scala
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.predictionio.examples.recommendation
+
+import org.apache.predictionio.controller.PDataSource
+import org.apache.predictionio.controller.EmptyEvaluationInfo
+import org.apache.predictionio.controller.EmptyActualResult
+import org.apache.predictionio.controller.Params
+import org.apache.predictionio.data.storage.Event
+import org.apache.predictionio.data.store.PEventStore
+
+import org.apache.spark.SparkContext
+import org.apache.spark.SparkContext._
+import org.apache.spark.rdd.RDD
+
+import grizzled.slf4j.Logger
+
+/**
+ * Evaluation settings for [[DataSource.readEval]].
+ *
+ * @param kFold    number of folds to split the ratings into
+ * @param queryNum `num` placed on every evaluation [[Query]]
+ */
+case class DataSourceEvalParams(kFold: Int, queryNum: Int)
+
+/**
+ * @param appName    PredictionIO app whose events are read
+ * @param evalParams required by `readEval`; not used by `readTraining`
+ */
+case class DataSourceParams(
+  appName: String,
+  evalParams: Option[DataSourceEvalParams]
+) extends Params
+
+/**
+ * Reads user -> item "rate" and "buy" events from the event store and turns
+ * them into [[Rating]]s for training and k-fold evaluation.
+ */
+class DataSource(val dsp: DataSourceParams)
+  extends PDataSource[TrainingData, EmptyEvaluationInfo, Query, ActualResult] {
+
+  @transient lazy val logger = Logger[this.type]
+
+  /**
+   * Loads the "rate"/"buy" events of `dsp.appName` as a cached RDD of ratings.
+   * A "rate" event uses its `rating` property; a "buy" event counts as 4.0.
+   * Events that cannot be converted are logged and the exception is rethrown.
+   */
+  def getRatings(sc: SparkContext): RDD[Rating] = {
+    val eventsRDD: RDD[Event] = PEventStore.find(
+      appName = dsp.appName,
+      entityType = Some("user"),
+      eventNames = Some(List("rate", "buy")),
+      // targetEntityType is optional field of an event.
+      targetEntityType = Some(Some("item")))(sc)
+
+    eventsRDD.map { event =>
+      try {
+        val ratingValue: Double = event.event match {
+          case "rate" => event.properties.get[Double]("rating")
+          case "buy"  => 4.0 // a purchase is treated as a rating of 4
+          case _      => throw new Exception(s"Unexpected event ${event} is read.")
+        }
+        // entityId and targetEntityId are String IDs
+        Rating(event.entityId, event.targetEntityId.get, ratingValue)
+      } catch {
+        case e: Exception =>
+          logger.error(s"Cannot convert ${event} to Rating. Exception: ${e}.")
+          throw e
+      }
+    }.cache()
+  }
+
+  /** All ratings become training data. */
+  override def readTraining(sc: SparkContext): TrainingData =
+    new TrainingData(getRatings(sc))
+
+  /**
+   * Splits the ratings into `kFold` folds by `uniqueId % kFold`. For each fold,
+   * the other folds form the training data, and each user in the held-out fold
+   * yields one query whose actual result is that user's held-out ratings.
+   *
+   * @throws IllegalArgumentException if `dsp.evalParams` is not set
+   */
+  override def readEval(sc: SparkContext)
+  : Seq[(TrainingData, EmptyEvaluationInfo, RDD[(Query, ActualResult)])] = {
+    require(dsp.evalParams.isDefined, "Must specify evalParams")
+    val evalParams = dsp.evalParams.get
+    val kFold = evalParams.kFold
+
+    val indexedRatings: RDD[(Rating, Long)] = getRatings(sc).zipWithUniqueId
+    indexedRatings.cache
+
+    (0 until kFold).map { fold =>
+      val trainingRatings = indexedRatings
+        .filter { case (_, id) => id % kFold != fold }
+        .map(_._1)
+      val testingRatings = indexedRatings
+        .filter { case (_, id) => id % kFold == fold }
+        .map(_._1)
+
+      val queriesWithActuals: RDD[(Query, ActualResult)] = testingRatings
+        .groupBy(_.user)
+        .map { case (user, userRatings) =>
+          (Query(user, evalParams.queryNum), ActualResult(userRatings.toArray))
+        }
+
+      (new TrainingData(trainingRatings), new EmptyEvaluationInfo(), queriesWithActuals)
+    }
+  }
+}
+
+/** A single user -> item rating, keyed by String IDs from the event store. */
+case class Rating(user: String, item: String, rating: Double)
+
+/**
+ * Output of the data source.
+ *
+ * @param ratings ratings to train on
+ */
+class TrainingData(val ratings: RDD[Rating]) extends Serializable {
+  /** Rating count and the first two ratings; runs Spark count/take jobs. */
+  override def toString =
+    s"ratings: [${ratings.count()}] (${ratings.take(2).toList}...)"
+}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-recommendation/customize-serving/src/main/scala/Engine.scala
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-recommendation/customize-serving/src/main/scala/Engine.scala b/examples/scala-parallel-recommendation/customize-serving/src/main/scala/Engine.scala
new file mode 100644
index 0000000..b2a668b
--- /dev/null
+++ b/examples/scala-parallel-recommendation/customize-serving/src/main/scala/Engine.scala
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.predictionio.examples.recommendation
+
+import org.apache.predictionio.controller.EngineFactory
+import org.apache.predictionio.controller.Engine
+
+/**
+ * Recommendation request.
+ *
+ * @param user user ID to recommend for
+ * @param num  maximum number of items to return
+ */
+case class Query(user: String, num: Int) extends Serializable
+
+/** Recommended items with their scores. */
+case class PredictedResult(itemScores: Array[ItemScore]) extends Serializable
+
+/** One user's held-out ratings, used as ground truth during evaluation. */
+case class ActualResult(ratings: Array[Rating]) extends Serializable
+
+/** A recommended item ID and its predicted score. */
+case class ItemScore(item: String, score: Double) extends Serializable
+
+/**
+ * Assembles the engine from the DataSource, Preparator, a single ALS
+ * algorithm registered under the name "als", and the blacklisting Serving.
+ */
+object RecommendationEngine extends EngineFactory {
+  def apply() =
+    new Engine(
+      classOf[DataSource],
+      classOf[Preparator],
+      Map("als" -> classOf[ALSAlgorithm]),
+      classOf[Serving])
+}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-recommendation/customize-serving/src/main/scala/Evaluation.scala
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-recommendation/customize-serving/src/main/scala/Evaluation.scala b/examples/scala-parallel-recommendation/customize-serving/src/main/scala/Evaluation.scala
new file mode 100644
index 0000000..a665496
--- /dev/null
+++ b/examples/scala-parallel-recommendation/customize-serving/src/main/scala/Evaluation.scala
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.predictionio.examples.recommendation
+
+import org.apache.predictionio.controller.Evaluation
+import org.apache.predictionio.controller.OptionAverageMetric
+import org.apache.predictionio.controller.AverageMetric
+import org.apache.predictionio.controller.EmptyEvaluationInfo
+import org.apache.predictionio.controller.EngineParamsGenerator
+import org.apache.predictionio.controller.EngineParams
+import org.apache.predictionio.controller.MetricEvaluator
+
+// Usage:
+// $ pio eval org.apache.predictionio.examples.recommendation.RecommendationEvaluation \
+//   org.apache.predictionio.examples.recommendation.EngineParamsList
+
+/**
+ * Precision@K against the items a user rated at or above `ratingThreshold`.
+ *
+ * The denominator is `min(k, number of positives)`. A query whose actual
+ * result contains no positive item has undefined precision, so it yields
+ * `None` and is left out of the metric.
+ *
+ * @throws IllegalArgumentException if `k` is not positive
+ */
+case class PrecisionAtK(k: Int, ratingThreshold: Double = 2.0)
+    extends OptionAverageMetric[EmptyEvaluationInfo, Query, PredictedResult, ActualResult] {
+  require(k > 0, "k must be greater than 0")
+
+  override def header = s"Precision@K (k=$k, threshold=$ratingThreshold)"
+
+  def calculate(q: Query, p: PredictedResult, a: ActualResult): Option[Double] = {
+    val positives: Set[String] = a.ratings.collect {
+      case r if r.rating >= ratingThreshold => r.item
+    }.toSet
+
+    if (positives.isEmpty) None
+    else {
+      val hits: Int = p.itemScores.take(k).count(is => positives.contains(is.item))
+      Some(hits.toDouble / math.min(k, positives.size))
+    }
+  }
+}
+
+/** Average number of held-out ratings per query at or above `ratingThreshold`. */
+case class PositiveCount(ratingThreshold: Double = 2.0)
+    extends AverageMetric[EmptyEvaluationInfo, Query, PredictedResult, ActualResult] {
+  override def header = s"PositiveCount (threshold=$ratingThreshold)"
+
+  def calculate(q: Query, p: PredictedResult, a: ActualResult): Double =
+    a.ratings.count(_.rating >= ratingThreshold)
+}
+
+/** Evaluation ranked by Precision@10 at threshold 4.0, with auxiliary metrics. */
+object RecommendationEvaluation extends Evaluation {
+  engineEvaluator = (
+    RecommendationEngine(),
+    MetricEvaluator(
+      metric = PrecisionAtK(k = 10, ratingThreshold = 4.0),
+      otherMetrics = Seq(
+        PositiveCount(ratingThreshold = 4.0),
+        PrecisionAtK(k = 10, ratingThreshold = 2.0),
+        PositiveCount(ratingThreshold = 2.0),
+        PrecisionAtK(k = 10, ratingThreshold = 1.0),
+        PositiveCount(ratingThreshold = 1.0)
+      )))
+}
+
+
+/**
+ * Evaluation ranked by Precision@3 at threshold 2.0. It also reports
+ * PositiveCount for every threshold and Precision@K for every
+ * (threshold, k) combination.
+ */
+object ComprehensiveRecommendationEvaluation extends Evaluation {
+  val ratingThresholds = Seq(0.0, 2.0, 4.0)
+  val ks = Seq(1, 3, 10)
+
+  engineEvaluator = (
+    RecommendationEngine(),
+    MetricEvaluator(
+      metric = PrecisionAtK(k = 3, ratingThreshold = 2.0),
+      otherMetrics =
+        ratingThresholds.map(r => PositiveCount(ratingThreshold = r)) ++
+        ratingThresholds.flatMap(r => ks.map(k => PrecisionAtK(k = k, ratingThreshold = r)))
+    ))
+}
+
+
+/**
+ * Shared evaluation parameters: app "MyApp1", 5 folds, 10 items per query.
+ *
+ * NOTE(review): no `servingParams` are set here, yet `Serving` takes a
+ * `ServingParams(filepath)` constructor argument. Confirm that evaluation can
+ * instantiate `Serving` without them, or add a `servingParams` entry.
+ */
+trait BaseEngineParamsList extends EngineParamsGenerator {
+  protected val baseEP = EngineParams(
+    dataSourceParams = DataSourceParams(
+      appName = "MyApp1",
+      evalParams = Some(DataSourceEvalParams(kFold = 5, queryNum = 10))))
+}
+
+/** Grid over ALS rank x numIterations, with lambda 0.01 and seed 3 fixed. */
+object EngineParamsList extends BaseEngineParamsList {
+  engineParamsList = for {
+    rank <- Seq(5, 10, 20)
+    numIterations <- Seq(1, 5, 10)
+  } yield baseEP.copy(
+    algorithmParamsList = Seq(
+      ("als", ALSAlgorithmParams(rank, numIterations, 0.01, Some(3)))))
+}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-recommendation/customize-serving/src/main/scala/Preparator.scala
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-recommendation/customize-serving/src/main/scala/Preparator.scala b/examples/scala-parallel-recommendation/customize-serving/src/main/scala/Preparator.scala
new file mode 100644
index 0000000..6a41c47
--- /dev/null
+++ b/examples/scala-parallel-recommendation/customize-serving/src/main/scala/Preparator.scala
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.predictionio.examples.recommendation
+
+import org.apache.predictionio.controller.PPreparator
+
+import org.apache.spark.SparkContext
+import org.apache.spark.SparkContext._
+import org.apache.spark.rdd.RDD
+
+/** Pass-through preparator: forwards the training ratings unchanged. */
+class Preparator extends PPreparator[TrainingData, PreparedData] {
+
+  def prepare(sc: SparkContext, trainingData: TrainingData): PreparedData =
+    new PreparedData(ratings = trainingData.ratings)
+}
+
+/**
+ * Input to the ALS algorithm.
+ *
+ * @param ratings user -> item ratings keyed by String IDs
+ */
+class PreparedData(val ratings: RDD[Rating]) extends Serializable

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-recommendation/customize-serving/src/main/scala/Serving.scala
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-recommendation/customize-serving/src/main/scala/Serving.scala b/examples/scala-parallel-recommendation/customize-serving/src/main/scala/Serving.scala
new file mode 100644
index 0000000..1042bce
--- /dev/null
+++ b/examples/scala-parallel-recommendation/customize-serving/src/main/scala/Serving.scala
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.predictionio.examples.recommendation
+
+import org.apache.predictionio.controller.LServing
+
+import scala.io.Source
+
+import org.apache.predictionio.controller.Params  // ADDED
+
+// ADDED ServingParams to specify the blacklisting file location.
+/**
+ * @param filepath path of a text file listing one disabled item ID per line
+ */
+case class ServingParams(filepath: String) extends Params
+
+/**
+ * Removes blacklisted items from the algorithm's recommendations.
+ *
+ * The blacklist file is re-read on every call to `serve`, so edits to the
+ * file are picked up without redeploying the engine.
+ */
+class Serving(val params: ServingParams)
+  extends LServing[Query, PredictedResult] {
+
+  override
+  def serve(query: Query, predictedResults: Seq[PredictedResult])
+  : PredictedResult = {
+    // Close the file once it has been read fully (`toSet` is strict).
+    // Otherwise every served query leaks an open file handle.
+    val source = Source.fromFile(params.filepath)
+    val disabledProducts: Set[String] =
+      try source.getLines().toSet
+      finally source.close()
+
+    // The engine registers a single algorithm ("als"), so only the first
+    // predicted result is used.
+    val itemScores = predictedResults.head.itemScores
+    PredictedResult(itemScores.filterNot(is => disabledProducts(is.item)))
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-recommendation/customize-serving/template.json
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-recommendation/customize-serving/template.json b/examples/scala-parallel-recommendation/customize-serving/template.json
new file mode 100644
index 0000000..d076ec5
--- /dev/null
+++ b/examples/scala-parallel-recommendation/customize-serving/template.json
@@ -0,0 +1 @@
+{"pio": {"version": { "min": "0.10.0-incubating" }}}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-recommendation/filter-by-category/README.md
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-recommendation/filter-by-category/README.md b/examples/scala-parallel-recommendation/filter-by-category/README.md
deleted file mode 100644
index e4a7350..0000000
--- a/examples/scala-parallel-recommendation/filter-by-category/README.md
+++ /dev/null
@@ -1,202 +0,0 @@
-<!--
-Licensed to the Apache Software Foundation (ASF) under one or more
-contributor license agreements.  See the NOTICE file distributed with
-this work for additional information regarding copyright ownership.
-The ASF licenses this file to You under the Apache License, Version 2.0
-(the "License"); you may not use this file except in compliance with
-the License.  You may obtain a copy of the License at
-
-    http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
--->
-
-# Recommendation Template With Filtering by Category
-
-This engine template is based on the Recommendation Template version v0.1.2. It's modified so that each item has a set
-of categories, queries include a `categories` field and results only include items in any of the specified categories.
-
-## Documentation
-
-Please refer to http://predictionio.incubator.apache.org/templates/recommendation/quickstart/
-
-## Development Notes
-
-### Sample data
-
-This example uses two different files from the MovieLens 100k data. [Download](http://files.grouplens.org/datasets/movielens/ml-100k.zip)
-
-The first file is the ratings data file, which is a tab separated file. The first 3 fields are used: user id, item id
-and rating.
-
-The second file is the items data file, in where different fields are separated by the `|` character. The fields of
-interest are the first one (item id) and 19 category fields, starting at the 6th one and going till the end. Each
-of these fields has the value `1` if the item is the given category, and `0` otherwise.
-
-### Importing sample data
-
-The `data/import_eventserver.py` script allows importing the sample data. Valid arguments are:
-
-* `--url`: Event server URL
-* `--access_key` App access key
-* `--ratings_file`: Ratings file path
-* `--items_file`: Items file path
-
-### Changes to Engine.scala
-
-Added `category` field to the `Query` class:
-
-```scala
-case class Query(
-  user: String,
-  num: Int,
-  categories: Array[String]
-) extends Serializable
-```
-
-### Changes to DataSource.scala
-
-* Introduced class `Item`:
-
-```scala
-case class Item(
-  id: String,
-  categories: List[String]
-)
-```
-
-* Modified class `TrainingData` to include items:
-
-```scala
-class TrainingData(
-  val items: RDD[Item],
-  val ratings: RDD[Rating]
-) extends Serializable {
-  override def toString = {
-    s"items: [${items.count()}] (${items.take(2).toList}...)" +
-    s" ratings: [${ratings.count()}] (${ratings.take(2).toList}...)"
-  }
-}
-```
-
-* Modified `readTraining` function so that it loads items:
-
-```scala
-    ...
-    val itemsRDD: RDD[Item] = eventsDb.aggregateProperties(
-      appId = dsp.appId,
-      entityType = "item"
-    )(sc).map {
-      case (entityId, properties) =>
-        try {
-          Item(id = entityId, categories = properties.get[List[String]]("categories"))
-        } catch {
-          case e: Exception =>
-            logger.error(s"Failed to get properties ${properties} of" +
-              s" item ${entityId}. Exception: ${e}.")
-            throw e
-        }
-    }
-    ...
-```
-
-### Changes to Preparator.scala
-
-* Added `items` field to class `PreparedData`. Pass `items` value from training data.
-
-```scala
-class PreparedData(
-  val items: RDD[Item],
-  val ratings: RDD[Rating]
-) extends Serializable
-```
-
-```scala
-    new PreparedData(items = trainingData.items, ratings = trainingData.ratings)
-```
-
-### Changes to ALSModel.scala
-
-* Added `categoryItemsMap` field to ALSModel class
-
-```scala
-class ALSModel(
-    override val rank: Int,
-    override val userFeatures: RDD[(Int, Array[Double])],
-    override val productFeatures: RDD[(Int, Array[Double])],
-    val userStringIntMap: BiMap[String, Int],
-    val itemStringIntMap: BiMap[String, Int],
-    val categoryItemsMap: Map[String, Set[Int]])
-```
-
-* Added a function that recommends products whose ID is in a list of sets
-
-```scala
-  def recommendProductsFromCategory(user: Int, num: Int, categoryItems: Array[Set[Int]]) = {
-    val filteredProductFeatures = productFeatures
-      .filter { case (id, _) => categoryItems.exists(_.contains(id)) }
-    recommend(userFeatures.lookup(user).head, filteredProductFeatures, num)
-      .map(t => Rating(user, t._1, t._2))
-  }
-
-  private def recommend(
-      recommendToFeatures: Array[Double],
-      recommendableFeatures: RDD[(Int, Array[Double])],
-      num: Int): Array[(Int, Double)] = {
-    val recommendToVector = new DoubleMatrix(recommendToFeatures)
-    val scored = recommendableFeatures.map { case (id,features) =>
-      (id, recommendToVector.dot(new DoubleMatrix(features)))
-    }
-    scored.top(num)(Ordering.by(_._2))
-  }
-```
-
-### Changes to ALSAlgorithm.scala
-
-* Find set of categories
-
-```scala
-    ...
-    val categories = data.items.flatMap(_.categories).distinct().collect().toSet
-    ...
-```
-
-* Find set of items of each category
-
-```scala
-    ...
-    val categoriesMap = categories.map { category =>
-      category -> data.items
-        .filter(_.categories.contains(category))
-        .map(item => itemStringIntMap(item.id))
-        .collect()
-        .toSet
-    }.toMap
-    ...
-```
-
-* Find list of sets of items on which we are interested, use new `recommendProductsFromCategory` function:
-
-```scala
-      ...
-      val categoriesItems = query.categories.map { category =>
-        model.categoryItemsMap.getOrElse(category, Set.empty)
-      }
-      // recommendProductsFromCategory() returns Array[MLlibRating], which uses item Int
-      // index. Convert it to String ID for returning PredictedResult
-      val itemScores = model.recommendProductsFromCategory(userInt, query.num, categoriesItems)
-        .map (r => ItemScore(itemIntStringMap(r.product), r.rating))
-      ...
-```
-
-### Example Request
-
-The script `data/send_query.py` has been modified to represent the updated query structure:
-
-```python
-print engine_client.send_query({"user": "1", "num": 4, "categories": ["action", "western"]})
-```

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-recommendation/filter-by-category/build.sbt
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-recommendation/filter-by-category/build.sbt b/examples/scala-parallel-recommendation/filter-by-category/build.sbt
deleted file mode 100644
index 81cd3ec..0000000
--- a/examples/scala-parallel-recommendation/filter-by-category/build.sbt
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-import AssemblyKeys._
-
-assemblySettings
-
-organization := "org.apache.predictionio"
-
-name := "template-scala-parallel-recommendation"
-
-// All dependencies are "provided": needed to compile the engine but not
-// packaged into the assembly jar.
-libraryDependencies ++= Seq(
-  "org.apache.predictionio" %% "core"        % pioVersion.value % "provided",
-  "org.apache.spark"        %% "spark-core"  % "1.2.0"          % "provided",
-  "org.apache.spark"        %% "spark-mllib" % "1.2.0"          % "provided")

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-recommendation/filter-by-category/data/import_eventserver.py
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-recommendation/filter-by-category/data/import_eventserver.py b/examples/scala-parallel-recommendation/filter-by-category/data/import_eventserver.py
deleted file mode 100644
index 643f4db..0000000
--- a/examples/scala-parallel-recommendation/filter-by-category/data/import_eventserver.py
+++ /dev/null
@@ -1,98 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-"""
-Import sample data for recommendation engine
-"""
-
-import predictionio
-import argparse
-import random
-
-ITEM_ACTIONS_DELIMITER = "|"
-RATE_ACTIONS_DELIMITER = "\t"
-SEED = 3
-
-GENRES = ["unknown", "action", "adventure", "animation", "children's", "comedy","crime", "documentary", "drama",
-          "fantasy", "film-noir", "horror", "musical", "mystery", "romance", "sci-fi", "thriller", "war", "western"]
-
-def import_events(client, items_file, ratings_file):
-  random.seed(SEED)
-
-  f = open(items_file, 'r')
-  print "Importing items..."
-  items = 0
-  for line in f:
-    data = line.rstrip('\r\n').split(ITEM_ACTIONS_DELIMITER)
-    id = data[0]
-    genres_str = data[5:]
-    genres_bool = [bool(int(g)) for g in genres_str]
-    genres = [g for b, g in zip(genres_bool, GENRES) if b]
-    client.create_event(
-      event="$set",
-      entity_type="item",
-      entity_id=id,
-      properties= { "categories" : genres }
-    )
-    items += 1
-  print "%s items are imported." % items
-  f.close()
-
-  f = open(ratings_file, 'r')
-  print "Importing ratings..."
-  ratings = 0
-  for line in f:
-    data = line.rstrip('\r\n').split(RATE_ACTIONS_DELIMITER)
-    # For demonstration purpose, randomly mix in some buy events
-    if random.randint(0, 1) == 1:
-      client.create_event(
-        event="rate",
-        entity_type="user",
-        entity_id=data[0],
-        target_entity_type="item",
-        target_entity_id=data[1],
-        properties= { "rating" : float(data[2]) }
-      )
-    else:
-      client.create_event(
-        event="buy",
-        entity_type="user",
-        entity_id=data[0],
-        target_entity_type="item",
-        target_entity_id=data[1]
-      )
-    ratings += 1
-  f.close()
-  print "%s ratings are imported." % ratings
-
-if __name__ == '__main__':
-  parser = argparse.ArgumentParser(
-    description="Import sample data for recommendation engine")
-  parser.add_argument('--access_key', default='invald_access_key')
-  parser.add_argument('--url', default="http://localhost:7070")
-  parser.add_argument('--ratings_file')
-  parser.add_argument('--items_file')
-
-  args = parser.parse_args()
-  print args
-
-  client = predictionio.EventClient(
-    access_key=args.access_key,
-    url=args.url,
-    threads=5,
-    qsize=500)
-  import_events(client, args.items_file, args.ratings_file)

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-recommendation/filter-by-category/data/send_query.py
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-recommendation/filter-by-category/data/send_query.py b/examples/scala-parallel-recommendation/filter-by-category/data/send_query.py
deleted file mode 100644
index 9e061de..0000000
--- a/examples/scala-parallel-recommendation/filter-by-category/data/send_query.py
+++ /dev/null
@@ -1,24 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-"""
-Send sample query to prediction engine
-"""
-
-import predictionio
-engine_client = predictionio.EngineClient(url="http://localhost:8000")
-print engine_client.send_query({"user": "1", "num": 4, "categories": ["action", "western"]})

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-recommendation/filter-by-category/engine.json
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-recommendation/filter-by-category/engine.json b/examples/scala-parallel-recommendation/filter-by-category/engine.json
deleted file mode 100644
index f97113e..0000000
--- a/examples/scala-parallel-recommendation/filter-by-category/engine.json
+++ /dev/null
@@ -1,21 +0,0 @@
-{
-  "id": "default",
-  "description": "Default settings",
-  "engineFactory": "org.template.recommendation.RecommendationEngine",
-  "datasource": {
-    "params" : {
-      "appId": 2
-    }
-  },
-  "algorithms": [
-    {
-      "name": "als",
-      "params": {
-        "rank": 10,
-        "numIterations": 20,
-        "lambda": 0.01,
-        "seed": 3
-      }
-    }
-  ]
-}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-recommendation/filter-by-category/project/assembly.sbt
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-recommendation/filter-by-category/project/assembly.sbt b/examples/scala-parallel-recommendation/filter-by-category/project/assembly.sbt
deleted file mode 100644
index 54c3252..0000000
--- a/examples/scala-parallel-recommendation/filter-by-category/project/assembly.sbt
+++ /dev/null
@@ -1 +0,0 @@
-// sbt-assembly plugin. This 0.11.x line provides the `AssemblyKeys` /
-// `assemblySettings` API that build.sbt imports.
-addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.11.2")

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-recommendation/filter-by-category/project/pio-build.sbt
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-recommendation/filter-by-category/project/pio-build.sbt b/examples/scala-parallel-recommendation/filter-by-category/project/pio-build.sbt
deleted file mode 100644
index 9aed0ee..0000000
--- a/examples/scala-parallel-recommendation/filter-by-category/project/pio-build.sbt
+++ /dev/null
@@ -1 +0,0 @@
-// PredictionIO build plugin; presumably the source of the `pioVersion`
-// setting referenced in build.sbt (confirm against the plugin).
-addSbtPlugin("org.apache.predictionio" % "pio-build" % "0.9.0")

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-recommendation/filter-by-category/src/main/scala/ALSAlgorithm.scala
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-recommendation/filter-by-category/src/main/scala/ALSAlgorithm.scala b/examples/scala-parallel-recommendation/filter-by-category/src/main/scala/ALSAlgorithm.scala
deleted file mode 100644
index 0046e69..0000000
--- a/examples/scala-parallel-recommendation/filter-by-category/src/main/scala/ALSAlgorithm.scala
+++ /dev/null
@@ -1,118 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.template.recommendation
-
-import org.apache.predictionio.controller.PAlgorithm
-import org.apache.predictionio.controller.Params
-import org.apache.predictionio.data.storage.BiMap
-
-import org.apache.spark.SparkContext
-import org.apache.spark.SparkContext._
-import org.apache.spark.rdd.RDD
-import org.apache.spark.mllib.recommendation.ALS
-import org.apache.spark.mllib.recommendation.{Rating => MLlibRating}
-import org.apache.spark.mllib.recommendation.ALSModel
-
-import grizzled.slf4j.Logger
-
-/** Hyper-parameters of the "als" algorithm, taken from its "params" object
- *  in engine.json.
- *
- *  @param rank          number of latent factors
- *  @param numIterations number of ALS iterations
- *  @param lambda        regularization parameter passed to ALS.train
- *  @param seed          optional random seed; train() falls back to System.nanoTime
- */
-case class ALSAlgorithmParams(
-    rank: Int,
-    numIterations: Int,
-    lambda: Double,
-    seed: Option[Long])
-  extends Params
-
-/** ALS recommender whose results can be restricted to item categories.
- *
- *  train() factorizes the user-item rating matrix with MLlib ALS. It also
- *  records which integer-indexed items belong to each category, so predict()
- *  can score only items from the categories named in the query.
- */
-class ALSAlgorithm(val ap: ALSAlgorithmParams)
-  extends PAlgorithm[PreparedData, ALSModel, Query, PredictedResult] {
-
-  @transient lazy val logger = Logger[this.type]
-
-  /** Trains the ALS factors and builds the category -> item-index map.
-   *
-   *  @param sc   Spark context
-   *  @param data prepared items (with categories) and ratings
-   *  @return the factor matrices, the String<->Int id maps and the category map
-   *  @throws IllegalArgumentException if `data.ratings` is empty
-   */
-  def train(sc: SparkContext, data: PreparedData): ALSModel = {
-    // MLLib ALS cannot handle empty training data.
-    require(data.ratings.take(1).nonEmpty,
-      s"RDD[Rating] in PreparedData cannot be empty." +
-      " Please check if DataSource generates TrainingData" +
-      " and Preparator generates PreparedData correctly.")
-    // Convert user and item String IDs to Int index for MLlib
-    val userStringIntMap = BiMap.stringInt(data.ratings.map(_.user))
-    val itemStringIntMap = BiMap.stringInt(data.ratings.map(_.item))
-    val mllibRatings = data.ratings.map( r =>
-      // MLlibRating requires integer index for user and item
-      MLlibRating(userStringIntMap(r.user), itemStringIntMap(r.item), r.rating)
-    )
-
-    // seed for MLlib ALS
-    val seed = ap.seed.getOrElse(System.nanoTime)
-
-    // If you only have one type of implicit event (Eg. "view" event only),
-    // replace ALS.train(...) with
-    //val m = ALS.trainImplicit(
-      //ratings = mllibRatings,
-      //rank = ap.rank,
-      //iterations = ap.numIterations,
-      //lambda = ap.lambda,
-      //blocks = -1,
-      //alpha = 1.0,
-      //seed = seed)
-
-    val m = ALS.train(
-      ratings = mllibRatings,
-      rank = ap.rank,
-      iterations = ap.numIterations,
-      lambda = ap.lambda,
-      blocks = -1,
-      seed = seed)
-
-    val categoriesMap = buildCategoriesMap(data.items, itemStringIntMap)
-
-    new ALSModel(
-      rank = m.rank,
-      userFeatures = m.userFeatures,
-      productFeatures = m.productFeatures,
-      userStringIntMap = userStringIntMap,
-      itemStringIntMap = itemStringIntMap,
-      categoryItemsMap = categoriesMap)
-  }
-
-  /** Maps each category to the integer indices of its items, in one Spark job.
-   *
-   *  `itemStringIntMap` is built from ratings only, so an item that has
-   *  properties but no rating events has no index. Such items are skipped;
-   *  indexing them directly would throw. A category whose items are all
-   *  unrated is therefore absent, which predict() treats as an empty set via
-   *  `getOrElse`.
-   */
-  private def buildCategoriesMap(
-      items: RDD[Item],
-      itemStringIntMap: BiMap[String, Int]): Map[String, Set[Int]] = {
-    items
-      .flatMap { item =>
-        itemStringIntMap.get(item.id).toSeq.flatMap { itemInt =>
-          item.categories.map(category => (category, itemInt))
-        }
-      }
-      .collect()
-      .groupBy(_._1)
-      .map { case (category, pairs) => category -> pairs.map(_._2).toSet }
-  }
-
-  /** Recommends up to `query.num` items for `query.user`, drawn only from
-   *  items in `query.categories`. Unknown users get an empty PredictedResult.
-   */
-  def predict(model: ALSModel, query: Query): PredictedResult = {
-    // Convert String ID to Int index for Mllib
-    model.userStringIntMap.get(query.user).map { userInt =>
-      // create inverse view of itemStringIntMap
-      val itemIntStringMap = model.itemStringIntMap.inverse
-      // Find items on query category
-      val categoriesItems = query.categories.map { category =>
-        model.categoryItemsMap.getOrElse(category, Set.empty)
-      }
-      // recommendProductsFromCategory() returns Array[MLlibRating], which uses item Int
-      // index. Convert it to String ID for returning PredictedResult
-      val itemScores = model.recommendProductsFromCategory(userInt, query.num, categoriesItems)
-        .map (r => ItemScore(itemIntStringMap(r.product), r.rating))
-      new PredictedResult(itemScores)
-    }.getOrElse{
-      logger.info(s"No prediction for unknown user ${query.user}.")
-      new PredictedResult(Array.empty)
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-recommendation/filter-by-category/src/main/scala/ALSModel.scala
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-recommendation/filter-by-category/src/main/scala/ALSModel.scala b/examples/scala-parallel-recommendation/filter-by-category/src/main/scala/ALSModel.scala
deleted file mode 100644
index 1816dc9..0000000
--- a/examples/scala-parallel-recommendation/filter-by-category/src/main/scala/ALSModel.scala
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.mllib.recommendation
-// This must be the same package as Spark's MatrixFactorizationModel because
-// MatrixFactorizationModel's constructor is private and we are using
-// its constructor in order to save and load the model
-
-import org.jblas.DoubleMatrix
-import org.template.recommendation.ALSAlgorithmParams
-
-import org.apache.predictionio.controller.IPersistentModel
-import org.apache.predictionio.controller.IPersistentModelLoader
-import org.apache.predictionio.data.storage.BiMap
-
-import org.apache.spark.SparkContext
-import org.apache.spark.SparkContext._
-import org.apache.spark.rdd.RDD
-
-/** MatrixFactorizationModel extended with the String<->Int id maps and the
- *  category -> item-index map needed at serving time. Persisted to /tmp/<id>/
- *  as Spark object files.
- */
-class ALSModel(
-    override val rank: Int,
-    override val userFeatures: RDD[(Int, Array[Double])],
-    override val productFeatures: RDD[(Int, Array[Double])],
-    val userStringIntMap: BiMap[String, Int],
-    val itemStringIntMap: BiMap[String, Int],
-    val categoryItemsMap: Map[String, Set[Int]])
-  extends MatrixFactorizationModel(rank, userFeatures, productFeatures)
-  with IPersistentModel[ALSAlgorithmParams] {
-
-  /** Recommends up to `num` products for `user`, scoring only products that
-   *  belong to at least one of the given item-index sets.
-   *
-   *  @param user          integer user index, looked up in userFeatures
-   *  @param num           maximum number of recommendations
-   *  @param categoryItems one item-index set per requested category
-   *  @return ratings ordered from highest to lowest score
-   */
-  def recommendProductsFromCategory(user: Int, num: Int, categoryItems: Array[Set[Int]]) = {
-    // Union the sets once on the driver, so the filter closure does a single
-    // hash lookup per product instead of probing every category set.
-    val allowedItems: Set[Int] = categoryItems.foldLeft(Set.empty[Int])(_ ++ _)
-    val filteredProductFeatures = productFeatures
-      .filter { case (id, _) => allowedItems.contains(id) }
-    recommend(userFeatures.lookup(user).head, filteredProductFeatures, num)
-      .map(t => Rating(user, t._1, t._2))
-  }
-
-  /** Scores each product by the dot product of its features with
-   *  `recommendToFeatures` and returns the `num` highest (id, score) pairs.
-   */
-  private def recommend(
-      recommendToFeatures: Array[Double],
-      recommendableFeatures: RDD[(Int, Array[Double])],
-      num: Int): Array[(Int, Double)] = {
-    val recommendToVector = new DoubleMatrix(recommendToFeatures)
-    val scored = recommendableFeatures.map { case (id,features) =>
-      (id, recommendToVector.dot(new DoubleMatrix(features)))
-    }
-    scored.top(num)(Ordering.by(_._2))
-  }
-
-  /** Writes every model component as a Spark object file under /tmp/<id>/.
-   *  Always returns true.
-   */
-  def save(id: String, params: ALSAlgorithmParams,
-    sc: SparkContext): Boolean = {
-
-    sc.parallelize(Seq(rank)).saveAsObjectFile(s"/tmp/${id}/rank")
-    userFeatures.saveAsObjectFile(s"/tmp/${id}/userFeatures")
-    productFeatures.saveAsObjectFile(s"/tmp/${id}/productFeatures")
-    sc.parallelize(Seq(userStringIntMap)).saveAsObjectFile(s"/tmp/${id}/userStringIntMap")
-    sc.parallelize(Seq(itemStringIntMap)).saveAsObjectFile(s"/tmp/${id}/itemStringIntMap")
-    sc.parallelize(Seq(categoryItemsMap)).saveAsObjectFile(s"/tmp/${id}/categoryItemsMap")
-    true
-  }
-
-  // Note: counts and samples the RDDs, so this triggers Spark jobs.
-  override def toString = {
-    s"userFeatures: [${userFeatures.count()}]" +
-    s"(${userFeatures.take(2).toList}...)" +
-    s" productFeatures: [${productFeatures.count()}]" +
-    s"(${productFeatures.take(2).toList}...)" +
-    s" userStringIntMap: [${userStringIntMap.size}]" +
-    s"(${userStringIntMap.take(2)}...)" +
-    s" itemStringIntMap: [${itemStringIntMap.size}]" +
-    s"(${itemStringIntMap.take(2)}...)" +
-    s" categoryItemsMap: [${categoryItemsMap.size}]" +
-    s"(${categoryItemsMap.take(2)}...)"
-  }
-}
-
-/** Rebuilds an ALSModel from the object files that ALSModel.save wrote under
- *  /tmp/<id>/. Requires `sc` to be defined (fails on `sc.get` otherwise).
- */
-object ALSModel
-  extends IPersistentModelLoader[ALSAlgorithmParams, ALSModel] {
-  def apply(id: String, params: ALSAlgorithmParams,
-    sc: Option[SparkContext]) = {
-    val context = sc.get
-    def path(component: String): String = s"/tmp/${id}/${component}"
-
-    val loadedRank = context.objectFile[Int](path("rank")).first
-    val loadedUserFeatures = context.objectFile[(Int, Array[Double])](path("userFeatures"))
-    val loadedProductFeatures = context.objectFile[(Int, Array[Double])](path("productFeatures"))
-    val loadedUserMap = context.objectFile[BiMap[String, Int]](path("userStringIntMap")).first
-    val loadedItemMap = context.objectFile[BiMap[String, Int]](path("itemStringIntMap")).first
-    val loadedCategories = context.objectFile[Map[String, Set[Int]]](path("categoryItemsMap")).first
-
-    new ALSModel(
-      loadedRank,
-      loadedUserFeatures,
-      loadedProductFeatures,
-      loadedUserMap,
-      loadedItemMap,
-      loadedCategories)
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-recommendation/filter-by-category/src/main/scala/DataSource.scala
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-recommendation/filter-by-category/src/main/scala/DataSource.scala b/examples/scala-parallel-recommendation/filter-by-category/src/main/scala/DataSource.scala
deleted file mode 100644
index 769524b..0000000
--- a/examples/scala-parallel-recommendation/filter-by-category/src/main/scala/DataSource.scala
+++ /dev/null
@@ -1,110 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.template.recommendation
-
-import org.apache.predictionio.controller.PDataSource
-import org.apache.predictionio.controller.EmptyEvaluationInfo
-import org.apache.predictionio.controller.EmptyActualResult
-import org.apache.predictionio.controller.Params
-import org.apache.predictionio.data.storage.Event
-import org.apache.predictionio.data.storage.Storage
-
-import org.apache.spark.SparkContext
-import org.apache.spark.SparkContext._
-import org.apache.spark.rdd.RDD
-
-import grizzled.slf4j.Logger
-
-/** @param appId id of the event-store app to read items and events from */
-case class DataSourceParams(appId: Int) extends Params
-
-/** Reads item categories and user rate/buy events from the event store. */
-class DataSource(val dsp: DataSourceParams)
-  extends PDataSource[TrainingData,
-      EmptyEvaluationInfo, Query, EmptyActualResult] {
-
-  @transient lazy val logger = Logger[this.type]
-
-  /** Builds TrainingData from the aggregated properties of "item" entities
-   *  and the "rate"/"buy" events that "user" entities targeted at items.
-   *  Any record that cannot be converted is logged and its exception rethrown.
-   */
-  override def readTraining(sc: SparkContext): TrainingData = {
-    val eventsDb = Storage.getPEvents()
-
-    val itemProperties = eventsDb.aggregateProperties(
-      appId = dsp.appId,
-      entityType = "item"
-    )(sc)
-
-    val itemsRDD: RDD[Item] = itemProperties.map { case (entityId, properties) =>
-      try {
-        Item(id = entityId, categories = properties.get[List[String]]("categories"))
-      } catch {
-        case e: Exception =>
-          logger.error(s"Failed to get properties ${properties} of" +
-            s" item ${entityId}. Exception: ${e}.")
-          throw e
-      }
-    }
-
-    // targetEntityType is an optional field of an event, hence Some(Some(...)).
-    val rateEventsRDD: RDD[Event] = eventsDb.find(
-      appId = dsp.appId,
-      entityType = Some("user"),
-      eventNames = Some(List("rate", "buy")),
-      targetEntityType = Some(Some("item")))(sc)
-
-    // "rate" carries an explicit rating; every "buy" is mapped to 4.0.
-    val ratingsRDD: RDD[Rating] = rateEventsRDD.map { event =>
-      try {
-        val ratingValue: Double =
-          if (event.event == "rate") event.properties.get[Double]("rating")
-          else if (event.event == "buy") 4.0
-          else throw new Exception(s"Unexpected event ${event} is read.")
-        Rating(event.entityId, event.targetEntityId.get, ratingValue)
-      } catch {
-        case e: Exception =>
-          logger.error(s"Cannot convert ${event} to Rating. Exception: ${e}.")
-          throw e
-      }
-    }.cache()
-
-    new TrainingData(itemsRDD, ratingsRDD)
-  }
-}
-
-/** An item id and the category names attached to it. */
-case class Item(id: String, categories: List[String])
-
-/** One user -> item rating, using the String ids from the event store. */
-case class Rating(user: String, item: String, rating: Double)
-
-/** Items and ratings produced by DataSource. */
-case class TrainingData(
-  items: RDD[Item],
-  ratings: RDD[Rating]
-) {
-  // Counts and samples both RDDs, so building this string runs Spark jobs.
-  override def toString = {
-    val itemsSummary = s"items: [${items.count()}] (${items.take(2).toList}...)"
-    val ratingsSummary = s"ratings: [${ratings.count()}] (${ratings.take(2).toList}...)"
-    s"$itemsSummary $ratingsSummary"
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-recommendation/filter-by-category/src/main/scala/Engine.scala
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-recommendation/filter-by-category/src/main/scala/Engine.scala b/examples/scala-parallel-recommendation/filter-by-category/src/main/scala/Engine.scala
deleted file mode 100644
index daec4e1..0000000
--- a/examples/scala-parallel-recommendation/filter-by-category/src/main/scala/Engine.scala
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.template.recommendation
-
-import org.apache.predictionio.controller.IEngineFactory
-import org.apache.predictionio.controller.Engine
-
-/** Request for up to `num` items for `user`, limited to the given categories. */
-case class Query(user: String, num: Int, categories: Array[String])
-
-/** The recommended items and their scores. */
-case class PredictedResult(itemScores: Array[ItemScore])
-
-/** One recommended item id with its score. */
-case class ItemScore(item: String, score: Double)
-
-/** Assembles the engine from its DataSource, Preparator, the single "als"
- *  algorithm and Serving.
- */
-object RecommendationEngine extends IEngineFactory {
-  def apply() =
-    new Engine(
-      classOf[DataSource],
-      classOf[Preparator],
-      Map("als" -> classOf[ALSAlgorithm]),
-      classOf[Serving])
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-recommendation/filter-by-category/src/main/scala/Preparator.scala
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-recommendation/filter-by-category/src/main/scala/Preparator.scala b/examples/scala-parallel-recommendation/filter-by-category/src/main/scala/Preparator.scala
deleted file mode 100644
index 991f8ab..0000000
--- a/examples/scala-parallel-recommendation/filter-by-category/src/main/scala/Preparator.scala
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.template.recommendation
-
-import org.apache.predictionio.controller.PPreparator
-
-import org.apache.spark.SparkContext
-import org.apache.spark.SparkContext._
-import org.apache.spark.rdd.RDD
-
-/** Passes the training items and ratings through unchanged. */
-class Preparator extends PPreparator[TrainingData, PreparedData] {
-  def prepare(sc: SparkContext, trainingData: TrainingData): PreparedData =
-    new PreparedData(trainingData.items, trainingData.ratings)
-}
-
-/** Input of ALSAlgorithm.train: items with categories, and ratings. */
-class PreparedData(
-  val items: RDD[Item],
-  val ratings: RDD[Rating]
-) extends Serializable
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-recommendation/filter-by-category/src/main/scala/Serving.scala
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-recommendation/filter-by-category/src/main/scala/Serving.scala b/examples/scala-parallel-recommendation/filter-by-category/src/main/scala/Serving.scala
deleted file mode 100644
index 01d4a6d..0000000
--- a/examples/scala-parallel-recommendation/filter-by-category/src/main/scala/Serving.scala
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.template.recommendation
-
-import org.apache.predictionio.controller.LServing
-
-/** Returns the first algorithm's result unchanged. */
-class Serving extends LServing[Query, PredictedResult] {
-  override def serve(query: Query, predictedResults: Seq[PredictedResult]): PredictedResult =
-    predictedResults.head
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-recommendation/reading-custom-events/.gitignore
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-recommendation/reading-custom-events/.gitignore b/examples/scala-parallel-recommendation/reading-custom-events/.gitignore
new file mode 100644
index 0000000..3f3403a
--- /dev/null
+++ b/examples/scala-parallel-recommendation/reading-custom-events/.gitignore
@@ -0,0 +1,5 @@
+data/sample_movielens_data.txt
+manifest.json
+target/
+pio.log
+/pio.sbt
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-recommendation/reading-custom-events/build.sbt
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-recommendation/reading-custom-events/build.sbt b/examples/scala-parallel-recommendation/reading-custom-events/build.sbt
new file mode 100644
index 0000000..3d25df1
--- /dev/null
+++ b/examples/scala-parallel-recommendation/reading-custom-events/build.sbt
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+name := "template-scala-parallel-recommendation"
+
+organization := "org.apache.predictionio"
+
+scalaVersion := "2.11.8"
+
+// "provided": needed to compile the engine, not packaged with it.
+libraryDependencies ++= Seq(
+  "org.apache.predictionio" %% "apache-predictionio-core" % "0.11.0-incubating" % "provided",
+  "org.apache.spark"        %% "spark-mllib"              % "2.1.1"             % "provided")
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-recommendation/reading-custom-events/data/import_eventserver.py
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-recommendation/reading-custom-events/data/import_eventserver.py b/examples/scala-parallel-recommendation/reading-custom-events/data/import_eventserver.py
new file mode 100644
index 0000000..7d88568
--- /dev/null
+++ b/examples/scala-parallel-recommendation/reading-custom-events/data/import_eventserver.py
@@ -0,0 +1,72 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+Import sample data for recommendation engine
+"""
+
+import predictionio
+import argparse
+import random
+
+RATE_ACTIONS_DELIMITER = "::"
+SEED = 3
+
+def import_events(client, file):
+  f = open(file, 'r')
+  random.seed(SEED)
+  count = 0
+  print("Importing data...")
+  for line in f:
+    data = line.rstrip('\r\n').split(RATE_ACTIONS_DELIMITER)
+    # For demonstration purpose, randomly mix in some dislike events
+    if (random.randint(0, 1) == 1):
+      client.create_event(
+        event="like",
+        entity_type="customer",
+        entity_id=data[0],
+        target_entity_type="product",
+        target_entity_id=data[1]
+      )
+    else:
+      client.create_event(
+        event="dislike",
+        entity_type="customer",
+        entity_id=data[0],
+        target_entity_type="product",
+        target_entity_id=data[1]
+      )
+    count += 1
+  f.close()
+  print("%s events are imported." % count)
+
+if __name__ == '__main__':
+  parser = argparse.ArgumentParser(
+    description="Import sample data for recommendation engine")
+  parser.add_argument('--access_key', default='invald_access_key')
+  parser.add_argument('--url', default="http://localhost:7070")
+  parser.add_argument('--file', default="./data/sample_movielens_data.txt")
+
+  args = parser.parse_args()
+  print(args)
+
+  client = predictionio.EventClient(
+    access_key=args.access_key,
+    url=args.url,
+    threads=5,
+    qsize=500)
+  import_events(client, args.file)

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-recommendation/reading-custom-events/data/send_query.py
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-recommendation/reading-custom-events/data/send_query.py b/examples/scala-parallel-recommendation/reading-custom-events/data/send_query.py
new file mode 100644
index 0000000..f6ec9ab
--- /dev/null
+++ b/examples/scala-parallel-recommendation/reading-custom-events/data/send_query.py
@@ -0,0 +1,24 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+Send sample query to prediction engine
+"""
+
+import predictionio
+engine_client = predictionio.EngineClient(url="http://localhost:8000")
+print(engine_client.send_query({"user": "1", "num": 4}))

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-recommendation/reading-custom-events/engine.json
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-recommendation/reading-custom-events/engine.json b/examples/scala-parallel-recommendation/reading-custom-events/engine.json
new file mode 100644
index 0000000..718b0e1
--- /dev/null
+++ b/examples/scala-parallel-recommendation/reading-custom-events/engine.json
@@ -0,0 +1,21 @@
+{
+  "id": "default",
+  "description": "Default settings",
+  "engineFactory": "org.apache.predictionio.examples.recommendation.RecommendationEngine",
+  "datasource": {
+    "params" : {
+      "appName": "MyApp1"
+    }
+  },
+  "algorithms": [
+    {
+      "name": "als",
+      "params": {
+        "rank": 10,
+        "numIterations": 20,
+        "lambda": 0.01,
+        "seed": 3
+      }
+    }
+  ]
+}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-recommendation/reading-custom-events/project/assembly.sbt
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-recommendation/reading-custom-events/project/assembly.sbt b/examples/scala-parallel-recommendation/reading-custom-events/project/assembly.sbt
new file mode 100644
index 0000000..92636bc
--- /dev/null
+++ b/examples/scala-parallel-recommendation/reading-custom-events/project/assembly.sbt
@@ -0,0 +1 @@
+// sbt-assembly plugin: provides the `assembly` task that packages the engine
+// and its non-"provided" dependencies into a single jar.
+addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.4")
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-recommendation/reading-custom-events/project/build.properties
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-recommendation/reading-custom-events/project/build.properties b/examples/scala-parallel-recommendation/reading-custom-events/project/build.properties
new file mode 100644
index 0000000..64317fd
--- /dev/null
+++ b/examples/scala-parallel-recommendation/reading-custom-events/project/build.properties
@@ -0,0 +1 @@
+sbt.version=0.13.15

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-recommendation/reading-custom-events/src/main/scala/ALSAlgorithm.scala
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-recommendation/reading-custom-events/src/main/scala/ALSAlgorithm.scala b/examples/scala-parallel-recommendation/reading-custom-events/src/main/scala/ALSAlgorithm.scala
new file mode 100644
index 0000000..b0f874d
--- /dev/null
+++ b/examples/scala-parallel-recommendation/reading-custom-events/src/main/scala/ALSAlgorithm.scala
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.predictionio.examples.recommendation
+
+import org.apache.predictionio.controller.PAlgorithm
+import org.apache.predictionio.controller.Params
+import org.apache.predictionio.data.storage.BiMap
+
+import org.apache.spark.SparkContext
+import org.apache.spark.SparkContext._
+import org.apache.spark.rdd.RDD
+import org.apache.spark.mllib.recommendation.ALS
+import org.apache.spark.mllib.recommendation.{Rating => MLlibRating}
+import org.apache.spark.mllib.recommendation.ALSModel
+
+import grizzled.slf4j.Logger
+
+/**
+ * Hyper-parameters of [[ALSAlgorithm]], taken from the algorithm "params"
+ * section of engine.json.
+ *
+ * @param rank number of latent factors (passed to ALS.setRank)
+ * @param numIterations number of ALS iterations (passed to ALS.setIterations)
+ * @param lambda regularization parameter (passed to ALS.setLambda)
+ * @param seed random seed for ALS; when None, System.nanoTime is used
+ */
+case class ALSAlgorithmParams(rank: Int, numIterations: Int, lambda: Double, seed: Option[Long])
+  extends Params
+
+/**
+ * Collaborative-filtering recommender backed by Spark MLlib's ALS.
+ *
+ * Trains an [[ALSModel]] from (user, item, rating) triples and answers
+ * "top `num` items for `user`" queries, either one at a time (`predict`) or
+ * in bulk for evaluation (`batchPredict`).
+ */
+class ALSAlgorithm(val ap: ALSAlgorithmParams)
+  extends PAlgorithm[PreparedData, ALSModel, Query, PredictedResult] {
+
+  @transient lazy val logger = Logger[this.type]
+
+  if (ap.numIterations > 30) {
+    logger.warn(
+      s"ALSAlgorithmParams.numIterations > 30, current: ${ap.numIterations}. " +
+      "There is a chance of running into StackOverflowError. " +
+      "To remedy it, set lower numIterations or checkpoint parameters.")
+  }
+
+  /**
+   * Trains an MLlib ALS model on the prepared ratings.
+   *
+   * String user/item IDs are mapped to the Int indices MLlib requires, and
+   * both mappings are stored in the returned model so results can be
+   * translated back to String IDs.
+   *
+   * @throws IllegalArgumentException if `data.ratings` is empty
+   */
+  def train(sc: SparkContext, data: PreparedData): ALSModel = {
+    // MLLib ALS cannot handle empty training data.
+    require(data.ratings.take(1).nonEmpty,
+      s"RDD[Rating] in PreparedData cannot be empty." +
+      " Please check if DataSource generates TrainingData" +
+      " and Preparator generates PreparedData correctly.")
+
+    // Convert user and item String IDs to Int index for MLlib
+    val userStringIntMap = BiMap.stringInt(data.ratings.map(_.user))
+    val itemStringIntMap = BiMap.stringInt(data.ratings.map(_.item))
+    val mllibRatings = data.ratings.map { r =>
+      // MLlibRating requires integer index for user and item
+      MLlibRating(userStringIntMap(r.user), itemStringIntMap(r.item), r.rating)
+    }
+
+    // seed for MLlib ALS
+    val seed = ap.seed.getOrElse(System.nanoTime)
+
+    // Set checkpoint directory
+    // sc.setCheckpointDir("checkpoint")
+
+    // If you only have one type of implicit event (Eg. "view" event only),
+    // set implicitPrefs to true
+    val implicitPrefs = false
+    val als = new ALS()
+    // -1 lets MLlib choose the number of user/product blocks automatically.
+    als.setUserBlocks(-1)
+    als.setProductBlocks(-1)
+    als.setRank(ap.rank)
+    als.setIterations(ap.numIterations)
+    als.setLambda(ap.lambda)
+    als.setImplicitPrefs(implicitPrefs)
+    als.setAlpha(1.0)
+    als.setSeed(seed)
+    // Only takes effect if a checkpoint directory is set on the SparkContext.
+    als.setCheckpointInterval(10)
+    val m = als.run(mllibRatings)
+
+    new ALSModel(
+      rank = m.rank,
+      userFeatures = m.userFeatures,
+      productFeatures = m.productFeatures,
+      userStringIntMap = userStringIntMap,
+      itemStringIntMap = itemStringIntMap)
+  }
+
+  /**
+   * Recommends up to `query.num` items for `query.user`.
+   *
+   * A user unseen during training gets an empty result (logged at info level).
+   */
+  def predict(model: ALSModel, query: Query): PredictedResult = {
+    // Convert String ID to Int index for Mllib
+    model.userStringIntMap.get(query.user).map { userInt =>
+      // create inverse view of itemStringIntMap
+      val itemIntStringMap = model.itemStringIntMap.inverse
+      // recommendProducts() returns Array[MLlibRating], which uses item Int
+      // index. Convert it to String ID for returning PredictedResult
+      val itemScores = model.recommendProducts(userInt, query.num)
+        .map(r => ItemScore(itemIntStringMap(r.product), r.rating))
+      new PredictedResult(itemScores)
+    }.getOrElse {
+      logger.info(s"No prediction for unknown user ${query.user}.")
+      new PredictedResult(Array.empty)
+    }
+  }
+
+  /**
+   * Answers a batch of queries at once. The evaluation module uses this to
+   * send a batch of queries to this engine.
+   *
+   * Each query keeps its Long index. Users unknown to the model get an empty
+   * PredictedResult.
+   */
+  override def batchPredict(model: ALSModel, queries: RDD[(Long, Query)]): RDD[(Long, PredictedResult)] = {
+    // Copy the lookup tables into local vals. The task closures below then
+    // capture only these maps, not the whole model with its feature RDDs.
+    val userStringIntMap = model.userStringIntMap
+    val itemIntStringMap = model.itemStringIntMap.inverse
+
+    val userIxQueries: RDD[(Int, (Long, Query))] = queries.map { case (ix, query) =>
+      // If user not found, then the index is -1
+      val userIx = userStringIntMap.get(query.user).getOrElse(-1)
+      (userIx, (ix, query))
+    }
+
+    // Cross product of all valid users from the queries and products in the model.
+    // distinct() keeps a user that appears in several queries from producing
+    // duplicated (user, product) pairs, which would repeat items in the top-N.
+    val usersProducts: RDD[(Int, Int)] = userIxQueries
+      .keys
+      .filter(_ != -1)
+      .distinct()
+      .cartesian(model.productFeatures.map(_._1))
+
+    // Call mllib ALS's predict function.
+    val ratings: RDD[MLlibRating] = model.predict(usersProducts)
+
+    // The following code construct predicted results from mllib's ratings.
+    // Not optimal implementation. Instead of groupBy, should use combineByKey with a PriorityQueue
+    val userRatings: RDD[(Int, Iterable[MLlibRating])] = ratings.groupBy(_.user)
+
+    userIxQueries.leftOuterJoin(userRatings).map {
+      // When there are ratings
+      case (_, ((ix, query), Some(ratingsOfUser))) =>
+        val topItemScores: Array[ItemScore] = ratingsOfUser
+          .toArray
+          .sortBy(_.rating)(Ordering.Double.reverse) // note: from large to small ordering
+          .take(query.num)
+          .map(rating => ItemScore(itemIntStringMap(rating.product), rating.rating))
+        (ix, PredictedResult(itemScores = topItemScores))
+      // When user doesn't exist in training data
+      case (userIx, ((ix, _), None)) =>
+        require(userIx == -1, s"Known user index ${userIx} has no predicted ratings.")
+        (ix, PredictedResult(itemScores = Array.empty))
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-recommendation/reading-custom-events/src/main/scala/ALSModel.scala
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-recommendation/reading-custom-events/src/main/scala/ALSModel.scala b/examples/scala-parallel-recommendation/reading-custom-events/src/main/scala/ALSModel.scala
new file mode 100644
index 0000000..898858d
--- /dev/null
+++ b/examples/scala-parallel-recommendation/reading-custom-events/src/main/scala/ALSModel.scala
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.recommendation
+// This must be the same package as Spark's MatrixFactorizationModel because
+// MatrixFactorizationModel's constructor is private and we are using
+// its constructor in order to save and load the model
+
+import org.apache.predictionio.examples.recommendation.ALSAlgorithmParams
+
+import org.apache.predictionio.controller.PersistentModel
+import org.apache.predictionio.controller.PersistentModelLoader
+import org.apache.predictionio.data.storage.BiMap
+
+import org.apache.spark.SparkContext
+import org.apache.spark.SparkContext._
+import org.apache.spark.rdd.RDD
+
+/**
+ * MLlib matrix-factorization model plus the String/Int ID mappings that
+ * translate between PredictionIO IDs and MLlib's integer indices.
+ *
+ * `save` writes each component as a Spark object file under /tmp/{id}/.
+ */
+class ALSModel(
+    override val rank: Int,
+    override val userFeatures: RDD[(Int, Array[Double])],
+    override val productFeatures: RDD[(Int, Array[Double])],
+    val userStringIntMap: BiMap[String, Int],
+    val itemStringIntMap: BiMap[String, Int])
+  extends MatrixFactorizationModel(rank, userFeatures, productFeatures)
+  with PersistentModel[ALSAlgorithmParams] {
+
+  /**
+   * Persists rank, both feature RDDs and both ID maps as object files
+   * under /tmp/{id}/.
+   *
+   * @return always true
+   */
+  def save(id: String, params: ALSAlgorithmParams,
+    sc: SparkContext): Boolean = {
+    val baseDir = s"/tmp/${id}"
+
+    sc.parallelize(Seq(rank)).saveAsObjectFile(s"$baseDir/rank")
+    userFeatures.saveAsObjectFile(s"$baseDir/userFeatures")
+    productFeatures.saveAsObjectFile(s"$baseDir/productFeatures")
+    // The ID maps are plain BiMap values rather than RDDs, so each is wrapped
+    // in a one-element RDD in order to be saved.
+    sc.parallelize(Seq(userStringIntMap)).saveAsObjectFile(s"$baseDir/userStringIntMap")
+    sc.parallelize(Seq(itemStringIntMap)).saveAsObjectFile(s"$baseDir/itemStringIntMap")
+    true
+  }
+
+  /** Summary of each component: its size and first two entries (runs Spark jobs). */
+  override def toString: String = {
+    def describe(label: String, count: Long, sample: Any): String =
+      s"$label: [$count]($sample...)"
+
+    Seq(
+      describe("userFeatures", userFeatures.count(), userFeatures.take(2).toList),
+      describe("productFeatures", productFeatures.count(), productFeatures.take(2).toList),
+      describe("userStringIntMap", userStringIntMap.size, userStringIntMap.take(2)),
+      describe("itemStringIntMap", itemStringIntMap.size, itemStringIntMap.take(2))
+    ).mkString(" ")
+  }
+}
+
+/** Loader that rebuilds an [[ALSModel]] from the files written by its `save`. */
+object ALSModel
+  extends PersistentModelLoader[ALSAlgorithmParams, ALSModel] {
+
+  /**
+   * Reads every model component back from /tmp/{id}/.
+   *
+   * @param sc must be defined; `sc.get` throws NoSuchElementException otherwise
+   */
+  def apply(id: String, params: ALSAlgorithmParams,
+    sc: Option[SparkContext]) = {
+    val ctx = sc.get
+    val baseDir = s"/tmp/${id}"
+
+    new ALSModel(
+      rank = ctx.objectFile[Int](s"$baseDir/rank").first,
+      userFeatures = ctx.objectFile[(Int, Array[Double])](s"$baseDir/userFeatures"),
+      productFeatures = ctx.objectFile[(Int, Array[Double])](s"$baseDir/productFeatures"),
+      userStringIntMap = ctx.objectFile[BiMap[String, Int]](s"$baseDir/userStringIntMap").first,
+      itemStringIntMap = ctx.objectFile[BiMap[String, Int]](s"$baseDir/itemStringIntMap").first)
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-recommendation/reading-custom-events/src/main/scala/DataSource.scala
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-recommendation/reading-custom-events/src/main/scala/DataSource.scala b/examples/scala-parallel-recommendation/reading-custom-events/src/main/scala/DataSource.scala
new file mode 100644
index 0000000..cf9738d
--- /dev/null
+++ b/examples/scala-parallel-recommendation/reading-custom-events/src/main/scala/DataSource.scala
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.predictionio.examples.recommendation
+
+import org.apache.predictionio.controller.PDataSource
+import org.apache.predictionio.controller.EmptyEvaluationInfo
+import org.apache.predictionio.controller.EmptyActualResult
+import org.apache.predictionio.controller.Params
+import org.apache.predictionio.data.storage.Event
+import org.apache.predictionio.data.store.PEventStore
+
+import org.apache.spark.SparkContext
+import org.apache.spark.SparkContext._
+import org.apache.spark.rdd.RDD
+
+import grizzled.slf4j.Logger
+
+/**
+ * Settings for k-fold evaluation.
+ *
+ * @param kFold number of folds the ratings are split into
+ * @param queryNum number of items requested by each evaluation query
+ */
+case class DataSourceEvalParams(
+  kFold: Int,
+  queryNum: Int)
+
+/**
+ * Parameters of [[DataSource]].
+ *
+ * @param appName app whose events are read from the event store
+ * @param evalParams evaluation settings; must be defined for readEval
+ */
+case class DataSourceParams(appName: String, evalParams: Option[DataSourceEvalParams])
+  extends Params
+
+/**
+ * Reads customers' "like"/"dislike" events on products from the event store
+ * and turns them into explicit ratings for training and k-fold evaluation.
+ */
+class DataSource(val dsp: DataSourceParams)
+  extends PDataSource[TrainingData,
+      EmptyEvaluationInfo, Query, ActualResult] {
+
+  @transient lazy val logger = Logger[this.type]
+
+  /**
+   * Loads the app's "like"/"dislike" events of customers on products and maps
+   * them to ratings (like -> 4.0, dislike -> 1.0). The returned RDD is cached.
+   *
+   * If an event cannot be converted, it is logged and the exception is
+   * rethrown.
+   */
+  def getRatings(sc: SparkContext): RDD[Rating] = {
+
+    val eventsRDD: RDD[Event] = PEventStore.find(
+      appName = dsp.appName,
+      entityType = Some("customer"), // MODIFIED
+      eventNames = Some(List("like", "dislike")), // MODIFIED
+      // targetEntityType is optional field of an event.
+      targetEntityType = Some(Some("product")))(sc) // MODIFIED
+
+    val ratingsRDD: RDD[Rating] = eventsRDD.map { event =>
+      try {
+        val ratingValue: Double = event.event match {
+          // MODIFIED
+          case "like" => 4.0 // map a like event to a rating of 4.0
+          case "dislike" => 1.0 // map a dislike event to a rating of 1.0
+          case _ => throw new Exception(s"Unexpected event ${event} is read.")
+        }
+        // entityId and targetEntityId is String
+        Rating(event.entityId,
+          event.targetEntityId.get,
+          ratingValue)
+      } catch {
+        case e: Exception => {
+          logger.error(s"Cannot convert ${event} to Rating. Exception: ${e}.")
+          throw e
+        }
+      }
+    }.cache()
+
+    ratingsRDD
+  }
+
+  override
+  def readTraining(sc: SparkContext): TrainingData = {
+    new TrainingData(getRatings(sc))
+  }
+
+  /**
+   * Splits the ratings into `kFold` train/test folds.
+   *
+   * Each rating goes to fold (uniqueId % kFold). For fold `idx`, that fold's
+   * ratings are grouped per user into (Query, ActualResult) test pairs, and
+   * all other ratings form the training data.
+   *
+   * @throws IllegalArgumentException if evalParams is missing or kFold is not positive
+   */
+  override
+  def readEval(sc: SparkContext)
+  : Seq[(TrainingData, EmptyEvaluationInfo, RDD[(Query, ActualResult)])] = {
+    require(dsp.evalParams.isDefined, "Must specify evalParams")
+    val evalParams = dsp.evalParams.get
+
+    val kFold = evalParams.kFold
+    // Without this check a non-positive kFold silently yields no folds at all.
+    require(kFold > 0, s"evalParams.kFold must be positive, current: ${kFold}")
+
+    val ratings: RDD[(Rating, Long)] = getRatings(sc).zipWithUniqueId
+    ratings.cache
+
+    (0 until kFold).map { idx =>
+      val trainingRatings = ratings.filter(_._2 % kFold != idx).map(_._1)
+      val testingRatings = ratings.filter(_._2 % kFold == idx).map(_._1)
+
+      val testingUsers: RDD[(String, Iterable[Rating])] = testingRatings.groupBy(_.user)
+
+      (new TrainingData(trainingRatings),
+        new EmptyEvaluationInfo(),
+        testingUsers.map {
+          case (user, userRatings) =>
+            (Query(user, evalParams.queryNum), ActualResult(userRatings.toArray))
+        }
+      )
+    }
+  }
+}
+
+/** One explicit rating of `item` by `user`; both are String IDs. */
+case class Rating(user: String, item: String, rating: Double)
+
+/** Ratings produced by [[DataSource]] for training. */
+class TrainingData(
+  val ratings: RDD[Rating]
+) extends Serializable {
+  /** Rating count plus the first two ratings; runs Spark jobs on `ratings`. */
+  override def toString: String = {
+    val total = ratings.count()
+    val sample = ratings.take(2).toList
+    s"ratings: [$total] ($sample...)"
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-recommendation/reading-custom-events/src/main/scala/Engine.scala
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-recommendation/reading-custom-events/src/main/scala/Engine.scala b/examples/scala-parallel-recommendation/reading-custom-events/src/main/scala/Engine.scala
new file mode 100644
index 0000000..b2a668b
--- /dev/null
+++ b/examples/scala-parallel-recommendation/reading-custom-events/src/main/scala/Engine.scala
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.predictionio.examples.recommendation
+
+import org.apache.predictionio.controller.EngineFactory
+import org.apache.predictionio.controller.Engine
+
+/**
+ * Recommendation request.
+ *
+ * @param user ID of the user to recommend items for
+ * @param num maximum number of items to return
+ */
+case class Query(user: String, num: Int) extends Serializable
+
+/** Items recommended for a query, each paired with its score. */
+case class PredictedResult(itemScores: Array[ItemScore]) extends Serializable
+
+/** A user's held-out ratings, used as ground truth during evaluation. */
+case class ActualResult(ratings: Array[Rating]) extends Serializable
+
+/** An item ID together with its predicted score. */
+case class ItemScore(item: String, score: Double) extends Serializable
+
+/** Engine factory named by "engineFactory" in engine.json. */
+object RecommendationEngine extends EngineFactory {
+  /** Wires DataSource, Preparator, ALSAlgorithm (registered as "als") and Serving. */
+  def apply() = new Engine(
+    classOf[DataSource],
+    classOf[Preparator],
+    Map("als" -> classOf[ALSAlgorithm]),
+    classOf[Serving])
+}