Posted to commits@predictionio.apache.org by ch...@apache.org on 2017/09/28 15:54:49 UTC

[08/57] [abbrv] incubator-predictionio git commit: [PIO-97] Fixes examples of the official templates for v0.11.0-incubating.

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-ecommercerecommendation/weighted-items/src/main/scala/Engine.scala
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-ecommercerecommendation/weighted-items/src/main/scala/Engine.scala b/examples/scala-parallel-ecommercerecommendation/weighted-items/src/main/scala/Engine.scala
deleted file mode 100644
index 7ec3dd9..0000000
--- a/examples/scala-parallel-ecommercerecommendation/weighted-items/src/main/scala/Engine.scala
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.template.ecommercerecommendation
-
-import org.apache.predictionio.controller.IEngineFactory
-import org.apache.predictionio.controller.Engine
-
-case class Query(
-  user: String,
-  num: Int,
-  categories: Option[Set[String]],
-  whiteList: Option[Set[String]],
-  blackList: Option[Set[String]]
-)
-
-case class PredictedResult(
-  itemScores: Array[ItemScore]
-)
-
-case class ItemScore(
-  item: String,
-  score: Double
-)
-
-object ECommerceRecommendationEngine extends IEngineFactory {
-  def apply() = {
-    new Engine(
-      classOf[DataSource],
-      classOf[Preparator],
-      Map("als" -> classOf[ALSAlgorithm]),
-      classOf[Serving])
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-ecommercerecommendation/weighted-items/src/main/scala/Preparator.scala
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-ecommercerecommendation/weighted-items/src/main/scala/Preparator.scala b/examples/scala-parallel-ecommercerecommendation/weighted-items/src/main/scala/Preparator.scala
deleted file mode 100644
index 75040e9..0000000
--- a/examples/scala-parallel-ecommercerecommendation/weighted-items/src/main/scala/Preparator.scala
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.template.ecommercerecommendation
-
-import org.apache.predictionio.controller.PPreparator
-
-import org.apache.spark.SparkContext
-import org.apache.spark.SparkContext._
-import org.apache.spark.rdd.RDD
-
-class Preparator
-  extends PPreparator[TrainingData, PreparedData] {
-
-  def prepare(sc: SparkContext, trainingData: TrainingData): PreparedData = {
-    new PreparedData(
-      users = trainingData.users,
-      items = trainingData.items,
-      viewEvents = trainingData.viewEvents)
-  }
-}
-
-class PreparedData(
-  val users: RDD[(String, User)],
-  val items: RDD[(String, Item)],
-  val viewEvents: RDD[ViewEvent]
-) extends Serializable

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-ecommercerecommendation/weighted-items/src/main/scala/Serving.scala
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-ecommercerecommendation/weighted-items/src/main/scala/Serving.scala b/examples/scala-parallel-ecommercerecommendation/weighted-items/src/main/scala/Serving.scala
deleted file mode 100644
index 32fe959..0000000
--- a/examples/scala-parallel-ecommercerecommendation/weighted-items/src/main/scala/Serving.scala
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.template.ecommercerecommendation
-
-import org.apache.predictionio.controller.LServing
-
-class Serving
-  extends LServing[Query, PredictedResult] {
-
-  override
-  def serve(query: Query,
-    predictedResults: Seq[PredictedResult]): PredictedResult = {
-    predictedResults.head
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-ecommercerecommendation/weighted-items/template.json
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-ecommercerecommendation/weighted-items/template.json b/examples/scala-parallel-ecommercerecommendation/weighted-items/template.json
deleted file mode 100644
index 932e603..0000000
--- a/examples/scala-parallel-ecommercerecommendation/weighted-items/template.json
+++ /dev/null
@@ -1 +0,0 @@
-{"pio": {"version": { "min": "0.9.0" }}}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-recommendation/README.md
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-recommendation/README.md b/examples/scala-parallel-recommendation/README.md
new file mode 100644
index 0000000..0be8090
--- /dev/null
+++ b/examples/scala-parallel-recommendation/README.md
@@ -0,0 +1,20 @@
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+
+This is based on Recommendation Template v0.11.0-incubating.
+
+Please refer to http://predictionio.incubator.apache.org/templates/recommendation/how-to/
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-recommendation/blacklist-items/build.sbt
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-recommendation/blacklist-items/build.sbt b/examples/scala-parallel-recommendation/blacklist-items/build.sbt
new file mode 100644
index 0000000..3d25df1
--- /dev/null
+++ b/examples/scala-parallel-recommendation/blacklist-items/build.sbt
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+name := "template-scala-parallel-recommendation"
+
+organization := "org.apache.predictionio"
+scalaVersion := "2.11.8"
+libraryDependencies ++= Seq(
+  "org.apache.predictionio" %% "apache-predictionio-core" % "0.11.0-incubating" % "provided",
+  "org.apache.spark"        %% "spark-mllib"              % "2.1.1" % "provided")
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-recommendation/blacklist-items/data/import_eventserver.py
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-recommendation/blacklist-items/data/import_eventserver.py b/examples/scala-parallel-recommendation/blacklist-items/data/import_eventserver.py
new file mode 100644
index 0000000..63694cf
--- /dev/null
+++ b/examples/scala-parallel-recommendation/blacklist-items/data/import_eventserver.py
@@ -0,0 +1,73 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+Import sample data for recommendation engine
+"""
+
+import predictionio
+import argparse
+import random
+
+RATE_ACTIONS_DELIMITER = "::"
+SEED = 3
+
+def import_events(client, file):
+  f = open(file, 'r')
+  random.seed(SEED)
+  count = 0
+  print("Importing data...")
+  for line in f:
+    data = line.rstrip('\r\n').split(RATE_ACTIONS_DELIMITER)
+    # For demonstration purposes, randomly mix in some buy events
+    if (random.randint(0, 1) == 1):
+      client.create_event(
+        event="rate",
+        entity_type="user",
+        entity_id=data[0],
+        target_entity_type="item",
+        target_entity_id=data[1],
+        properties= { "rating" : float(data[2]) }
+      )
+    else:
+      client.create_event(
+        event="buy",
+        entity_type="user",
+        entity_id=data[0],
+        target_entity_type="item",
+        target_entity_id=data[1]
+      )
+    count += 1
+  f.close()
+  print("%s events are imported." % count)
+
+if __name__ == '__main__':
+  parser = argparse.ArgumentParser(
+    description="Import sample data for recommendation engine")
+  parser.add_argument('--access_key', default='invald_access_key')
+  parser.add_argument('--url', default="http://localhost:7070")
+  parser.add_argument('--file', default="./data/sample_movielens_data.txt")
+
+  args = parser.parse_args()
+  print(args)
+
+  client = predictionio.EventClient(
+    access_key=args.access_key,
+    url=args.url,
+    threads=5,
+    qsize=500)
+  import_events(client, args.file)

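As an aside, the import script above splits each line of sample_movielens_data.txt on the "::" delimiter and uses the first three fields as user, item and rating. A minimal Scala sketch of that parsing, with a hypothetical input line:

    val line = "1::31::2.5::1260759144"                 // hypothetical line in MovieLens format
    val fields = line.split("::")                       // Array("1", "31", "2.5", "1260759144")
    val (user, item, rating) = (fields(0), fields(1), fields(2).toDouble)
    // user = "1", item = "31", rating = 2.5 -- the same data[0..2] fields the Python script reads
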
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-recommendation/blacklist-items/data/send_query.py
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-recommendation/blacklist-items/data/send_query.py b/examples/scala-parallel-recommendation/blacklist-items/data/send_query.py
new file mode 100644
index 0000000..f6ec9ab
--- /dev/null
+++ b/examples/scala-parallel-recommendation/blacklist-items/data/send_query.py
@@ -0,0 +1,24 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+Send sample query to prediction engine
+"""
+
+import predictionio
+engine_client = predictionio.EngineClient(url="http://localhost:8000")
+print(engine_client.send_query({"user": "1", "num": 4}))

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-recommendation/blacklist-items/engine.json
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-recommendation/blacklist-items/engine.json b/examples/scala-parallel-recommendation/blacklist-items/engine.json
new file mode 100644
index 0000000..718b0e1
--- /dev/null
+++ b/examples/scala-parallel-recommendation/blacklist-items/engine.json
@@ -0,0 +1,21 @@
+{
+  "id": "default",
+  "description": "Default settings",
+  "engineFactory": "org.apache.predictionio.examples.recommendation.RecommendationEngine",
+  "datasource": {
+    "params" : {
+      "appName": "MyApp1"
+    }
+  },
+  "algorithms": [
+    {
+      "name": "als",
+      "params": {
+        "rank": 10,
+        "numIterations": 20,
+        "lambda": 0.01,
+        "seed": 3
+      }
+    }
+  ]
+}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-recommendation/blacklist-items/project/assembly.sbt
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-recommendation/blacklist-items/project/assembly.sbt b/examples/scala-parallel-recommendation/blacklist-items/project/assembly.sbt
new file mode 100644
index 0000000..92636bc
--- /dev/null
+++ b/examples/scala-parallel-recommendation/blacklist-items/project/assembly.sbt
@@ -0,0 +1 @@
+addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.4")
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-recommendation/blacklist-items/project/build.properties
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-recommendation/blacklist-items/project/build.properties b/examples/scala-parallel-recommendation/blacklist-items/project/build.properties
new file mode 100644
index 0000000..64317fd
--- /dev/null
+++ b/examples/scala-parallel-recommendation/blacklist-items/project/build.properties
@@ -0,0 +1 @@
+sbt.version=0.13.15

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-recommendation/blacklist-items/src/main/scala/ALSAlgorithm.scala
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-recommendation/blacklist-items/src/main/scala/ALSAlgorithm.scala b/examples/scala-parallel-recommendation/blacklist-items/src/main/scala/ALSAlgorithm.scala
new file mode 100644
index 0000000..f613f66
--- /dev/null
+++ b/examples/scala-parallel-recommendation/blacklist-items/src/main/scala/ALSAlgorithm.scala
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.predictionio.examples.recommendation
+
+import org.apache.predictionio.controller.PAlgorithm
+import org.apache.predictionio.controller.Params
+import org.apache.predictionio.data.storage.BiMap
+
+import org.apache.spark.SparkContext
+import org.apache.spark.SparkContext._
+import org.apache.spark.rdd.RDD
+import org.apache.spark.mllib.recommendation.ALS
+import org.apache.spark.mllib.recommendation.{Rating => MLlibRating}
+import org.apache.spark.mllib.recommendation.ALSModel
+
+import grizzled.slf4j.Logger
+
+case class ALSAlgorithmParams(
+  rank: Int,
+  numIterations: Int,
+  lambda: Double,
+  seed: Option[Long]) extends Params
+
+class ALSAlgorithm(val ap: ALSAlgorithmParams)
+  extends PAlgorithm[PreparedData, ALSModel, Query, PredictedResult] {
+
+  @transient lazy val logger = Logger[this.type]
+
+  if (ap.numIterations > 30) {
+    logger.warn(
+      s"ALSAlgorithmParams.numIterations > 30, current: ${ap.numIterations}. " +
+      s"There is a chance of running to StackOverflowException." +
+      s"To remedy it, set lower numIterations or checkpoint parameters.")
+  }
+
+  def train(sc: SparkContext, data: PreparedData): ALSModel = {
+    // MLLib ALS cannot handle empty training data.
+    require(!data.ratings.take(1).isEmpty,
+      s"RDD[Rating] in PreparedData cannot be empty." +
+      " Please check if DataSource generates TrainingData" +
+      " and Preparator generates PreparedData correctly.")
+    // Convert user and item String IDs to Int index for MLlib
+
+    val userStringIntMap = BiMap.stringInt(data.ratings.map(_.user))
+    val itemStringIntMap = BiMap.stringInt(data.ratings.map(_.item))
+    val mllibRatings = data.ratings.map( r =>
+      // MLlibRating requires integer index for user and item
+      MLlibRating(userStringIntMap(r.user), itemStringIntMap(r.item), r.rating)
+    )
+
+    // seed for MLlib ALS
+    val seed = ap.seed.getOrElse(System.nanoTime)
+
+    // Set checkpoint directory
+    // sc.setCheckpointDir("checkpoint")
+
+    // If you only have one type of implicit event (Eg. "view" event only),
+    // set implicitPrefs to true
+    val implicitPrefs = false
+    val als = new ALS()
+    als.setUserBlocks(-1)
+    als.setProductBlocks(-1)
+    als.setRank(ap.rank)
+    als.setIterations(ap.numIterations)
+    als.setLambda(ap.lambda)
+    als.setImplicitPrefs(implicitPrefs)
+    als.setAlpha(1.0)
+    als.setSeed(seed)
+    als.setCheckpointInterval(10)
+    val m = als.run(mllibRatings)
+
+    new ALSModel(
+      rank = m.rank,
+      userFeatures = m.userFeatures,
+      productFeatures = m.productFeatures,
+      userStringIntMap = userStringIntMap,
+      itemStringIntMap = itemStringIntMap)
+  }
+
+  def predict(model: ALSModel, query: Query): PredictedResult = {
+    // Convert String ID to Int index for Mllib
+    model.userStringIntMap.get(query.user).map { userInt =>
+      // create inverse view of itemStringIntMap
+      val itemIntStringMap = model.itemStringIntMap.inverse
+      // recommendProductsWithFilter() returns Array[MLlibRating], which uses item Int
+      // index. Convert it to String ID for returning PredictedResult
+      val blackList = query.blackList.flatMap(model.itemStringIntMap.get) // ADDED
+      val itemScores = model
+        .recommendProductsWithFilter(userInt, query.num, blackList) // MODIFIED
+        .map (r => ItemScore(itemIntStringMap(r.product), r.rating))
+      new PredictedResult(itemScores)
+    }.getOrElse{
+      logger.info(s"No prediction for unknown user ${query.user}.")
+      new PredictedResult(Array.empty)
+    }
+  }
+
+  // This function is used by the evaluation module, where a batch of queries is sent to this engine
+  // for evaluation purposes.
+  override def batchPredict(model: ALSModel, queries: RDD[(Long, Query)]): RDD[(Long, PredictedResult)] = {
+    val userIxQueries: RDD[(Int, (Long, Query))] = queries
+    .map { case (ix, query) => {
+      // If user not found, then the index is -1
+      val userIx = model.userStringIntMap.get(query.user).getOrElse(-1)
+      (userIx, (ix, query))
+    }}
+
+    // Cross product of all valid users from the queries and products in the model.
+    val usersProducts: RDD[(Int, Int)] = userIxQueries
+      .keys
+      .filter(_ != -1)
+      .cartesian(model.productFeatures.map(_._1))
+
+    // Call mllib ALS's predict function.
+    val ratings: RDD[MLlibRating] = model.predict(usersProducts)
+
+    // The following code constructs predicted results from MLlib's ratings.
+    // This is not an optimal implementation; instead of groupBy, combineByKey with a PriorityQueue should be used.
+    val userRatings: RDD[(Int, Iterable[MLlibRating])] = ratings.groupBy(_.user)
+
+    userIxQueries.leftOuterJoin(userRatings)
+    .map {
+      // When there are ratings
+      case (userIx, ((ix, query), Some(ratings))) => {
+        val topItemScores: Array[ItemScore] = ratings
+        .toArray
+        .sortBy(_.rating)(Ordering.Double.reverse) // note: from large to small ordering
+        .take(query.num)
+        .map { rating => ItemScore(
+          model.itemStringIntMap.inverse(rating.product),
+          rating.rating) }
+
+        (ix, PredictedResult(itemScores = topItemScores))
+      }
+      // When user doesn't exist in training data
+      case (userIx, ((ix, query), None)) => {
+        require(userIx == -1)
+        (ix, PredictedResult(itemScores = Array.empty))
+      }
+    }
+  }
+}

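A note on predict() above: the query's blackList of item String IDs is converted to MLlib Int indices via flatMap over the BiMap, so IDs the model has never seen are silently dropped. A rough standalone sketch of that conversion, using a plain Map in place of BiMap and hypothetical IDs:

    val itemStringIntMap = Map("i1" -> 0, "i2" -> 1, "i3" -> 2)  // stand-in for model.itemStringIntMap
    val queryBlackList   = Set("i2", "i9")                       // "i9" is unknown to the model
    val blackList        = queryBlackList.flatMap(itemStringIntMap.get)
    // blackList == Set(1); the unknown "i9" simply drops out before filtering
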
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-recommendation/blacklist-items/src/main/scala/ALSModel.scala
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-recommendation/blacklist-items/src/main/scala/ALSModel.scala b/examples/scala-parallel-recommendation/blacklist-items/src/main/scala/ALSModel.scala
new file mode 100644
index 0000000..f3c881e
--- /dev/null
+++ b/examples/scala-parallel-recommendation/blacklist-items/src/main/scala/ALSModel.scala
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.recommendation
+// This must be the same package as Spark's MatrixFactorizationModel because
+// MatrixFactorizationModel's constructor is private and we are using
+// its constructor in order to save and load the model
+
+import com.github.fommil.netlib.BLAS.{getInstance => blas} // ADDED
+import org.apache.predictionio.examples.recommendation.ALSAlgorithmParams
+
+import org.apache.predictionio.controller.PersistentModel
+import org.apache.predictionio.controller.PersistentModelLoader
+import org.apache.predictionio.data.storage.BiMap
+
+import org.apache.spark.SparkContext
+import org.apache.spark.SparkContext._
+import org.apache.spark.rdd.RDD
+
+class ALSModel(
+    override val rank: Int,
+    override val userFeatures: RDD[(Int, Array[Double])],
+    override val productFeatures: RDD[(Int, Array[Double])],
+    val userStringIntMap: BiMap[String, Int],
+    val itemStringIntMap: BiMap[String, Int])
+  extends MatrixFactorizationModel(rank, userFeatures, productFeatures)
+  with PersistentModel[ALSAlgorithmParams] {
+
+  // ADDED
+  def recommendProductsWithFilter(user: Int, num: Int, productIdFilter: Set[Int]) = {
+    val filteredProductFeatures = productFeatures
+      .filter { case (id, _) => !productIdFilter.contains(id) } // (*)
+    recommend(userFeatures.lookup(user).head, filteredProductFeatures, num)
+      .map(t => Rating(user, t._1, t._2))
+  }
+
+  // ADDED
+  private def recommend(
+      recommendToFeatures: Array[Double],
+      recommendableFeatures: RDD[(Int, Array[Double])],
+      num: Int): Array[(Int, Double)] = {
+    val scored = recommendableFeatures.map { case (id, features) =>
+      (id, blas.ddot(features.length, recommendToFeatures, 1, features, 1))
+    }
+    scored.top(num)(Ordering.by(_._2))
+  }
+
+  def save(id: String, params: ALSAlgorithmParams,
+    sc: SparkContext): Boolean = {
+
+    sc.parallelize(Seq(rank)).saveAsObjectFile(s"/tmp/${id}/rank")
+    userFeatures.saveAsObjectFile(s"/tmp/${id}/userFeatures")
+    productFeatures.saveAsObjectFile(s"/tmp/${id}/productFeatures")
+    sc.parallelize(Seq(userStringIntMap))
+      .saveAsObjectFile(s"/tmp/${id}/userStringIntMap")
+    sc.parallelize(Seq(itemStringIntMap))
+      .saveAsObjectFile(s"/tmp/${id}/itemStringIntMap")
+    true
+  }
+
+  override def toString = {
+    s"userFeatures: [${userFeatures.count()}]" +
+    s"(${userFeatures.take(2).toList}...)" +
+    s" productFeatures: [${productFeatures.count()}]" +
+    s"(${productFeatures.take(2).toList}...)" +
+    s" userStringIntMap: [${userStringIntMap.size}]" +
+    s"(${userStringIntMap.take(2)}...)" +
+    s" itemStringIntMap: [${itemStringIntMap.size}]" +
+    s"(${itemStringIntMap.take(2)}...)"
+  }
+}
+
+object ALSModel
+  extends PersistentModelLoader[ALSAlgorithmParams, ALSModel] {
+  def apply(id: String, params: ALSAlgorithmParams,
+    sc: Option[SparkContext]) = {
+    new ALSModel(
+      rank = sc.get.objectFile[Int](s"/tmp/${id}/rank").first,
+      userFeatures = sc.get.objectFile(s"/tmp/${id}/userFeatures"),
+      productFeatures = sc.get.objectFile(s"/tmp/${id}/productFeatures"),
+      userStringIntMap = sc.get
+        .objectFile[BiMap[String, Int]](s"/tmp/${id}/userStringIntMap").first,
+      itemStringIntMap = sc.get
+        .objectFile[BiMap[String, Int]](s"/tmp/${id}/itemStringIntMap").first)
+  }
+}

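For readers without the netlib BLAS dependency at hand, the scoring step inside recommend() above is simply a dot product between the user's feature vector and each candidate item's feature vector, followed by taking the top-scoring items. A plain-Scala sketch with made-up feature vectors (no Spark or BLAS involved):

    val userVec = Array(0.5, 1.0)                                     // hypothetical user features
    val candidates = Seq(7 -> Array(1.0, 0.0), 9 -> Array(0.0, 2.0))  // (item index, item features)
    val scored = candidates.map { case (id, f) =>
      (id, userVec.zip(f).map { case (a, b) => a * b }.sum)           // plain dot product, like blas.ddot
    }
    val top = scored.sortBy(-_._2).take(1)                            // Seq((9, 2.0))
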
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-recommendation/blacklist-items/src/main/scala/DataSource.scala
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-recommendation/blacklist-items/src/main/scala/DataSource.scala b/examples/scala-parallel-recommendation/blacklist-items/src/main/scala/DataSource.scala
new file mode 100644
index 0000000..d5f71bd
--- /dev/null
+++ b/examples/scala-parallel-recommendation/blacklist-items/src/main/scala/DataSource.scala
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.predictionio.examples.recommendation
+
+import org.apache.predictionio.controller.PDataSource
+import org.apache.predictionio.controller.EmptyEvaluationInfo
+import org.apache.predictionio.controller.EmptyActualResult
+import org.apache.predictionio.controller.Params
+import org.apache.predictionio.data.storage.Event
+import org.apache.predictionio.data.store.PEventStore
+
+import org.apache.spark.SparkContext
+import org.apache.spark.SparkContext._
+import org.apache.spark.rdd.RDD
+
+import grizzled.slf4j.Logger
+
+case class DataSourceEvalParams(kFold: Int, queryNum: Int)
+
+case class DataSourceParams(
+  appName: String,
+  evalParams: Option[DataSourceEvalParams]) extends Params
+
+class DataSource(val dsp: DataSourceParams)
+  extends PDataSource[TrainingData,
+      EmptyEvaluationInfo, Query, ActualResult] {
+
+  @transient lazy val logger = Logger[this.type]
+
+  def getRatings(sc: SparkContext): RDD[Rating] = {
+
+    val eventsRDD: RDD[Event] = PEventStore.find(
+      appName = dsp.appName,
+      entityType = Some("user"),
+      eventNames = Some(List("rate", "buy")), // read "rate" and "buy" event
+      // targetEntityType is optional field of an event.
+      targetEntityType = Some(Some("item")))(sc)
+
+    val ratingsRDD: RDD[Rating] = eventsRDD.map { event =>
+      val rating = try {
+        val ratingValue: Double = event.event match {
+          case "rate" => event.properties.get[Double]("rating")
+          case "buy" => 4.0 // map buy event to rating value of 4
+          case _ => throw new Exception(s"Unexpected event ${event} is read.")
+        }
+        // entityId and targetEntityId are Strings
+        Rating(event.entityId,
+          event.targetEntityId.get,
+          ratingValue)
+      } catch {
+        case e: Exception => {
+          logger.error(s"Cannot convert ${event} to Rating. Exception: ${e}.")
+          throw e
+        }
+      }
+      rating
+    }.cache()
+
+    ratingsRDD
+  }
+
+  override
+  def readTraining(sc: SparkContext): TrainingData = {
+    new TrainingData(getRatings(sc))
+  }
+
+  override
+  def readEval(sc: SparkContext)
+  : Seq[(TrainingData, EmptyEvaluationInfo, RDD[(Query, ActualResult)])] = {
+    require(!dsp.evalParams.isEmpty, "Must specify evalParams")
+    val evalParams = dsp.evalParams.get
+
+    val kFold = evalParams.kFold
+    val ratings: RDD[(Rating, Long)] = getRatings(sc).zipWithUniqueId
+    ratings.cache
+
+    (0 until kFold).map { idx => {
+      val trainingRatings = ratings.filter(_._2 % kFold != idx).map(_._1)
+      val testingRatings = ratings.filter(_._2 % kFold == idx).map(_._1)
+
+      val testingUsers: RDD[(String, Iterable[Rating])] = testingRatings.groupBy(_.user)
+
+      (new TrainingData(trainingRatings),
+        new EmptyEvaluationInfo(),
+        testingUsers.map {
+          case (user, ratings) => (Query(user, evalParams.queryNum, Set.empty), ActualResult(ratings.toArray))
+        }
+      )
+    }}
+  }
+}
+
+case class Rating(
+  user: String,
+  item: String,
+  rating: Double
+)
+
+class TrainingData(
+  val ratings: RDD[Rating]
+) extends Serializable {
+  override def toString = {
+    s"ratings: [${ratings.count()}] (${ratings.take(2).toList}...)"
+  }
+}

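The core of getRatings() above is the mapping from event type to rating value: a "rate" event carries an explicit rating property, while a "buy" event is treated as an implicit rating of 4. A small standalone sketch of that rule (eventName and props are stand-ins for the Event fields used above):

    def toRatingValue(eventName: String, props: Map[String, Double]): Double = eventName match {
      case "rate" => props("rating")   // explicit rating from the "rate" event's properties
      case "buy"  => 4.0               // implicit "buy" is mapped to a rating value of 4
      case other  => throw new Exception(s"Unexpected event $other is read.")
    }
    // toRatingValue("buy", Map.empty)             == 4.0
    // toRatingValue("rate", Map("rating" -> 3.5)) == 3.5
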
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-recommendation/blacklist-items/src/main/scala/Engine.scala
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-recommendation/blacklist-items/src/main/scala/Engine.scala b/examples/scala-parallel-recommendation/blacklist-items/src/main/scala/Engine.scala
new file mode 100644
index 0000000..db8351a
--- /dev/null
+++ b/examples/scala-parallel-recommendation/blacklist-items/src/main/scala/Engine.scala
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.predictionio.examples.recommendation
+
+import org.apache.predictionio.controller.EngineFactory
+import org.apache.predictionio.controller.Engine
+
+case class Query(
+  user: String,
+  num: Int,
+  blackList: Set[String] // ADDED
+) extends Serializable
+
+case class PredictedResult(
+  itemScores: Array[ItemScore]
+) extends Serializable
+
+case class ActualResult(
+  ratings: Array[Rating]
+) extends Serializable
+
+case class ItemScore(
+  item: String,
+  score: Double
+) extends Serializable
+
+object RecommendationEngine extends EngineFactory {
+  def apply() = {
+    new Engine(
+      classOf[DataSource],
+      classOf[Preparator],
+      Map("als" -> classOf[ALSAlgorithm]),
+      classOf[Serving])
+  }
+}

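Compared with the stock recommendation template, Query above gains a required blackList field. A minimal sketch of constructing such a query (this reuses the Query case class defined above; the corresponding REST body sent to a deployed engine would presumably look like {"user": "1", "num": 4, "blackList": ["2", "3"]}):

    // assumes the Query case class defined in Engine.scala above
    val query = Query(user = "1", num = 4, blackList = Set("2", "3"))
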
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-recommendation/blacklist-items/src/main/scala/Evaluation.scala
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-recommendation/blacklist-items/src/main/scala/Evaluation.scala b/examples/scala-parallel-recommendation/blacklist-items/src/main/scala/Evaluation.scala
new file mode 100644
index 0000000..a665496
--- /dev/null
+++ b/examples/scala-parallel-recommendation/blacklist-items/src/main/scala/Evaluation.scala
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.predictionio.examples.recommendation
+
+import org.apache.predictionio.controller.Evaluation
+import org.apache.predictionio.controller.OptionAverageMetric
+import org.apache.predictionio.controller.AverageMetric
+import org.apache.predictionio.controller.EmptyEvaluationInfo
+import org.apache.predictionio.controller.EngineParamsGenerator
+import org.apache.predictionio.controller.EngineParams
+import org.apache.predictionio.controller.MetricEvaluator
+
+// Usage:
+// $ pio eval org.apache.predictionio.examples.recommendation.RecommendationEvaluation \
+//   org.apache.predictionio.examples.recommendation.EngineParamsList
+
+case class PrecisionAtK(k: Int, ratingThreshold: Double = 2.0)
+    extends OptionAverageMetric[EmptyEvaluationInfo, Query, PredictedResult, ActualResult] {
+  require(k > 0, "k must be greater than 0")
+
+  override def header = s"Precision@K (k=$k, threshold=$ratingThreshold)"
+
+  def calculate(q: Query, p: PredictedResult, a: ActualResult): Option[Double] = {
+    val positives: Set[String] = a.ratings.filter(_.rating >= ratingThreshold).map(_.item).toSet
+
+    // If there are no positive results, precision is undefined. We don't consider this case in the
+    // metrics, hence we return None.
+    if (positives.size == 0) {
+      None
+    } else {
+      val tpCount: Int = p.itemScores.take(k).filter(is => positives(is.item)).size
+      Some(tpCount.toDouble / math.min(k, positives.size))
+    }
+  }
+}
+
+case class PositiveCount(ratingThreshold: Double = 2.0)
+    extends AverageMetric[EmptyEvaluationInfo, Query, PredictedResult, ActualResult] {
+  override def header = s"PositiveCount (threshold=$ratingThreshold)"
+
+  def calculate(q: Query, p: PredictedResult, a: ActualResult): Double = {
+    a.ratings.filter(_.rating >= ratingThreshold).size
+  }
+}
+
+object RecommendationEvaluation extends Evaluation {
+  engineEvaluator = (
+    RecommendationEngine(),
+    MetricEvaluator(
+      metric = PrecisionAtK(k = 10, ratingThreshold = 4.0),
+      otherMetrics = Seq(
+        PositiveCount(ratingThreshold = 4.0),
+        PrecisionAtK(k = 10, ratingThreshold = 2.0),
+        PositiveCount(ratingThreshold = 2.0),
+        PrecisionAtK(k = 10, ratingThreshold = 1.0),
+        PositiveCount(ratingThreshold = 1.0)
+      )))
+}
+
+
+object ComprehensiveRecommendationEvaluation extends Evaluation {
+  val ratingThresholds = Seq(0.0, 2.0, 4.0)
+  val ks = Seq(1, 3, 10)
+
+  engineEvaluator = (
+    RecommendationEngine(),
+    MetricEvaluator(
+      metric = PrecisionAtK(k = 3, ratingThreshold = 2.0),
+      otherMetrics = (
+        (for (r <- ratingThresholds) yield PositiveCount(ratingThreshold = r)) ++
+        (for (r <- ratingThresholds; k <- ks) yield PrecisionAtK(k = k, ratingThreshold = r))
+      )))
+}
+
+
+trait BaseEngineParamsList extends EngineParamsGenerator {
+  protected val baseEP = EngineParams(
+    dataSourceParams = DataSourceParams(
+      appName = "MyApp1",
+      evalParams = Some(DataSourceEvalParams(kFold = 5, queryNum = 10))))
+}
+
+object EngineParamsList extends BaseEngineParamsList {
+  engineParamsList = for(
+    rank <- Seq(5, 10, 20);
+    numIterations <- Seq(1, 5, 10))
+    yield baseEP.copy(
+      algorithmParamsList = Seq(
+        ("als", ALSAlgorithmParams(rank, numIterations, 0.01, Some(3)))))
+}

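To make PrecisionAtK.calculate above concrete, here is a small standalone sketch that mirrors its arithmetic for k = 2 with hypothetical data (no engine classes required):

    val k = 2
    val predicted = Seq("a", "b", "c")                          // items in predicted order
    val positives = Set("b", "d")                               // items rated at or above the threshold
    val tp = predicted.take(k).count(positives)                 // only "b" is a hit => 1
    val precision = tp.toDouble / math.min(k, positives.size)   // 1.0 / 2 = 0.5
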
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-recommendation/blacklist-items/src/main/scala/Preparator.scala
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-recommendation/blacklist-items/src/main/scala/Preparator.scala b/examples/scala-parallel-recommendation/blacklist-items/src/main/scala/Preparator.scala
new file mode 100644
index 0000000..6a41c47
--- /dev/null
+++ b/examples/scala-parallel-recommendation/blacklist-items/src/main/scala/Preparator.scala
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.predictionio.examples.recommendation
+
+import org.apache.predictionio.controller.PPreparator
+
+import org.apache.spark.SparkContext
+import org.apache.spark.SparkContext._
+import org.apache.spark.rdd.RDD
+
+class Preparator
+  extends PPreparator[TrainingData, PreparedData] {
+
+  def prepare(sc: SparkContext, trainingData: TrainingData): PreparedData = {
+    new PreparedData(ratings = trainingData.ratings)
+  }
+}
+
+class PreparedData(
+  val ratings: RDD[Rating]
+) extends Serializable

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-recommendation/blacklist-items/src/main/scala/Serving.scala
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-recommendation/blacklist-items/src/main/scala/Serving.scala b/examples/scala-parallel-recommendation/blacklist-items/src/main/scala/Serving.scala
new file mode 100644
index 0000000..c478455
--- /dev/null
+++ b/examples/scala-parallel-recommendation/blacklist-items/src/main/scala/Serving.scala
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.predictionio.examples.recommendation
+
+import org.apache.predictionio.controller.LServing
+
+class Serving
+  extends LServing[Query, PredictedResult] {
+
+  override
+  def serve(query: Query,
+    predictedResults: Seq[PredictedResult]): PredictedResult = {
+    predictedResults.head
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-recommendation/blacklist-items/template.json
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-recommendation/blacklist-items/template.json b/examples/scala-parallel-recommendation/blacklist-items/template.json
new file mode 100644
index 0000000..d076ec5
--- /dev/null
+++ b/examples/scala-parallel-recommendation/blacklist-items/template.json
@@ -0,0 +1 @@
+{"pio": {"version": { "min": "0.10.0-incubating" }}}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-recommendation/custom-prepartor/.gitignore
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-recommendation/custom-prepartor/.gitignore b/examples/scala-parallel-recommendation/custom-prepartor/.gitignore
deleted file mode 100644
index df6668d..0000000
--- a/examples/scala-parallel-recommendation/custom-prepartor/.gitignore
+++ /dev/null
@@ -1,5 +0,0 @@
-data/sample_movielens_data.txt
-manifest.json
-target/
-/pio.sbt
-pio.log

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-recommendation/custom-prepartor/build.sbt
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-recommendation/custom-prepartor/build.sbt b/examples/scala-parallel-recommendation/custom-prepartor/build.sbt
deleted file mode 100644
index 4b90f03..0000000
--- a/examples/scala-parallel-recommendation/custom-prepartor/build.sbt
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-import AssemblyKeys._
-
-assemblySettings
-
-name := "template-scala-parallel-recommendation-custom-preparator"
-
-organization := "org.apache.predictionio"
-
-libraryDependencies ++= Seq(
-  "org.apache.predictionio"    %% "core"          % pioVersion.value % "provided",
-  "org.apache.spark" %% "spark-core"    % "1.2.0" % "provided",
-  "org.apache.spark" %% "spark-mllib"   % "1.2.0" % "provided")

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-recommendation/custom-prepartor/data/import_eventserver.py
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-recommendation/custom-prepartor/data/import_eventserver.py b/examples/scala-parallel-recommendation/custom-prepartor/data/import_eventserver.py
deleted file mode 100644
index 5bd13bf..0000000
--- a/examples/scala-parallel-recommendation/custom-prepartor/data/import_eventserver.py
+++ /dev/null
@@ -1,73 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-"""
-Import sample data for recommendation engine
-"""
-
-import predictionio
-import argparse
-import random
-
-RATE_ACTIONS_DELIMITER = "::"
-SEED = 3
-
-def import_events(client, file):
-  f = open(file, 'r')
-  random.seed(SEED)
-  count = 0
-  print "Importing data..."
-  for line in f:
-    data = line.rstrip('\r\n').split(RATE_ACTIONS_DELIMITER)
-    # For demonstration purpose, randomly mix in some buy events
-    if (random.randint(0, 1) == 1):
-      client.create_event(
-        event="rate",
-        entity_type="user",
-        entity_id=data[0],
-        target_entity_type="item",
-        target_entity_id=data[1],
-        properties= { "rating" : float(data[2]) }
-      )
-    else:
-      client.create_event(
-        event="buy",
-        entity_type="user",
-        entity_id=data[0],
-        target_entity_type="item",
-        target_entity_id=data[1]
-      )
-    count += 1
-  f.close()
-  print "%s events are imported." % count
-
-if __name__ == '__main__':
-  parser = argparse.ArgumentParser(
-    description="Import sample data for recommendation engine")
-  parser.add_argument('--access_key', default='invald_access_key')
-  parser.add_argument('--url', default="http://localhost:7070")
-  parser.add_argument('--file', default="./data/sample_movielens_data.txt")
-
-  args = parser.parse_args()
-  print args
-
-  client = predictionio.EventClient(
-    access_key=args.access_key,
-    url=args.url,
-    threads=5,
-    qsize=500)
-  import_events(client, args.file)

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-recommendation/custom-prepartor/data/sample_not_train_data.txt
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-recommendation/custom-prepartor/data/sample_not_train_data.txt b/examples/scala-parallel-recommendation/custom-prepartor/data/sample_not_train_data.txt
deleted file mode 100644
index 077e96b..0000000
--- a/examples/scala-parallel-recommendation/custom-prepartor/data/sample_not_train_data.txt
+++ /dev/null
@@ -1,8 +0,0 @@
-3
-4
-10
-22
-34
-54
-65
-89

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-recommendation/custom-prepartor/data/send_query.py
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-recommendation/custom-prepartor/data/send_query.py b/examples/scala-parallel-recommendation/custom-prepartor/data/send_query.py
deleted file mode 100644
index ca19dc5..0000000
--- a/examples/scala-parallel-recommendation/custom-prepartor/data/send_query.py
+++ /dev/null
@@ -1,24 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-"""
-Send sample query to prediction engine
-"""
-
-import predictionio
-engine_client = predictionio.EngineClient(url="http://localhost:8000")
-print engine_client.send_query({"user": "1", "num": 4})

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-recommendation/custom-prepartor/engine.json
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-recommendation/custom-prepartor/engine.json b/examples/scala-parallel-recommendation/custom-prepartor/engine.json
deleted file mode 100644
index 66918ca..0000000
--- a/examples/scala-parallel-recommendation/custom-prepartor/engine.json
+++ /dev/null
@@ -1,26 +0,0 @@
-{
-  "id": "default",
-  "description": "Default settings",
-  "engineFactory": "org.template.recommendation.RecommendationEngine",
-  "datasource": {
-    "params" : {
-      "appId": 1
-    }
-  },
-  "preparator": {
-    "params": {
-      "filepath": "./data/sample_not_train_data.txt"
-    }
-  },
-  "algorithms": [
-    {
-      "name": "als",
-      "params": {
-        "rank": 10,
-        "numIterations": 20,
-        "lambda": 0.01,
-        "seed": 3
-      }
-    }
-  ]
-}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-recommendation/custom-prepartor/project/assembly.sbt
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-recommendation/custom-prepartor/project/assembly.sbt b/examples/scala-parallel-recommendation/custom-prepartor/project/assembly.sbt
deleted file mode 100644
index 54c3252..0000000
--- a/examples/scala-parallel-recommendation/custom-prepartor/project/assembly.sbt
+++ /dev/null
@@ -1 +0,0 @@
-addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.11.2")

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-recommendation/custom-prepartor/project/pio-build.sbt
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-recommendation/custom-prepartor/project/pio-build.sbt b/examples/scala-parallel-recommendation/custom-prepartor/project/pio-build.sbt
deleted file mode 100644
index 9aed0ee..0000000
--- a/examples/scala-parallel-recommendation/custom-prepartor/project/pio-build.sbt
+++ /dev/null
@@ -1 +0,0 @@
-addSbtPlugin("org.apache.predictionio" % "pio-build" % "0.9.0")

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-recommendation/custom-prepartor/src/main/scala/ALSAlgorithm.scala
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-recommendation/custom-prepartor/src/main/scala/ALSAlgorithm.scala b/examples/scala-parallel-recommendation/custom-prepartor/src/main/scala/ALSAlgorithm.scala
deleted file mode 100644
index 22904af..0000000
--- a/examples/scala-parallel-recommendation/custom-prepartor/src/main/scala/ALSAlgorithm.scala
+++ /dev/null
@@ -1,104 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.template.recommendation
-
-import org.apache.predictionio.controller.PAlgorithm
-import org.apache.predictionio.controller.Params
-import org.apache.predictionio.data.storage.BiMap
-
-import org.apache.spark.SparkContext
-import org.apache.spark.SparkContext._
-import org.apache.spark.rdd.RDD
-import org.apache.spark.mllib.recommendation.ALS
-import org.apache.spark.mllib.recommendation.{Rating => MLlibRating}
-import org.apache.spark.mllib.recommendation.ALSModel
-
-import grizzled.slf4j.Logger
-
-case class ALSAlgorithmParams(
-  rank: Int,
-  numIterations: Int,
-  lambda: Double,
-  seed: Option[Long]) extends Params
-
-class ALSAlgorithm(val ap: ALSAlgorithmParams)
-  extends PAlgorithm[PreparedData, ALSModel, Query, PredictedResult] {
-
-  @transient lazy val logger = Logger[this.type]
-
-  def train(sc: SparkContext, data: PreparedData): ALSModel = {
-    // MLLib ALS cannot handle empty training data.
-    require(!data.ratings.take(1).isEmpty,
-      s"RDD[Rating] in PreparedData cannot be empty." +
-      " Please check if DataSource generates TrainingData" +
-      " and Preprator generates PreparedData correctly.")
-    // Convert user and item String IDs to Int index for MLlib
-    val userStringIntMap = BiMap.stringInt(data.ratings.map(_.user))
-    val itemStringIntMap = BiMap.stringInt(data.ratings.map(_.item))
-    val mllibRatings = data.ratings.map( r =>
-      // MLlibRating requires integer index for user and item
-      MLlibRating(userStringIntMap(r.user), itemStringIntMap(r.item), r.rating)
-    )
-
-    // seed for MLlib ALS
-    val seed = ap.seed.getOrElse(System.nanoTime)
-
-    // If you only have one type of implicit event (Eg. "view" event only),
-    // replace ALS.train(...) with
-    //val m = ALS.trainImplicit(
-      //ratings = mllibRatings,
-      //rank = ap.rank,
-      //iterations = ap.numIterations,
-      //lambda = ap.lambda,
-      //blocks = -1,
-      //alpha = 1.0,
-      //seed = seed)
-
-    val m = ALS.train(
-      ratings = mllibRatings,
-      rank = ap.rank,
-      iterations = ap.numIterations,
-      lambda = ap.lambda,
-      blocks = -1,
-      seed = seed)
-
-    new ALSModel(
-      rank = m.rank,
-      userFeatures = m.userFeatures,
-      productFeatures = m.productFeatures,
-      userStringIntMap = userStringIntMap,
-      itemStringIntMap = itemStringIntMap)
-  }
-
-  def predict(model: ALSModel, query: Query): PredictedResult = {
-    // Convert String ID to Int index for MLlib
-    model.userStringIntMap.get(query.user).map { userInt =>
-      // create inverse view of itemStringIntMap
-      val itemIntStringMap = model.itemStringIntMap.inverse
-      // recommendProducts() returns Array[MLlibRating], which uses item Int
-      // index. Convert it to String ID for returning PredictedResult
-      val itemScores = model.recommendProducts(userInt, query.num)
-        .map (r => ItemScore(itemIntStringMap(r.product), r.rating))
-      new PredictedResult(itemScores)
-    }.getOrElse{
-      logger.info(s"No prediction for unknown user ${query.user}.")
-      new PredictedResult(Array.empty)
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-recommendation/custom-prepartor/src/main/scala/ALSModel.scala
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-recommendation/custom-prepartor/src/main/scala/ALSModel.scala b/examples/scala-parallel-recommendation/custom-prepartor/src/main/scala/ALSModel.scala
deleted file mode 100644
index 4697732..0000000
--- a/examples/scala-parallel-recommendation/custom-prepartor/src/main/scala/ALSModel.scala
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.mllib.recommendation
-// This must be the same package as Spark's MatrixFactorizationModel because
-// MatrixFactorizationModel's constructor is private and we are using
-// its constructor in order to save and load the model
-
-import org.template.recommendation.ALSAlgorithmParams
-
-import org.apache.predictionio.controller.IPersistentModel
-import org.apache.predictionio.controller.IPersistentModelLoader
-import org.apache.predictionio.data.storage.BiMap
-
-import org.apache.spark.SparkContext
-import org.apache.spark.SparkContext._
-import org.apache.spark.rdd.RDD
-
-class ALSModel(
-    override val rank: Int,
-    override val userFeatures: RDD[(Int, Array[Double])],
-    override val productFeatures: RDD[(Int, Array[Double])],
-    val userStringIntMap: BiMap[String, Int],
-    val itemStringIntMap: BiMap[String, Int])
-  extends MatrixFactorizationModel(rank, userFeatures, productFeatures)
-  with IPersistentModel[ALSAlgorithmParams] {
-
-  def save(id: String, params: ALSAlgorithmParams,
-    sc: SparkContext): Boolean = {
-
-    sc.parallelize(Seq(rank)).saveAsObjectFile(s"/tmp/${id}/rank")
-    userFeatures.saveAsObjectFile(s"/tmp/${id}/userFeatures")
-    productFeatures.saveAsObjectFile(s"/tmp/${id}/productFeatures")
-    sc.parallelize(Seq(userStringIntMap))
-      .saveAsObjectFile(s"/tmp/${id}/userStringIntMap")
-    sc.parallelize(Seq(itemStringIntMap))
-      .saveAsObjectFile(s"/tmp/${id}/itemStringIntMap")
-    true
-  }
-
-  override def toString = {
-    s"userFeatures: [${userFeatures.count()}]" +
-    s"(${userFeatures.take(2).toList}...)" +
-    s" productFeatures: [${productFeatures.count()}]" +
-    s"(${productFeatures.take(2).toList}...)" +
-    s" userStringIntMap: [${userStringIntMap.size}]" +
-    s"(${userStringIntMap.take(2)}...)" +
-    s" itemStringIntMap: [${itemStringIntMap.size}]" +
-    s"(${itemStringIntMap.take(2)}...)"
-  }
-}
-
-object ALSModel
-  extends IPersistentModelLoader[ALSAlgorithmParams, ALSModel] {
-  def apply(id: String, params: ALSAlgorithmParams,
-    sc: Option[SparkContext]) = {
-    new ALSModel(
-      rank = sc.get.objectFile[Int](s"/tmp/${id}/rank").first,
-      userFeatures = sc.get.objectFile(s"/tmp/${id}/userFeatures"),
-      productFeatures = sc.get.objectFile(s"/tmp/${id}/productFeatures"),
-      userStringIntMap = sc.get
-        .objectFile[BiMap[String, Int]](s"/tmp/${id}/userStringIntMap").first,
-      itemStringIntMap = sc.get
-        .objectFile[BiMap[String, Int]](s"/tmp/${id}/itemStringIntMap").first)
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-recommendation/custom-prepartor/src/main/scala/DataSource.scala
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-recommendation/custom-prepartor/src/main/scala/DataSource.scala b/examples/scala-parallel-recommendation/custom-prepartor/src/main/scala/DataSource.scala
deleted file mode 100644
index 1d3f0df..0000000
--- a/examples/scala-parallel-recommendation/custom-prepartor/src/main/scala/DataSource.scala
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.template.recommendation
-
-import org.apache.predictionio.controller.PDataSource
-import org.apache.predictionio.controller.EmptyEvaluationInfo
-import org.apache.predictionio.controller.EmptyActualResult
-import org.apache.predictionio.controller.Params
-import org.apache.predictionio.data.storage.Event
-import org.apache.predictionio.data.storage.Storage
-
-import org.apache.spark.SparkContext
-import org.apache.spark.SparkContext._
-import org.apache.spark.rdd.RDD
-
-import grizzled.slf4j.Logger
-
-case class DataSourceParams(appId: Int) extends Params
-
-class DataSource(val dsp: DataSourceParams)
-  extends PDataSource[TrainingData,
-      EmptyEvaluationInfo, Query, EmptyActualResult] {
-
-  @transient lazy val logger = Logger[this.type]
-
-  override
-  def readTraining(sc: SparkContext): TrainingData = {
-    val eventsDb = Storage.getPEvents()
-    val eventsRDD: RDD[Event] = eventsDb.find(
-      appId = dsp.appId,
-      entityType = Some("user"),
-      eventNames = Some(List("rate", "buy")), // read "rate" and "buy" event
-      // targetEntityType is optional field of an event.
-      targetEntityType = Some(Some("item")))(sc)
-
-    val ratingsRDD: RDD[Rating] = eventsRDD.map { event =>
-      val rating = try {
-        val ratingValue: Double = event.event match {
-          case "rate" => event.properties.get[Double]("rating")
-          case "buy" => 4.0 // map buy event to rating value of 4
-          case _ => throw new Exception(s"Unexpected event ${event} is read.")
-        }
-        // entityId and targetEntityId is String
-        Rating(event.entityId,
-          event.targetEntityId.get,
-          ratingValue)
-      } catch {
-        case e: Exception => {
-          logger.error(s"Cannot convert ${event} to Rating. Exception: ${e}.")
-          throw e
-        }
-      }
-      rating
-    }
-    new TrainingData(ratingsRDD)
-  }
-}
-
-case class Rating(
-  user: String,
-  item: String,
-  rating: Double
-)
-
-class TrainingData(
-  val ratings: RDD[Rating]
-) extends Serializable {
-  override def toString = {
-    s"ratings: [${ratings.count()}] (${ratings.take(2).toList}...)"
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-recommendation/custom-prepartor/src/main/scala/Engine.scala
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-recommendation/custom-prepartor/src/main/scala/Engine.scala b/examples/scala-parallel-recommendation/custom-prepartor/src/main/scala/Engine.scala
deleted file mode 100644
index 1446ca4..0000000
--- a/examples/scala-parallel-recommendation/custom-prepartor/src/main/scala/Engine.scala
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.template.recommendation
-
-import org.apache.predictionio.controller.IEngineFactory
-import org.apache.predictionio.controller.Engine
-
-case class Query(
-  user: String,
-  num: Int
-)
-
-case class PredictedResult(
-  itemScores: Array[ItemScore]
-)
-
-case class ItemScore(
-  item: String,
-  score: Double
-)
-
-object RecommendationEngine extends IEngineFactory {
-  def apply() = {
-    new Engine(
-      classOf[DataSource],
-      classOf[Preparator],
-      Map("als" -> classOf[ALSAlgorithm]),
-      classOf[Serving])
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-recommendation/custom-prepartor/src/main/scala/Preparator.scala
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-recommendation/custom-prepartor/src/main/scala/Preparator.scala b/examples/scala-parallel-recommendation/custom-prepartor/src/main/scala/Preparator.scala
deleted file mode 100644
index 4d62392..0000000
--- a/examples/scala-parallel-recommendation/custom-prepartor/src/main/scala/Preparator.scala
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.template.recommendation
-
-import org.apache.predictionio.controller.PPreparator
-
-import org.apache.spark.SparkContext
-import org.apache.spark.SparkContext._
-import org.apache.spark.rdd.RDD
-
-import scala.io.Source // ADDED
-import org.apache.predictionio.controller.Params // ADDED
-
-// ADDED CustomPreparatorParams case class
-case class CustomPreparatorParams(
-  filepath: String
-) extends Params
-
-class Preparator(pp: CustomPreparatorParams) // ADDED CustomPreparatorParams
-  extends PPreparator[TrainingData, PreparedData] {
-
-  def prepare(sc: SparkContext, trainingData: TrainingData): PreparedData = {
-    val noTrainItems = Source.fromFile(pp.filepath).getLines.toSet //CHANGED
-    val ratings = trainingData.ratings.filter( r =>
-      !noTrainItems.contains(r.item)
-    )
-    new PreparedData(ratings)
-  }
-}
-
-class PreparedData(
-  val ratings: RDD[Rating]
-) extends Serializable

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-recommendation/custom-prepartor/src/main/scala/Serving.scala
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-recommendation/custom-prepartor/src/main/scala/Serving.scala b/examples/scala-parallel-recommendation/custom-prepartor/src/main/scala/Serving.scala
deleted file mode 100644
index 48fda35..0000000
--- a/examples/scala-parallel-recommendation/custom-prepartor/src/main/scala/Serving.scala
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.template.recommendation
-
-import org.apache.predictionio.controller.LServing
-
-class Serving
-  extends LServing[Query, PredictedResult] {
-
-  override def serve(query: Query,
-    predictedResults: Seq[PredictedResult]): PredictedResult = {
-    predictedResults.head
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-recommendation/custom-query/.gitignore
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-recommendation/custom-query/.gitignore b/examples/scala-parallel-recommendation/custom-query/.gitignore
deleted file mode 100644
index 295cafd..0000000
--- a/examples/scala-parallel-recommendation/custom-query/.gitignore
+++ /dev/null
@@ -1,4 +0,0 @@
-data/sample_movielens_data.txt
-manifest.json
-target/
-pio.log

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-recommendation/custom-query/README.md
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-recommendation/custom-query/README.md b/examples/scala-parallel-recommendation/custom-query/README.md
deleted file mode 100644
index c04962a..0000000
--- a/examples/scala-parallel-recommendation/custom-query/README.md
+++ /dev/null
@@ -1,161 +0,0 @@
-<!--
-Licensed to the Apache Software Foundation (ASF) under one or more
-contributor license agreements.  See the NOTICE file distributed with
-this work for additional information regarding copyright ownership.
-The ASF licenses this file to You under the Apache License, Version 2.0
-(the "License"); you may not use this file except in compliance with
-the License.  You may obtain a copy of the License at
-
-    http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
--->
-
-# Filtering result set on custom item field (Recommendation)
-
-## Import data to pio engine
-
-By default the recommendation template reads the "rate" and "buy" user events and the user entities. You can modify the default DataSource to read your custom item with a specified list of properties.
-
-First of all, you have to import your events into the pio event server.
-
-You can use ImportDataScript.scala to import users, movies and rate events from the [MovieLens database](http://grouplens.org/datasets/movielens/).
-Make sure that the data files are in `UTF-8` encoding.
-
-This command line tool accepts 2 args:
-
- 1. the app access key (mandatory)
- 2. the pio event server url (default is `http://localhost:7070`)
-
-For example, in the sbt console: `> runMain org.template.recommendation.ImportDataScript <access_key>`
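-
-For instance, a sketch of an end-to-end session, assuming the import tool is launched with sbt from the `data` sub-project and that the second, optional argument is the event server URL:
-
-```
-$ cd data
-$ sbt
-> runMain org.template.recommendation.ImportDataScript <access_key> http://localhost:7070
-```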
-
-## Modify the engine.
-
-This example is based on v0.1.1 of the [scala parallel recommendation template](https://github.com/PredictionIO/template-scala-parallel-recommendation/).
-
-In this example we modify the DataSource to read a custom item with one property.
-
-* Modify the Query data structure to your needs; in this example we add the item creation year as a query/filter param:
-
-`case class Query(user: String, num: Int, creationYear: Option[Int] = None)`
-
-* Define the Item case class which will describe your data:
-
-`case class Item(creationYear: Option[Int])`
-
-* Define a utility unmarshaller to read the list of properties into your structure:
-
-```
-object ItemMarshaller {
- def unmarshall(properties: DataMap): Option[Item] =
-   Some(Item(properties.getOpt[Int]("creationYear")))
-}
-```
-
-* Modify the TrainingData class to hold your custom items:
-
-`class TrainingData(val ratings: RDD[Rating], val items: RDD[(String, Item)])`
- 
-* Modify the readTraining method in the DataSource to read your custom item:
-
-```
-val itemsRDD = eventsDb.aggregateProperties(
-    appId = dsp.appId,
-    entityType = "item"
-  )(sc).flatMap { case (entityId, properties) ⇒
-    ItemMarshaller.unmarshall(properties).map(entityId → _)
-  }
-```
-
-* Modify the ALSModel to hold your custom items so that they can be filtered.
-
-```
-class ALSModel(
- val productFeatures: RDD[(Int, Array[Double])],
- val itemStringIntMap: BiMap[String, Int],
- val items: Map[Int, Item])
-```
-
-
-* Modify the train method in ALSAlgorithm to map your items to the numeric ids that are needed by the algorithm.
-
-```
-val items: Map[Int, Item] = data.items.map { case (id, item) ⇒
-   (itemStringIntMap(id), item)
-}.collectAsMap.toMap
-```
-
-* Define the filterItems method in ALSAlgorithm to filter the predicted result set according to the query.
-
-```
-private def filterItems(selectedScores: Array[(Int, Double)],
-                       items: Map[Int, Item],
-                       query: Query) =
- selectedScores.view.filter { case (iId, _) ⇒
-   items(iId).creationYear.map(icr ⇒ query.creationYear.forall(icr >= _))
-     .getOrElse(true)
- }
-```
-
-* Modify the predict method in the ALSAlgorithm to filter the predicted result set (a sketch of how this could look follows below):
-
-`val filteredScores = filterItems(indexScores, model.items, query)`
-
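-A minimal sketch (not the template's exact code) of how the filtered scores could be turned into the final result inside ALSAlgorithm, assuming `indexScores: Array[(Int, Double)]` has already been computed for the queried user and that the custom `ItemScore` carries the item id, the score and the optional creation year (as the Serving snippet below suggests):
-
-```
-// illustrative helper inside ALSAlgorithm; names follow the snippets above
-def toResult(model: ALSModel, query: Query,
-             indexScores: Array[(Int, Double)]): PredictedResult = {
-  val filteredScores = filterItems(indexScores, model.items, query)
-  // keep the query.num highest-scoring items that survived the filter
-  val topScores = filteredScores.toArray.sortBy { case (_, s) => -s }.take(query.num)
-  val itemIntStringMap = model.itemStringIntMap.inverse
-  new PredictedResult(topScores.map { case (i, s) =>
-    ItemScore(itemIntStringMap(i), s, model.items(i).creationYear)
-  })
-}
-```
-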
-* The last step could be to modify the serving layer to sort the recommended items by the year of movie creation (our custom property), as Hagay Gazuli mentioned in the [google group](https://groups.google.com/forum/#!searchin/predictionio-user/created$20%7Csort:date/predictionio-user/LEHxuc0Bu_0/W9RkAApvivsJ).
-
-```
-class Serving extends LServing[Query, PredictedResult] {
-  override def serve(query: Query,
-            predictedResults: Seq[PredictedResult]): PredictedResult =
-    predictedResults.headOption.map { result ⇒
-      val preparedItems = result.itemScores
-        .sortBy { case ItemScore(item, score, year) ⇒ year }(
-          Ordering.Option[Int].reverse)
-      new PredictedResult(preparedItems)
-    }.getOrElse(new PredictedResult(Array.empty[ItemScore]))
-}
-```
-
-* Now you can filter your recommendations to items created in or after a given year:
-
-```> curl -H 'Content-Type: application/json' '127.0.0.1:8000/queries.json' -d '{"user":"100", "num":5, "creationYear":1990}' | python -m json.tool```
-
-The output of curl is piped to Python's json.tool module just to pretty-print the response from the engine:
-```
-    "itemScores": [
-        {
-            "creationYear": 1996,
-            "item": "831",
-            "score": 518.9319563470217
-        },
-        {
-            "creationYear": 1996,
-            "item": "1619",
-            "score": 15.321792791296401
-        },
-        {
-            "creationYear": 1994,
-            "item": "1554",
-            "score": 628.1994336041231
-        },
-        {
-            "creationYear": 1993,
-            "item": "736",
-            "score": 419.3508956666954
-        },
-        {
-            "creationYear": 1991,
-            "item": "627",
-            "score": 498.28818189885175
-        }
-    ]
-}
-```
-
-That's it! Now your recommendation engine filters the predicted result set on a custom item field.
-
-

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-recommendation/custom-query/build.sbt
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-recommendation/custom-query/build.sbt b/examples/scala-parallel-recommendation/custom-query/build.sbt
deleted file mode 100644
index 1a18607..0000000
--- a/examples/scala-parallel-recommendation/custom-query/build.sbt
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-import AssemblyKeys._
-
-assemblySettings
-
-name := "template-scala-parallel-recommendation-custom-query"
-
-organization := "org.apache.predictionio"
-
-def provided  (deps: ModuleID*): Seq[ModuleID] = deps map (_ % "provided")
-
-libraryDependencies ++= provided(
-  "org.apache.predictionio"    %% "core"          % "0.8.6",
-  "org.apache.spark" %% "spark-core"    % "1.2.0",
-  "org.apache.spark" %% "spark-mllib"   % "1.2.0")

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-recommendation/custom-query/data/build.sbt
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-recommendation/custom-query/data/build.sbt b/examples/scala-parallel-recommendation/custom-query/data/build.sbt
deleted file mode 100644
index d4cf9b5..0000000
--- a/examples/scala-parallel-recommendation/custom-query/data/build.sbt
+++ /dev/null
@@ -1,8 +0,0 @@
-name := "import-movielenses"
-
-organization := "org.template.recommendation"
-
-def provided  (deps: ModuleID*): Seq[ModuleID] = deps map (_ % "provided")
-
-libraryDependencies ++= provided(
-  "org.apache.predictionio" % "client" % "0.8.3" withSources() withJavadoc())

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-recommendation/custom-query/data/src/main/scala/org/template/recommendation/ImportDataScript.scala
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-recommendation/custom-query/data/src/main/scala/org/template/recommendation/ImportDataScript.scala b/examples/scala-parallel-recommendation/custom-query/data/src/main/scala/org/template/recommendation/ImportDataScript.scala
deleted file mode 100644
index 11d9bd3..0000000
--- a/examples/scala-parallel-recommendation/custom-query/data/src/main/scala/org/template/recommendation/ImportDataScript.scala
+++ /dev/null
@@ -1,108 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.template.recommendation
-
-import org.apache.predictionio.{Event, EventClient}
-import scala.collection.JavaConverters._
-
-import scala.io.Source
-
-object ImportDataScript extends App {
-
-  /**
-   * Imports users, movies and rate events of the movielens database to pio.
-   * first arg is mandatory - it is app access key
-   * second arg is optional - it is pio engine url (default is `localhost:7070`)
-   * @param args
-   */
-  override def main(args: Array[String]): Unit = {
-    val accessKey = if (args.length == 0) {
-      throw new IllegalArgumentException("access key should be passed")
-    } else args(0)
-
-    val engineUrl = if (args.length > 1) args(1) else "http://localhost:7070"
-    implicit val client = new EventClient(accessKey, engineUrl)
-    println(s"imported ${importMovies.size} movies")
-    println(s"imported ${importUsers.size} users")
-    println(s"imported ${importRateEvents.size} events")
-  }
-
-  /**
-   * imports events to the pio server.
-   * @return the events id list.
-   */
-  def importRateEvents(implicit client: EventClient): Iterator[_] =
-    readCSV("data/u.data", "\t").flatMap { event =>
-      val eventObj = event.lift
-      (for {
-        entityId ← eventObj(0)
-        targetEntityId ← eventObj(1)
-        rating ← eventObj(2)
-      } yield new Event()
-          .event("rate")
-          .entityId(entityId)
-          .entityType("user")
-          .properties(javaMap("rating" → new java.lang.Double(rating)))
-          .targetEntityId(targetEntityId)
-          .targetEntityType("movie")
-        ).map(client.createEvent)
-    }
-
-  def importUsers(implicit ec: EventClient): Iterator[_] =
-    readCSV("data/u.user").flatMap { user ⇒
-      val userObj = user.lift
-      for {
-        uId ← userObj(0)
-        age ← userObj(1)
-      } yield ec.setUser(uId, javaMap("age" → age))
-    }
-
-  /**
-   * imports movies to the pio server
-   * @return an iterator of the import results, one per movie
-   */
-  def importMovies(implicit client: EventClient): Iterator[Unit] = {
-    readCSV("data/u.item").map { movie ⇒
-      val movieObj = movie.lift
-      val releaseYearOpt = movieObj(2)
-        .flatMap(_.split("-").lift(2).map(_.toInt))
-        .map(releaseYear ⇒ "creationYear" → new Integer(releaseYear))
-      for {
-        id ← movieObj(0)
-        title ← movieObj(1).map(t ⇒ "title" → t)
-        releaseYear ← releaseYearOpt
-      } yield client.setItem(id, javaMap(title, releaseYear))
-    }
-  }
-
-  private def javaMap(pair: (String, AnyRef)*) = Map(pair: _*).asJava
-
-  /**
-   * reads a csv file into a list of string arrays, each representing a line
-   * split by the specified delimiter
-   * @param filename path to csv file
-   * @param delimiter delimiter of the properties in the file
-   * @return the list of string arrays made from every file line
-   */
-  private def readCSV(filename: String,
-                      delimiter: String = "\\|") =
-    Source.fromFile(filename, "UTF-8")
-      .getLines()
-      .map(_.split(delimiter).toVector)
-}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-recommendation/custom-query/engine.json
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-recommendation/custom-query/engine.json b/examples/scala-parallel-recommendation/custom-query/engine.json
deleted file mode 100644
index 8f3ff21..0000000
--- a/examples/scala-parallel-recommendation/custom-query/engine.json
+++ /dev/null
@@ -1,21 +0,0 @@
-{
-  "id": "default",
-  "description": "Default settings",
-  "engineFactory": "org.template.recommendation.RecommendationEngine",
-  "datasource": {
-    "params" : {
-      "appId": 5
-    }
-  },
-  "algorithms": [
-    {
-      "name": "als",
-      "params": {
-        "rank": 10,
-        "numIterations": 20,
-        "lambda": 0.01,
-        "seed": 3
-      }
-    }
-  ]
-}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-recommendation/custom-query/project/assembly.sbt
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-recommendation/custom-query/project/assembly.sbt b/examples/scala-parallel-recommendation/custom-query/project/assembly.sbt
deleted file mode 100644
index 54c3252..0000000
--- a/examples/scala-parallel-recommendation/custom-query/project/assembly.sbt
+++ /dev/null
@@ -1 +0,0 @@
-addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.11.2")