You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@predictionio.apache.org by sh...@apache.org on 2017/07/10 04:10:05 UTC

[02/11] incubator-predictionio git commit: [PIO-97] Fixes examples of the official templates for v0.11.0-incubating.

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-similarproduct/no-set-user/src/main/scala/Preparator.scala
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-similarproduct/no-set-user/src/main/scala/Preparator.scala b/examples/scala-parallel-similarproduct/no-set-user/src/main/scala/Preparator.scala
deleted file mode 100644
index 73644f9..0000000
--- a/examples/scala-parallel-similarproduct/no-set-user/src/main/scala/Preparator.scala
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.template.similarproduct
-
-import org.apache.predictionio.controller.PPreparator
-
-import org.apache.spark.SparkContext
-import org.apache.spark.SparkContext._
-import org.apache.spark.rdd.RDD
-
-class Preparator
-  extends PPreparator[TrainingData, PreparedData] {
-
-  def prepare(sc: SparkContext, trainingData: TrainingData): PreparedData = {
-    new PreparedData(
-      items = trainingData.items,
-      viewEvents = trainingData.viewEvents)
-  }
-}
-
-class PreparedData(
-  val items: RDD[(String, Item)],
-  val viewEvents: RDD[ViewEvent]
-) extends Serializable

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-similarproduct/no-set-user/src/main/scala/Serving.scala
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-similarproduct/no-set-user/src/main/scala/Serving.scala b/examples/scala-parallel-similarproduct/no-set-user/src/main/scala/Serving.scala
deleted file mode 100644
index 3e115d5..0000000
--- a/examples/scala-parallel-similarproduct/no-set-user/src/main/scala/Serving.scala
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.template.similarproduct
-
-import org.apache.predictionio.controller.LServing
-
-class Serving
-  extends LServing[Query, PredictedResult] {
-
-  override def serve(query: Query,
-    predictedResults: Seq[PredictedResult]): PredictedResult = {
-    predictedResults.head
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-similarproduct/no-set-user/template.json
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-similarproduct/no-set-user/template.json b/examples/scala-parallel-similarproduct/no-set-user/template.json
deleted file mode 100644
index 932e603..0000000
--- a/examples/scala-parallel-similarproduct/no-set-user/template.json
+++ /dev/null
@@ -1 +0,0 @@
-{"pio": {"version": { "min": "0.9.0" }}}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-similarproduct/recommended-user/.gitignore
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-similarproduct/recommended-user/.gitignore b/examples/scala-parallel-similarproduct/recommended-user/.gitignore
index 8241178..5dbe602 100644
--- a/examples/scala-parallel-similarproduct/recommended-user/.gitignore
+++ b/examples/scala-parallel-similarproduct/recommended-user/.gitignore
@@ -1 +1,4 @@
-/pio.sbt
+manifest.json
+target/
+pio.log
+/pio.sbt
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-similarproduct/recommended-user/README.md
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-similarproduct/recommended-user/README.md b/examples/scala-parallel-similarproduct/recommended-user/README.md
deleted file mode 100644
index a274feb..0000000
--- a/examples/scala-parallel-similarproduct/recommended-user/README.md
+++ /dev/null
@@ -1,232 +0,0 @@
-<!--
-Licensed to the Apache Software Foundation (ASF) under one or more
-contributor license agreements.  See the NOTICE file distributed with
-this work for additional information regarding copyright ownership.
-The ASF licenses this file to You under the Apache License, Version 2.0
-(the "License"); you may not use this file except in compliance with
-the License.  You may obtain a copy of the License at
-
-    http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
--->
-
-# Recommended User Template
-
-This example is based on version v0.1.3 of the Similar Product Engine Template. The Similar Product Engine Template has been customized to recommend users instead of items.
-
-The main difference from the original template is the following:
-
-Instead of using user-to-item events to find similar items, user-to-user events are used to find similar users you may also follow, like, etc (depending on which events are used in training and how the events are used). By default, "follow" events are used.
-
-## Overview
-
-This engine template recommends users that are "similar" to other users.
-Similarity is not defined by the user's attributes but by the user's previous actions. By default, it uses the 'follow' action, such that users A and B are considered similar if most users who follow A also follow B.
-
-This template is ideal for recommending users to other users based on their recent actions.
-Use the IDs of the recently viewed users of a customer as the *Query*,
-the engine will predict other users that this customer may also follow or like.
-
-This approach works perfectly for customers who are **first-time visitors** or have not signed in.
-Recommendations are made dynamically in *real-time* based on the most recent user preference you provide in the *Query*.
-You can, therefore, recommend users to visitors without knowing a long history about them.
-
-One can also use this template to quickly build the popular feature of "people you may also follow, like, etc." by providing users similar to those you have just viewed or followed.
-
-
-## Highlights of the modification from original Similar Product Template
-
-### Engine.scala
-
-- change the Query and Predicted Result from "items" to "users" and "similarUsers". The "categories" field is removed:
-
-```scala
-case class Query(
-  users: List[String], // MODIFED
-  num: Int,
-  whiteList: Option[Set[String]],
-  blackList: Option[Set[String]]
-) extends Serializable
-
-case class PredictedResult(
-  similarUserScores: Array[similarUserScore] // MODIFED
-) extends Serializable
-
-case class similarUserScore(
-  user: String, // MODIFED
-  score: Double
-) extends Serializable
-```
-
-### DataSource.scala
-
-- Since user-to-user events will be used, itemsRDD is not needed and removed.
-- change from ViewEvent to FollowEvent in Training Data
-- change to read "follow" events
-
-```scala
-    val followEventsRDD: RDD[FollowEvent] = eventsDb.find(
-      appId = dsp.appId,
-      entityType = Some("user"),
-      eventNames = Some(List("follow")),
-      // targetEntityType is optional field of an event.
-      targetEntityType = Some(Some("user")))(sc)
-      // eventsDb.find() returns RDD[Event]
-      .map { event =>
-        val followEvent = try {
-          event.event match {
-            case "follow" => FollowEvent(
-              user = event.entityId,
-              followedUser = event.targetEntityId.get,
-              t = event.eventTime.getMillis)
-            case _ => throw new Exception(s"Unexpected event $event is read.")
-          }
-        } catch {
-          case e: Exception => {
-            logger.error(s"Cannot convert $event to FollowEvent." +
-              s" Exception: $e.")
-            throw e
-          }
-        }
-        followEvent
-      }.cache()
-```
-
-### Preparator.scala
-
-Change to pass the followEvents to Prepared Data.
-
-### Algorithm.scala
-
-Use Spark MLlib algorithm to train the productFeature vector by treating the followed user as the "product".
-
-Modify train to use "followEvents" to create the MLlibRating objects, and remove the code that aggregates the number of views:
-
-```scala
-    val mllibRatings = data.followEvents
-      .map { r =>
-        // Convert user and user String IDs to Int index for MLlib
-        val uindex = userStringIntMap.getOrElse(r.user, -1)
-        val iindex = similarUserStringIntMap.getOrElse(r.followedUser, -1)
-
-        if (uindex == -1)
-          logger.info(s"Couldn't convert nonexistent user ID ${r.user}"
-            + " to Int index.")
-
-        if (iindex == -1)
-          logger.info(s"Couldn't convert nonexistent followedUser ID ${r.followedUser}"
-            + " to Int index.")
-
-        ((uindex, iindex), 1)
-      }.filter { case ((u, i), v) =>
-        // keep events with valid user and user index
-        (u != -1) && (i != -1)
-      }
-      //.reduceByKey(_ + _) // aggregate all view events of same user-item pair // NOTE: REMOVED!!
-      .map { case ((u, i), v) =>
-        // MLlibRating requires integer index for user and user
-        MLlibRating(u, i, v)
-      }
-      .cache()
-```
-
-The ALSModel and the predict() function are also changed accordingly.
-
-
-## Usage
-
-### Event Data Requirements
-
-By default, this template takes the following data from Event Server as Training Data:
-
-- User *$set* events
-- User *follow* User events
-
-### Input Query
-
-- List of UserIDs, which are the targeted users
-- N (number of users to be recommended)
-- List of white-listed UserIds (optional)
-- List of black-listed UserIds (optional)
-
-The template also supports black-list and white-list. If a white-list is provided, the engine will include only those users in its recommendation.
-Likewise, if a black-list is provided, the engine will exclude those users in its recommendation.
-
-## Documentation
-
-See http://predictionio.incubator.apache.org/templates/similarproduct/quickstart/ for details, keeping in mind the differences mentioned above.
-
-## Development Notes
-
-### import sample data
-
-```
-$ python data/import_eventserver.py --access_key <your_access_key>
-```
-
-### sample queries
-
-normal:
-
-```
-curl -H "Content-Type: application/json" \
--d '{ "users": ["u1", "u3", "u10", "u2", "u5", "u31", "u9"], "num": 10}' \
-http://localhost:8000/queries.json \
--w %{time_connect}:%{time_starttransfer}:%{time_total}
-```
-
-```
-curl -H "Content-Type: application/json" \
--d '{
-  "users": ["u1", "u3", "u10", "u2", "u5", "u31", "u9"],
-  "num": 10
-}' \
-http://localhost:8000/queries.json \
--w %{time_connect}:%{time_starttransfer}:%{time_total}
-```
-
-```
-curl -H "Content-Type: application/json" \
--d '{
-  "users": ["u1", "u3", "u10", "u2", "u5", "u31", "u9"],
-  "num": 10,
-  "whiteList": ["u21", "u26", "u40"]
-}' \
-http://localhost:8000/queries.json \
--w %{time_connect}:%{time_starttransfer}:%{time_total}
-```
-
-```
-curl -H "Content-Type: application/json" \
--d '{
-  "users": ["u1", "u3", "u10", "u2", "u5", "u31", "u9"],
-  "num": 10,
-  "blackList": ["u21", "u26", "u40"]
-}' \
-http://localhost:8000/queries.json \
--w %{time_connect}:%{time_starttransfer}:%{time_total}
-```
-
-unknown user:
-
-```
-curl -H "Content-Type: application/json" \
--d '{ "users": ["unk1", "u3", "u10", "u2", "u5", "u31", "u9"], "num": 10}' \
-http://localhost:8000/queries.json \
--w %{time_connect}:%{time_starttransfer}:%{time_total}
-```
-
-
-all unknown users:
-
-```
-curl -H "Content-Type: application/json" \
--d '{ "users": ["unk1", "unk2", "unk3", "unk4"], "num": 10}' \
-http://localhost:8000/queries.json \
--w %{time_connect}:%{time_starttransfer}:%{time_total}
-```

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-similarproduct/recommended-user/build.sbt
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-similarproduct/recommended-user/build.sbt b/examples/scala-parallel-similarproduct/recommended-user/build.sbt
index ef4f5a8..7409ab6 100644
--- a/examples/scala-parallel-similarproduct/recommended-user/build.sbt
+++ b/examples/scala-parallel-similarproduct/recommended-user/build.sbt
@@ -15,13 +15,10 @@
  * limitations under the License.
  */
 
-assemblySettings
-
 name := "template-scala-parallel-recommendeduser"
 
 organization := "org.apache.predictionio"
-
+scalaVersion := "2.11.8"
 libraryDependencies ++= Seq(
-  "org.apache.predictionio"    %% "core"          % pioVersion.value % "provided",
-  "org.apache.spark" %% "spark-core"    % "1.2.0" % "provided",
-  "org.apache.spark" %% "spark-mllib"   % "1.2.0" % "provided")
+  "org.apache.predictionio" %% "apache-predictionio-core" % "0.11.0-incubating" % "provided",
+  "org.apache.spark"        %% "spark-mllib"              % "2.1.1" % "provided")

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-similarproduct/recommended-user/data/import_eventserver.py
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-similarproduct/recommended-user/data/import_eventserver.py b/examples/scala-parallel-similarproduct/recommended-user/data/import_eventserver.py
index 9ea3199..5554324 100644
--- a/examples/scala-parallel-similarproduct/recommended-user/data/import_eventserver.py
+++ b/examples/scala-parallel-similarproduct/recommended-user/data/import_eventserver.py
@@ -28,13 +28,13 @@ SEED = 3
 def import_events(client):
   random.seed(SEED)
   count = 0
-  print client.get_status()
-  print "Importing data..."
+  print(client.get_status())
+  print("Importing data...")
 
-  # generate 10 users, with user ids u1,u2,....,u50
+  # generate 50 users, with user ids u1,u2,....,u50
   user_ids = ["u%s" % i for i in range(1, 51)]
   for user_id in user_ids:
-    print "Set user", user_id
+    print("Set user", user_id)
     client.create_event(
       event="$set",
       entity_type="user",
@@ -55,7 +55,7 @@ def import_events(client):
       )
       count += 1
 
-  print "%s events are imported." % count
+  print("%s events are imported." % count)
 
 if __name__ == '__main__':
   parser = argparse.ArgumentParser(
@@ -64,7 +64,7 @@ if __name__ == '__main__':
   parser.add_argument('--url', default="http://localhost:7070")
 
   args = parser.parse_args()
-  print args
+  print(args)
 
   client = predictionio.EventClient(
     access_key=args.access_key,

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-similarproduct/recommended-user/data/send_query.py
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-similarproduct/recommended-user/data/send_query.py b/examples/scala-parallel-similarproduct/recommended-user/data/send_query.py
index c1737ab..7c6b35c 100644
--- a/examples/scala-parallel-similarproduct/recommended-user/data/send_query.py
+++ b/examples/scala-parallel-similarproduct/recommended-user/data/send_query.py
@@ -21,4 +21,4 @@ Send sample query to prediction engine
 
 import predictionio
 engine_client = predictionio.EngineClient(url="http://localhost:8000")
-print engine_client.send_query({"users": ["u1", "u3"], "num": 10})
+print(engine_client.send_query({"users": ["u1", "u3"], "num": 10}))

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-similarproduct/recommended-user/engine.json
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-similarproduct/recommended-user/engine.json b/examples/scala-parallel-similarproduct/recommended-user/engine.json
index 8a9da33..67e6929 100644
--- a/examples/scala-parallel-similarproduct/recommended-user/engine.json
+++ b/examples/scala-parallel-similarproduct/recommended-user/engine.json
@@ -1,10 +1,10 @@
 {
   "id": "default",
   "description": "Default settings",
-  "engineFactory": "org.template.recommendeduser.RecommendedUserEngine",
+  "engineFactory": "org.apache.predictionio.examples.similarproduct.RecommendedUserEngine",
   "datasource": {
     "params" : {
-      "appId": 5
+      "appName": "MyApp1"
     }
   },
   "algorithms": [

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-similarproduct/recommended-user/project/assembly.sbt
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-similarproduct/recommended-user/project/assembly.sbt b/examples/scala-parallel-similarproduct/recommended-user/project/assembly.sbt
index 54c3252..e17409e 100644
--- a/examples/scala-parallel-similarproduct/recommended-user/project/assembly.sbt
+++ b/examples/scala-parallel-similarproduct/recommended-user/project/assembly.sbt
@@ -1 +1 @@
-addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.11.2")
+addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.4")

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-similarproduct/recommended-user/project/build.properties
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-similarproduct/recommended-user/project/build.properties b/examples/scala-parallel-similarproduct/recommended-user/project/build.properties
new file mode 100644
index 0000000..64317fd
--- /dev/null
+++ b/examples/scala-parallel-similarproduct/recommended-user/project/build.properties
@@ -0,0 +1 @@
+sbt.version=0.13.15

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-similarproduct/recommended-user/project/pio-build.sbt
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-similarproduct/recommended-user/project/pio-build.sbt b/examples/scala-parallel-similarproduct/recommended-user/project/pio-build.sbt
deleted file mode 100644
index 9aed0ee..0000000
--- a/examples/scala-parallel-similarproduct/recommended-user/project/pio-build.sbt
+++ /dev/null
@@ -1 +0,0 @@
-addSbtPlugin("org.apache.predictionio" % "pio-build" % "0.9.0")

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-similarproduct/recommended-user/src/main/scala/ALSAlgorithm.scala
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-similarproduct/recommended-user/src/main/scala/ALSAlgorithm.scala b/examples/scala-parallel-similarproduct/recommended-user/src/main/scala/ALSAlgorithm.scala
index c97418d..8e93824 100644
--- a/examples/scala-parallel-similarproduct/recommended-user/src/main/scala/ALSAlgorithm.scala
+++ b/examples/scala-parallel-similarproduct/recommended-user/src/main/scala/ALSAlgorithm.scala
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.template.recommendeduser
+package org.apache.predictionio.examples.similarproduct
 
 import grizzled.slf4j.Logger
 import org.apache.predictionio.controller.{P2LAlgorithm, Params}
@@ -175,7 +175,7 @@ class ALSAlgorithm(val ap: ALSAlgorithmParams)
     val topScores = getTopN(filteredScore, query.num)(ord).toArray
 
     val similarUserScores = topScores.map { case (i, s) =>
-      new similarUserScore(
+      new SimilarUserScore(
         user = model.similarUserIntStringMap(i),
         score = s
       )

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-similarproduct/recommended-user/src/main/scala/DataSource.scala
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-similarproduct/recommended-user/src/main/scala/DataSource.scala b/examples/scala-parallel-similarproduct/recommended-user/src/main/scala/DataSource.scala
index b067f0b..dfcb54c 100644
--- a/examples/scala-parallel-similarproduct/recommended-user/src/main/scala/DataSource.scala
+++ b/examples/scala-parallel-similarproduct/recommended-user/src/main/scala/DataSource.scala
@@ -15,15 +15,16 @@
  * limitations under the License.
  */
 
-package org.template.recommendeduser
+package org.apache.predictionio.examples.similarproduct
 
-import grizzled.slf4j.Logger
 import org.apache.predictionio.controller.{EmptyActualResult, EmptyEvaluationInfo, PDataSource, Params}
-import org.apache.predictionio.data.storage.Storage
+import org.apache.predictionio.data.store.PEventStore
 import org.apache.spark.SparkContext
 import org.apache.spark.rdd.RDD
 
-case class DataSourceParams(appId: Int) extends Params
+import grizzled.slf4j.Logger
+
+case class DataSourceParams(appName: String) extends Params
 
 class DataSource(val dsp: DataSourceParams)
   extends PDataSource[TrainingData,
@@ -33,11 +34,10 @@ class DataSource(val dsp: DataSourceParams)
 
   override
   def readTraining(sc: SparkContext): TrainingData = {
-    val eventsDb = Storage.getPEvents()
 
     // create a RDD of (entityID, User)
-    val usersRDD: RDD[(String, User)] = eventsDb.aggregateProperties(
-      appId = dsp.appId,
+    val usersRDD: RDD[(String, User)] = PEventStore.aggregateProperties(
+      appName = dsp.appName,
       entityType = "user"
     )(sc).map { case (entityId, properties) =>
       val user = try {
@@ -53,8 +53,8 @@ class DataSource(val dsp: DataSourceParams)
     }.cache()
 
     // get all "user" "follow" "followedUser" events
-    val followEventsRDD: RDD[FollowEvent] = eventsDb.find(
-      appId = dsp.appId,
+    val followEventsRDD: RDD[FollowEvent] = PEventStore.find(
+      appName = dsp.appName,
       entityType = Some("user"),
       eventNames = Some(List("follow")),
       // targetEntityType is optional field of an event.

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-similarproduct/recommended-user/src/main/scala/Engine.scala
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-similarproduct/recommended-user/src/main/scala/Engine.scala b/examples/scala-parallel-similarproduct/recommended-user/src/main/scala/Engine.scala
index d99f12f..1e561c4 100644
--- a/examples/scala-parallel-similarproduct/recommended-user/src/main/scala/Engine.scala
+++ b/examples/scala-parallel-similarproduct/recommended-user/src/main/scala/Engine.scala
@@ -15,9 +15,9 @@
  * limitations under the License.
  */
 
-package org.template.recommendeduser
+package org.apache.predictionio.examples.similarproduct
 
-import org.apache.predictionio.controller.IEngineFactory
+import org.apache.predictionio.controller.EngineFactory
 import org.apache.predictionio.controller.Engine
 
 case class Query(
@@ -28,15 +28,17 @@ case class Query(
 )
 
 case class PredictedResult(
-  similarUserScores: Array[similarUserScore]
-)
+  similarUserScores: Array[SimilarUserScore]
+){
+  override def toString: String = similarUserScores.mkString(",")
+}
 
-case class similarUserScore(
+case class SimilarUserScore(
   user: String,
   score: Double
 )
 
-object RecommendedUserEngine extends IEngineFactory {
+object RecommendedUserEngine extends EngineFactory {
   def apply() = {
     new Engine(
       classOf[DataSource],

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-similarproduct/recommended-user/src/main/scala/Preparator.scala
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-similarproduct/recommended-user/src/main/scala/Preparator.scala b/examples/scala-parallel-similarproduct/recommended-user/src/main/scala/Preparator.scala
index f8e3546..a687fc1 100644
--- a/examples/scala-parallel-similarproduct/recommended-user/src/main/scala/Preparator.scala
+++ b/examples/scala-parallel-similarproduct/recommended-user/src/main/scala/Preparator.scala
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.template.recommendeduser
+package org.apache.predictionio.examples.similarproduct
 
 import org.apache.predictionio.controller.PPreparator
 import org.apache.spark.SparkContext

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-similarproduct/recommended-user/src/main/scala/Serving.scala
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-similarproduct/recommended-user/src/main/scala/Serving.scala b/examples/scala-parallel-similarproduct/recommended-user/src/main/scala/Serving.scala
index af5bb57..91abca6 100644
--- a/examples/scala-parallel-similarproduct/recommended-user/src/main/scala/Serving.scala
+++ b/examples/scala-parallel-similarproduct/recommended-user/src/main/scala/Serving.scala
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.template.recommendeduser
+package org.apache.predictionio.examples.similarproduct
 
 import org.apache.predictionio.controller.LServing
 

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-similarproduct/recommended-user/template.json
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-similarproduct/recommended-user/template.json b/examples/scala-parallel-similarproduct/recommended-user/template.json
index 932e603..d076ec5 100644
--- a/examples/scala-parallel-similarproduct/recommended-user/template.json
+++ b/examples/scala-parallel-similarproduct/recommended-user/template.json
@@ -1 +1 @@
-{"pio": {"version": { "min": "0.9.0" }}}
+{"pio": {"version": { "min": "0.10.0-incubating" }}}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-similarproduct/return-item-properties/.gitignore
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-similarproduct/return-item-properties/.gitignore b/examples/scala-parallel-similarproduct/return-item-properties/.gitignore
new file mode 100644
index 0000000..5dbe602
--- /dev/null
+++ b/examples/scala-parallel-similarproduct/return-item-properties/.gitignore
@@ -0,0 +1,4 @@
+manifest.json
+target/
+pio.log
+/pio.sbt
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-similarproduct/return-item-properties/build.sbt
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-similarproduct/return-item-properties/build.sbt b/examples/scala-parallel-similarproduct/return-item-properties/build.sbt
new file mode 100644
index 0000000..1daded6
--- /dev/null
+++ b/examples/scala-parallel-similarproduct/return-item-properties/build.sbt
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+name := "template-scala-parallel-similarproduct"
+
+organization := "org.apache.predictionio"
+scalaVersion := "2.11.8"
+libraryDependencies ++= Seq(
+  "org.apache.predictionio" %% "apache-predictionio-core" % "0.11.0-incubating" % "provided",
+  "org.apache.spark"        %% "spark-mllib"              % "2.1.1" % "provided")

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-similarproduct/return-item-properties/data/import_eventserver.py
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-similarproduct/return-item-properties/data/import_eventserver.py b/examples/scala-parallel-similarproduct/return-item-properties/data/import_eventserver.py
new file mode 100644
index 0000000..dd3f5e8
--- /dev/null
+++ b/examples/scala-parallel-similarproduct/return-item-properties/data/import_eventserver.py
@@ -0,0 +1,93 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+Import sample data for similar product engine
+"""
+
+import predictionio
+import argparse
+import random
+
+SEED = 3
+
+def import_events(client):
+  """Send a reproducible set of sample events through `client`.
+
+  Seeds `random` with SEED, then creates "$set" events for 10 users and
+  50 items (each item gets categories, title, date and imdbUrl properties),
+  followed by 10 "view" events per user. Progress is printed as the events
+  are created and the total number of events is printed at the end.
+
+  `client` must provide get_status() and create_event(); the __main__ block
+  of this script passes a predictionio.EventClient.
+  """
+  # Fixed seed so that repeated runs generate the same random choices.
+  random.seed(SEED)
+  count = 0
+  print(client.get_status())
+  print("Importing data...")
+
+  # generate 10 users, with user ids u1,u2,....,u10
+  user_ids = ["u%s" % i for i in range(1, 11)]
+  for user_id in user_ids:
+    print("Set user", user_id)
+    client.create_event(
+      event="$set",
+      entity_type="user",
+      entity_id=user_id
+    )
+    count += 1
+
+  # generate 50 items, with item ids i1,i2,....,i50
+  # randomly assign 1 to 4 categories among c1-c6 to each item
+  categories = ["c%s" % i for i in range(1, 7)]
+  item_ids = ["i%s" % i for i in range(1, 51)]
+  for item_id in item_ids:
+    print("Set item", item_id)
+    client.create_event(
+      event="$set",
+      entity_type="item",
+      entity_id=item_id,
+      properties={
+        "categories" : random.sample(categories, random.randint(1, 4)),
+        "title": "title for movie " + item_id,
+        # "date" is sent as an integer in the range 1936..1960
+        "date": 1935 + random.randint(1, 25),
+        "imdbUrl": "http://imdb.com/fake-url/" + item_id
+      }
+    )
+    count += 1
+
+  # each user randomly viewed 10 items
+  for user_id in user_ids:
+    for viewed_item in random.sample(item_ids, 10):
+      print("User", user_id ,"views item", viewed_item)
+      client.create_event(
+        event="view",
+        entity_type="user",
+        entity_id=user_id,
+        target_entity_type="item",
+        target_entity_id=viewed_item
+      )
+      count += 1
+
+  print("%s events are imported." % count)
+
+if __name__ == '__main__':
+  parser = argparse.ArgumentParser(
+    description="Import sample data for similar product engine")
+  parser.add_argument('--access_key', default='invald_access_key')
+  parser.add_argument('--url', default="http://localhost:7070")
+
+  args = parser.parse_args()
+  print(args)
+
+  client = predictionio.EventClient(
+    access_key=args.access_key,
+    url=args.url,
+    threads=5,
+    qsize=500)
+  import_events(client)

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-similarproduct/return-item-properties/data/send_query.py
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-similarproduct/return-item-properties/data/send_query.py b/examples/scala-parallel-similarproduct/return-item-properties/data/send_query.py
new file mode 100644
index 0000000..0a70f28
--- /dev/null
+++ b/examples/scala-parallel-similarproduct/return-item-properties/data/send_query.py
@@ -0,0 +1,24 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+Send sample query to prediction engine
+"""
+
+import predictionio
+engine_client = predictionio.EngineClient(url="http://localhost:8000")
+print(engine_client.send_query({"items": ["i1", "i3"], "num": 4}))

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-similarproduct/return-item-properties/engine-cooccurrence.json
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-similarproduct/return-item-properties/engine-cooccurrence.json b/examples/scala-parallel-similarproduct/return-item-properties/engine-cooccurrence.json
new file mode 100644
index 0000000..c31b88e
--- /dev/null
+++ b/examples/scala-parallel-similarproduct/return-item-properties/engine-cooccurrence.json
@@ -0,0 +1,18 @@
+{
+  "id": "default",
+  "description": "Default settings",
+  "engineFactory": "org.apache.predictionio.examples.similarproduct.SimilarProductEngine",
+  "datasource": {
+    "params" : {
+      "appName": "MyApp1"
+    }
+  },
+  "algorithms": [
+    {
+      "name": "cooccurrence",
+      "params": {
+        "n": 20
+      }
+    }
+  ]
+}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-similarproduct/return-item-properties/engine.json
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-similarproduct/return-item-properties/engine.json b/examples/scala-parallel-similarproduct/return-item-properties/engine.json
new file mode 100644
index 0000000..a652ec4
--- /dev/null
+++ b/examples/scala-parallel-similarproduct/return-item-properties/engine.json
@@ -0,0 +1,21 @@
+{
+  "id": "default",
+  "description": "Default settings",
+  "engineFactory": "org.apache.predictionio.examples.similarproduct.SimilarProductEngine",
+  "datasource": {
+    "params" : {
+      "appName": "MyApp1"
+    }
+  },
+  "algorithms": [
+    {
+      "name": "als",
+      "params": {
+        "rank": 10,
+        "numIterations" : 20,
+        "lambda": 0.01,
+        "seed": 3
+      }
+    }
+  ]
+}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-similarproduct/return-item-properties/project/assembly.sbt
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-similarproduct/return-item-properties/project/assembly.sbt b/examples/scala-parallel-similarproduct/return-item-properties/project/assembly.sbt
new file mode 100644
index 0000000..e17409e
--- /dev/null
+++ b/examples/scala-parallel-similarproduct/return-item-properties/project/assembly.sbt
@@ -0,0 +1 @@
+addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.4")

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-similarproduct/return-item-properties/project/build.properties
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-similarproduct/return-item-properties/project/build.properties b/examples/scala-parallel-similarproduct/return-item-properties/project/build.properties
new file mode 100644
index 0000000..64317fd
--- /dev/null
+++ b/examples/scala-parallel-similarproduct/return-item-properties/project/build.properties
@@ -0,0 +1 @@
+sbt.version=0.13.15

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-similarproduct/return-item-properties/src/main/scala/ALSAlgorithm.scala
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-similarproduct/return-item-properties/src/main/scala/ALSAlgorithm.scala b/examples/scala-parallel-similarproduct/return-item-properties/src/main/scala/ALSAlgorithm.scala
new file mode 100644
index 0000000..fabb098
--- /dev/null
+++ b/examples/scala-parallel-similarproduct/return-item-properties/src/main/scala/ALSAlgorithm.scala
@@ -0,0 +1,268 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.predictionio.examples.similarproduct
+
+import org.apache.predictionio.controller.P2LAlgorithm
+import org.apache.predictionio.controller.Params
+import org.apache.predictionio.data.storage.BiMap
+
+import org.apache.spark.SparkContext
+import org.apache.spark.SparkContext._
+import org.apache.spark.mllib.recommendation.ALS
+import org.apache.spark.mllib.recommendation.{Rating => MLlibRating}
+
+import grizzled.slf4j.Logger
+
+import scala.collection.mutable.PriorityQueue
+
+/**
+  * Parameters of [[ALSAlgorithm]].
+  *
+  * @param rank          passed as `rank` to `ALS.trainImplicit`
+  * @param numIterations passed as `iterations` to `ALS.trainImplicit`
+  * @param lambda        passed as `lambda` to `ALS.trainImplicit`
+  * @param seed          optional seed for ALS; `System.nanoTime` is used
+  *                      when it is not set
+  */
+case class ALSAlgorithmParams(
+  rank: Int,
+  numIterations: Int,
+  lambda: Double,
+  seed: Option[Long]) extends Params
+
+/**
+  * Model produced by [[ALSAlgorithm]].
+  *
+  * @param productFeatures  feature vector per item Int index
+  * @param itemStringIntMap mapping between item String IDs and Int indices
+  * @param items            item data keyed by item Int index
+  */
+class ALSModel(
+  val productFeatures: Map[Int, Array[Double]],
+  val itemStringIntMap: BiMap[String, Int],
+  val items: Map[Int, Item]
+) extends Serializable {
+
+  // Inverse lookup (Int index -> String ID); rebuilt lazily, not serialized.
+  @transient lazy val itemIntStringMap = itemStringIntMap.inverse
+
+  /** Summary showing the size and the first two entries of each member. */
+  override def toString: String = {
+    val featurePreview = productFeatures.take(2).toList
+    val idPreview = itemStringIntMap.take(2).toString
+    val itemPreview = items.take(2).toString
+    Seq(
+      s" productFeatures: [${productFeatures.size}](${featurePreview}...)",
+      s" itemStringIntMap: [${itemStringIntMap.size}](${idPreview}...)]",
+      s" items: [${items.size}](${itemPreview}...)]"
+    ).mkString
+  }
+}
+
+/**
+  * Use ALS to build item x feature matrix
+  */
+class ALSAlgorithm(val ap: ALSAlgorithmParams)
+  extends P2LAlgorithm[PreparedData, ALSModel, Query, PredictedResult] {
+
+  @transient lazy val logger = Logger[this.type]
+
+  /**
+    * Trains an implicit-feedback ALS model from the view events.
+    *
+    * User and item String IDs are mapped to Int indices, the view events are
+    * aggregated into one rating per (user, item) pair whose value is the
+    * number of views, and `ALS.trainImplicit` is run on those ratings. The
+    * resulting product features are collected into the returned model
+    * together with the item ID mapping and the items themselves.
+    *
+    * @param sc   SparkContext supplied by the framework (not used directly)
+    * @param data prepared users, items and view events
+    * @return the trained [[ALSModel]]
+    * @throws IllegalArgumentException (via `require`) if the view events,
+    *         users, items or the derived ratings are empty
+    */
+  def train(sc: SparkContext, data: PreparedData): ALSModel = {
+    require(!data.viewEvents.take(1).isEmpty,
+      s"viewEvents in PreparedData cannot be empty." +
+      " Please check if DataSource generates TrainingData" +
+      " and Preparator generates PreparedData correctly.")
+    require(!data.users.take(1).isEmpty,
+      s"users in PreparedData cannot be empty." +
+      " Please check if DataSource generates TrainingData" +
+      " and Preparator generates PreparedData correctly.")
+    require(!data.items.take(1).isEmpty,
+      s"items in PreparedData cannot be empty." +
+      " Please check if DataSource generates TrainingData" +
+      " and Preparator generates PreparedData correctly.")
+    // create User and item's String ID to integer index BiMap
+    val userStringIntMap = BiMap.stringInt(data.users.keys)
+    val itemStringIntMap = BiMap.stringInt(data.items.keys)
+
+    // collect Item as Map and convert ID to Int index
+    val items: Map[Int, Item] = data.items.map { case (id, item) =>
+      (itemStringIntMap(id), item)
+    }.collectAsMap.toMap
+
+    val mllibRatings = data.viewEvents
+      .map { r =>
+        // Convert user and item String IDs to Int index for MLlib
+        // (-1 marks an ID that has no index)
+        val uindex = userStringIntMap.getOrElse(r.user, -1)
+        val iindex = itemStringIntMap.getOrElse(r.item, -1)
+
+        if (uindex == -1)
+          logger.info(s"Couldn't convert nonexistent user ID ${r.user}"
+            + " to Int index.")
+
+        if (iindex == -1)
+          logger.info(s"Couldn't convert nonexistent item ID ${r.item}"
+            + " to Int index.")
+
+        ((uindex, iindex), 1)
+      }.filter { case ((u, i), v) =>
+        // keep events with valid user and item index
+        (u != -1) && (i != -1)
+      }.reduceByKey(_ + _) // aggregate all view events of same user-item pair
+      .map { case ((u, i), v) =>
+        // MLlibRating requires integer index for user and item
+        MLlibRating(u, i, v)
+      }
+      .cache()
+
+    // MLLib ALS cannot handle empty training data.
+    require(!mllibRatings.take(1).isEmpty,
+      s"mllibRatings cannot be empty." +
+      " Please check if your events contain valid user and item ID.")
+
+    // seed for MLlib ALS
+    val seed = ap.seed.getOrElse(System.nanoTime)
+
+    val m = ALS.trainImplicit(
+      ratings = mllibRatings,
+      rank = ap.rank,
+      iterations = ap.numIterations,
+      lambda = ap.lambda,
+      blocks = -1,
+      alpha = 1.0,
+      seed = seed)
+
+    new ALSModel(
+      productFeatures = m.productFeatures.collectAsMap.toMap,
+      itemStringIntMap = itemStringIntMap,
+      items = items
+    )
+  }
+
+  /**
+    * Scores every item that has a feature vector against the query items and
+    * returns the best `query.num` candidates with their item properties.
+    *
+    * An item's score is the sum of its cosine similarities to the feature
+    * vectors of the query items; only items with a score above 0 that pass
+    * `isCandidateItem` are considered.
+    */
+  def predict(model: ALSModel, query: Query): PredictedResult = {
+    val featuresByItem = model.productFeatures
+    val idToIndex = model.itemStringIntMap
+
+    // Int indices of the query items; IDs without an index are dropped
+    val queryList: Set[Int] = query.items.flatMap(id => idToIndex.get(id)).toSet
+
+    // productFeatures may not contain the requested item
+    val queryFeatures: Vector[Array[Double]] =
+      queryList.toVector.flatMap(idx => featuresByItem.get(idx))
+
+    val whiteList: Option[Set[Int]] =
+      query.whiteList.map(ids => ids.flatMap(id => idToIndex.get(id)))
+    val blackList: Option[Set[Int]] =
+      query.blackList.map(ids => ids.flatMap(id => idToIndex.get(id)))
+
+    // reversed, so that a higher score sorts first
+    val ord = Ordering.by[(Int, Double), Double](_._2).reverse
+
+    val indexScores: Array[(Int, Double)] =
+      if (queryFeatures.nonEmpty) {
+        featuresByItem.par // convert to parallel collection
+          .mapValues { f =>
+            queryFeatures.map(qf => cosine(qf, f)).reduce(_ + _)
+          }
+          .filter { case (_, score) => score > 0 } // keep items with score > 0
+          .seq // convert back to sequential collection
+          .toArray
+      } else {
+        logger.info(s"No productFeatures vector for query items ${query.items}.")
+        Array.empty[(Int, Double)]
+      }
+
+    val filteredScore = indexScores.view.filter { case (idx, _) =>
+      isCandidateItem(
+        i = idx,
+        items = model.items,
+        categories = query.categories,
+        categoryBlackList = query.categoryBlackList,
+        queryList = queryList,
+        whiteList = whiteList,
+        blackList = blackList
+      )
+    }
+
+    val topScores = getTopN(filteredScore, query.num)(ord).toArray
+
+    // MODIFIED: return the item's properties along with its score
+    val itemScores = topScores.map { case (idx, score) =>
+      val found = model.items(idx)
+      new ItemScore(
+        item = model.itemIntStringMap(idx),
+        title = found.title,
+        date = found.date,
+        imdbUrl = found.imdbUrl,
+        score = score
+      )
+    }
+
+    new PredictedResult(itemScores)
+  }
+
+  /**
+    * Returns at most `n` elements of `s` that are smallest according to
+    * `ord`, sorted ascending by `ord`.
+    *
+    * A bounded priority queue is used: its head is always the largest
+    * (i.e. worst) retained element, which is evicted whenever a smaller
+    * element arrives.
+    *
+    * @param s   the elements to select from
+    * @param n   maximum number of elements to return; a value of 0 or less
+    *            yields an empty result
+    * @param ord ordering under which "smaller" means "better"
+    */
+  private
+  def getTopN[T](s: Seq[T], n: Int)(implicit ord: Ordering[T]): Seq[T] = {
+    if (n <= 0) {
+      // Nothing to select. Without this guard the loop below would call
+      // q.head on an empty queue and throw NoSuchElementException.
+      Seq.empty[T]
+    } else {
+      val q = PriorityQueue.empty[T](ord)
+
+      for (x <- s) {
+        if (q.size < n)
+          q.enqueue(x)
+        else if (ord.compare(x, q.head) < 0) {
+          // q is full and x beats the worst retained element
+          q.dequeue()
+          q.enqueue(x)
+        }
+      }
+
+      // dequeueAll yields the largest element first; reverse to ascending
+      q.dequeueAll.toSeq.reverse
+    }
+  }
+
+  /**
+    * Cosine similarity of `v1` and `v2`, iterating over the indices of `v1`.
+    * Returns 0 when the product of the two norms is 0.
+    */
+  private
+  def cosine(v1: Array[Double], v2: Array[Double]): Double = {
+    // Single pass accumulating the dot product and both squared norms.
+    @annotation.tailrec
+    def accumulate(k: Int, sq1: Double, sq2: Double, dot: Double): Double =
+      if (k >= v1.size) {
+        val norms = math.sqrt(sq1) * math.sqrt(sq2)
+        if (norms == 0) 0.0 else dot / norms
+      } else {
+        val a = v1(k)
+        val b = v2(k)
+        accumulate(k + 1, sq1 + a * a, sq2 + b * b, dot + a * b)
+      }
+
+    accumulate(0, 0.0, 0.0, 0.0)
+  }
+
+  /**
+    * Decides whether item index `i` may be returned for a query.
+    *
+    * The item must be in `whiteList` (when given), must not be in
+    * `blackList` (when given), must not be one of the query items, must
+    * share a category with `categories` (when given) and must not share a
+    * category with `categoryBlackList` (when given).
+    */
+  private
+  def isCandidateItem(
+    i: Int,
+    items: Map[Int, Item],
+    categories: Option[Set[String]],
+    categoryBlackList: Option[Set[String]],
+    queryList: Set[Int],
+    whiteList: Option[Set[Int]],
+    blackList: Option[Set[Int]]
+  ): Boolean = {
+    // true if the item has categories and at least one of them is in `cats`
+    def overlaps(cats: Set[String]): Boolean =
+      items(i).categories.exists(own => own.toSet.intersect(cats).nonEmpty)
+
+    whiteList.forall(_.contains(i)) &&
+    blackList.forall(excluded => !excluded.contains(i)) &&
+    // discard items in query as well
+    !queryList.contains(i) &&
+    // keep only items that overlap the requested categories; an item
+    // without categories is discarded when categories are requested
+    categories.forall(wanted => overlaps(wanted)) &&
+    // discard items that overlap the black-listed categories; an item
+    // without categories is kept
+    categoryBlackList.forall(banned => !overlaps(banned))
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-similarproduct/return-item-properties/src/main/scala/CooccurrenceAlgorithm.scala
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-similarproduct/return-item-properties/src/main/scala/CooccurrenceAlgorithm.scala b/examples/scala-parallel-similarproduct/return-item-properties/src/main/scala/CooccurrenceAlgorithm.scala
new file mode 100644
index 0000000..7a75bf0
--- /dev/null
+++ b/examples/scala-parallel-similarproduct/return-item-properties/src/main/scala/CooccurrenceAlgorithm.scala
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.predictionio.examples.similarproduct
+
+import org.apache.predictionio.controller.P2LAlgorithm
+import org.apache.predictionio.controller.Params
+import org.apache.predictionio.data.storage.BiMap
+
+import org.apache.spark.SparkContext
+import org.apache.spark.rdd.RDD
+
+/**
+  * Parameters of [[CooccurrenceAlgorithm]].
+  *
+  * @param n number of top co-occurring items kept for each item
+  */
+case class CooccurrenceAlgorithmParams(
+  n: Int // top co-occurrence
+) extends Params
+
+/**
+  * Model produced by [[CooccurrenceAlgorithm]].
+  *
+  * @param topCooccurrences per item Int index, the (other item index, count)
+  *                         pairs kept by training
+  * @param itemStringIntMap mapping between item String IDs and Int indices
+  * @param items            item data keyed by item Int index
+  */
+class CooccurrenceModel(
+  val topCooccurrences: Map[Int, Array[(Int, Int)]],
+  val itemStringIntMap: BiMap[String, Int],
+  val items: Map[Int, Item]
+) extends Serializable {
+  // Inverse lookup (Int index -> String ID); rebuilt lazily, not serialized.
+  @transient lazy val itemIntStringMap = itemStringIntMap.inverse
+
+  /** Renders the co-occurrence table with each array joined by commas. */
+  override def toString(): String =
+    topCooccurrences.mapValues(_.mkString(",")).toString
+}
+
+class CooccurrenceAlgorithm(val ap: CooccurrenceAlgorithmParams)
+  extends P2LAlgorithm[PreparedData, CooccurrenceModel, Query, PredictedResult] {
+
+  /**
+    * Builds the co-occurrence model: the item ID mapping, the top `ap.n`
+    * co-occurring items for every item, and the items keyed by Int index.
+    */
+  def train(sc: SparkContext, data: PreparedData): CooccurrenceModel = {
+    // String ID <-> Int index mapping for all items
+    val idToIndex = BiMap.stringInt(data.items.keys)
+
+    val cooccurrenceTable = trainCooccurrence(
+      events = data.viewEvents,
+      n = ap.n,
+      itemStringIntMap = idToIndex)
+
+    // collect Item as Map and convert ID to Int index
+    val indexedItems: Map[Int, Item] = data.items
+      .map { case (id, item) => idToIndex(id) -> item }
+      .collectAsMap
+      .toMap
+
+    new CooccurrenceModel(
+      topCooccurrences = cooccurrenceTable,
+      itemStringIntMap = idToIndex,
+      items = indexedItems)
+  }
+
+  /**
+    * Given the user-item view events, finds the top `n` co-occurring items
+    * for each item.
+    *
+    * Two items co-occur once for every user who viewed both of them.
+    *
+    * @param events           the view events
+    * @param n                number of co-occurring items kept per item
+    * @param itemStringIntMap item String ID to Int index mapping; events on
+    *                         items missing from it are ignored
+    * @return per item index, up to `n` (other item index, count) pairs
+    *         sorted by descending count
+    */
+  def trainCooccurrence(
+    events: RDD[ViewEvent],
+    n: Int,
+    itemStringIntMap: BiMap[String, Int]): Map[Int, Array[(Int, Int)]] = {
+
+    // (user, item index) pairs
+    val userItem = events
+      .filter(e => itemStringIntMap.contains(e.item))
+      .map(e => (e.user, itemStringIntMap(e.item)))
+      // if user view same item multiple times, only count as once
+      .distinct()
+      .cache()
+
+    // self-join on user gives every ordered pair of items viewed by a user
+    val cooccurrences: RDD[((Int, Int), Int)] = userItem.join(userItem)
+      .values
+      // keep one orientation of each pair, eg. (a,b) but not (b,a) or (a,a)
+      .filter { case (first, second) => first < second }
+      .map(pair => (pair, 1))
+      .reduceByKey(_ + _)
+
+    cooccurrences
+      // list each pair under both of its items
+      .flatMap { case ((first, second), count) =>
+        Seq((first, (second, count)), (second, (first, count)))
+      }
+      .groupByKey
+      .mapValues(_.toArray.sortBy(_._2)(Ordering.Int.reverse).take(n))
+      .collectAsMap.toMap
+  }
+
+  /**
+    * Returns up to `query.num` items that co-occur most often with the
+    * query items, together with their item properties.
+    *
+    * The stored co-occurrence counts of all query items are summed per
+    * candidate item, candidates rejected by `isCandidateItem` are removed,
+    * and the rest is sorted by descending total count.
+    */
+  def predict(model: CooccurrenceModel, query: Query): PredictedResult = {
+
+    // convert items to Int index
+    val queryList: Set[Int] = query.items
+      .flatMap(model.itemStringIntMap.get(_))
+      .toSet
+
+    val whiteList: Option[Set[Int]] = query.whiteList.map( set =>
+      set.map(model.itemStringIntMap.get(_)).flatten
+    )
+
+    val blackList: Option[Set[Int]] = query.blackList.map ( set =>
+      set.map(model.itemStringIntMap.get(_)).flatten
+    )
+
+    // total co-occurrence count per candidate item over all query items
+    val counts: Array[(Int, Int)] = queryList.toVector
+      .flatMap { q =>
+        model.topCooccurrences.getOrElse(q, Array())
+      }
+      .groupBy { case (index, count) => index }
+      .map { case (index, indexCounts) => (index, indexCounts.map(_._2).sum) }
+      .toArray
+
+    val itemScores = counts
+      .filter { case (i, v) =>
+        isCandidateItem(
+          i = i,
+          items = model.items,
+          categories = query.categories,
+          // previously ignored by this algorithm although Query declares it
+          categoryBlackList = query.categoryBlackList,
+          queryList = queryList,
+          whiteList = whiteList,
+          blackList = blackList
+        )
+      }
+      .sortBy(_._2)(Ordering.Int.reverse)
+      .take(query.num)
+      .map { case (index, count) =>
+        // MODIFIED
+        val it = model.items(index)
+        ItemScore(
+          item = model.itemIntStringMap(index),
+          title = it.title,
+          date = it.date,
+          imdbUrl = it.imdbUrl,
+          score = count
+        )
+      }
+
+    new PredictedResult(itemScores)
+
+  }
+
+  /**
+    * Decides whether item index `i` may be returned for a query.
+    *
+    * The item must be in `whiteList` (when given), must not be in
+    * `blackList` (when given), must not be one of the query items, must
+    * share a category with `categories` (when given) and must not share a
+    * category with `categoryBlackList` (when given).
+    */
+  private
+  def isCandidateItem(
+    i: Int,
+    items: Map[Int, Item],
+    categories: Option[Set[String]],
+    categoryBlackList: Option[Set[String]],
+    queryList: Set[Int],
+    whiteList: Option[Set[Int]],
+    blackList: Option[Set[Int]]
+  ): Boolean = {
+    whiteList.map(_.contains(i)).getOrElse(true) &&
+    blackList.map(!_.contains(i)).getOrElse(true) &&
+    // discard items in query as well
+    (!queryList.contains(i)) &&
+    // filter categories
+    categories.map { cat =>
+      items(i).categories.map { itemCat =>
+        // keep this item if it has overlapping categories with the query
+        !(itemCat.toSet.intersect(cat).isEmpty)
+      }.getOrElse(false) // discard this item if it has no categories
+    }.getOrElse(true) &&
+    categoryBlackList.map { cat =>
+      items(i).categories.map { itemCat =>
+        // discard this item if it has overlapping categories with the
+        // black list
+        (itemCat.toSet.intersect(cat).isEmpty)
+      }.getOrElse(true) // keep this item if it has no categories
+    }.getOrElse(true)
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-similarproduct/return-item-properties/src/main/scala/DataSource.scala
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-similarproduct/return-item-properties/src/main/scala/DataSource.scala b/examples/scala-parallel-similarproduct/return-item-properties/src/main/scala/DataSource.scala
new file mode 100644
index 0000000..f227649
--- /dev/null
+++ b/examples/scala-parallel-similarproduct/return-item-properties/src/main/scala/DataSource.scala
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.predictionio.examples.similarproduct
+
+import org.apache.predictionio.controller.PDataSource
+import org.apache.predictionio.controller.EmptyEvaluationInfo
+import org.apache.predictionio.controller.EmptyActualResult
+import org.apache.predictionio.controller.Params
+import org.apache.predictionio.data.storage.Event
+import org.apache.predictionio.data.store.PEventStore
+
+import org.apache.spark.SparkContext
+import org.apache.spark.SparkContext._
+import org.apache.spark.rdd.RDD
+
+import grizzled.slf4j.Logger
+
+/**
+  * Parameters of [[DataSource]].
+  *
+  * @param appName passed as `appName` to every PEventStore call made by
+  *                `readTraining`
+  */
+case class DataSourceParams(appName: String) extends Params
+
+class DataSource(val dsp: DataSourceParams)
+  extends PDataSource[TrainingData,
+      EmptyEvaluationInfo, Query, EmptyActualResult] {
+
+  @transient lazy val logger = Logger[this.type]
+
+  /**
+    * Reads users, items and "view" events of the app named `dsp.appName`
+    * from PEventStore and bundles them into a [[TrainingData]].
+    *
+    * All three RDDs are cached. A failure while converting an entity or an
+    * event is logged and then rethrown.
+    */
+  override
+  def readTraining(sc: SparkContext): TrainingData = {
+
+    // create a RDD of (entityID, User)
+    val usersRDD: RDD[(String, User)] = PEventStore.aggregateProperties(
+      appName = dsp.appName,
+      entityType = "user"
+    )(sc).map { case (entityId, properties) =>
+      // User carries no fields, so none of the properties are read here
+      val user = try {
+        User()
+      } catch {
+        case e: Exception => {
+          logger.error(s"Failed to get properties ${properties} of" +
+            s" user ${entityId}. Exception: ${e}.")
+          throw e
+        }
+      }
+      (entityId, user)
+    }.cache()
+
+    // create a RDD of (entityID, Item)
+    val itemsRDD: RDD[(String, Item)] = PEventStore.aggregateProperties(
+      appName = dsp.appName,
+      entityType = "item"
+    )(sc).map { case (entityId, properties) =>
+      val item = try {
+        // Assume categories is optional property of item.
+        // MODIFIED
+        // title, date and imdbUrl are read with get, categories with getOpt
+        Item(
+          title = properties.get[String]("title"),
+          date = properties.get[String]("date"),
+          imdbUrl = properties.get[String]("imdbUrl"),
+          categories = properties.getOpt[List[String]]("categories"))
+      } catch {
+        case e: Exception => {
+          logger.error(s"Failed to get properties ${properties} of" +
+            s" item ${entityId}. Exception: ${e}.")
+          throw e
+        }
+      }
+      (entityId, item)
+    }.cache()
+
+    // get all "user" "view" "item" events
+    val viewEventsRDD: RDD[ViewEvent] = PEventStore.find(
+      appName = dsp.appName,
+      entityType = Some("user"),
+      eventNames = Some(List("view")),
+      // targetEntityType is optional field of an event.
+      targetEntityType = Some(Some("item")))(sc)
+      // PEventStore.find() returns RDD[Event]
+      .map { event =>
+        val viewEvent = try {
+          event.event match {
+            case "view" => ViewEvent(
+              user = event.entityId,
+              // .get throws if the event has no target entity ID; that is
+              // caught, logged and rethrown below
+              item = event.targetEntityId.get,
+              t = event.eventTime.getMillis)
+            case _ => throw new Exception(s"Unexpected event ${event} is read.")
+          }
+        } catch {
+          case e: Exception => {
+            logger.error(s"Cannot convert ${event} to ViewEvent." +
+              s" Exception: ${e}.")
+            throw e
+          }
+        }
+        viewEvent
+      }.cache()
+
+    new TrainingData(
+      users = usersRDD,
+      items = itemsRDD,
+      viewEvents = viewEventsRDD
+    )
+  }
+}
+
+/** A user; no user properties are kept. */
+case class User()
+
+// MODIFIED
+/**
+  * An item together with the properties that are returned in query results.
+  *
+  * @param title      value of the item's "title" property
+  * @param date       value of the item's "date" property, read as a String
+  * @param imdbUrl    value of the item's "imdbUrl" property
+  * @param categories value of the optional "categories" property
+  */
+case class Item(
+     title: String,
+     date: String,
+     imdbUrl: String,
+     categories: Option[List[String]])
+
+/**
+  * A single "view" event.
+  *
+  * @param user ID of the viewing user
+  * @param item ID of the viewed item
+  * @param t    event time in milliseconds
+  */
+case class ViewEvent(user: String, item: String, t: Long)
+
+/**
+  * Data read by [[DataSource]]: users and items keyed by entity ID, plus the
+  * view events.
+  */
+class TrainingData(
+  val users: RDD[(String, User)],
+  val items: RDD[(String, Item)],
+  val viewEvents: RDD[ViewEvent]
+) extends Serializable {
+  /**
+    * Summary with the size and the first two elements of each RDD.
+    * Calls `count()` and `take(2)` on every RDD.
+    */
+  override def toString: String = {
+    val usersPart =
+      s"users: [${users.count()} (${users.take(2).toList}...)]"
+    val itemsPart =
+      s"items: [${items.count()} (${items.take(2).toList}...)]"
+    val viewEventsPart =
+      s"viewEvents: [${viewEvents.count()}] (${viewEvents.take(2).toList}...)"
+    usersPart + itemsPart + viewEventsPart
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-similarproduct/return-item-properties/src/main/scala/Engine.scala
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-similarproduct/return-item-properties/src/main/scala/Engine.scala b/examples/scala-parallel-similarproduct/return-item-properties/src/main/scala/Engine.scala
new file mode 100644
index 0000000..595383e
--- /dev/null
+++ b/examples/scala-parallel-similarproduct/return-item-properties/src/main/scala/Engine.scala
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.predictionio.examples.similarproduct
+
+import org.apache.predictionio.controller.EngineFactory
+import org.apache.predictionio.controller.Engine
+
+/**
+  * A similar-product query.
+  *
+  * @param items             IDs of the items to find similar items for
+  * @param num               maximum number of items to return
+  * @param categories        optional; when set, only items sharing at least
+  *                          one of these categories are returned
+  * @param categoryBlackList optional set of categories to exclude; see the
+  *                          algorithms' `isCandidateItem` for how it is applied
+  * @param whiteList         optional; when set, only these item IDs may be
+  *                          returned
+  * @param blackList         optional; when set, these item IDs are never
+  *                          returned
+  */
+case class Query(
+  items: List[String],
+  num: Int,
+  categories: Option[Set[String]],
+  categoryBlackList: Option[Set[String]],
+  whiteList: Option[Set[String]],
+  blackList: Option[Set[String]]
+) extends Serializable
+
+/**
+  * Result of a query.
+  *
+  * @param itemScores the returned items with their scores
+  */
+case class PredictedResult(
+  itemScores: Array[ItemScore]
+) extends Serializable {
+  // Renders the item scores separated by commas.
+  override def toString: String = itemScores.mkString(",")
+}
+
+// MODIFIED
+/**
+  * One returned item: its ID, the item properties carried along with it,
+  * and its score.
+  *
+  * @param item    item ID
+  * @param title   the item's title
+  * @param date    the item's date
+  * @param imdbUrl the item's IMDb URL
+  * @param score   score assigned by the algorithm
+  */
+case class ItemScore(
+  item: String,
+  title: String,
+  date: String,
+  imdbUrl: String,
+  score: Double
+) extends Serializable
+
+/**
+  * Engine factory wiring DataSource, Preparator, the available algorithms
+  * and Serving together.
+  */
+object SimilarProductEngine extends EngineFactory {
+  def apply() = {
+    new Engine(
+      classOf[DataSource],
+      classOf[Preparator],
+      // algorithm classes registered under the names "als" and "cooccurrence"
+      Map(
+        "als" -> classOf[ALSAlgorithm],
+        "cooccurrence" -> classOf[CooccurrenceAlgorithm]),
+      classOf[Serving])
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-similarproduct/return-item-properties/src/main/scala/Preparator.scala
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-similarproduct/return-item-properties/src/main/scala/Preparator.scala b/examples/scala-parallel-similarproduct/return-item-properties/src/main/scala/Preparator.scala
new file mode 100644
index 0000000..ece997b
--- /dev/null
+++ b/examples/scala-parallel-similarproduct/return-item-properties/src/main/scala/Preparator.scala
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.predictionio.examples.similarproduct
+
+import org.apache.predictionio.controller.PPreparator
+
+import org.apache.spark.SparkContext
+import org.apache.spark.SparkContext._
+import org.apache.spark.rdd.RDD
+
+class Preparator
+  extends PPreparator[TrainingData, PreparedData] {
+
+  def prepare(sc: SparkContext, trainingData: TrainingData): PreparedData = {
+    new PreparedData(
+      users = trainingData.users,
+      items = trainingData.items,
+      viewEvents = trainingData.viewEvents)
+  }
+}
+
+class PreparedData(
+  val users: RDD[(String, User)],
+  val items: RDD[(String, Item)],
+  val viewEvents: RDD[ViewEvent]
+) extends Serializable

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-similarproduct/return-item-properties/src/main/scala/Serving.scala
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-similarproduct/return-item-properties/src/main/scala/Serving.scala b/examples/scala-parallel-similarproduct/return-item-properties/src/main/scala/Serving.scala
new file mode 100644
index 0000000..91abca6
--- /dev/null
+++ b/examples/scala-parallel-similarproduct/return-item-properties/src/main/scala/Serving.scala
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.predictionio.examples.similarproduct
+
+import org.apache.predictionio.controller.LServing
+
+class Serving
+  extends LServing[Query, PredictedResult] {
+
+  override
+  def serve(query: Query,
+    predictedResults: Seq[PredictedResult]): PredictedResult = {
+    predictedResults.head
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-similarproduct/return-item-properties/template.json
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-similarproduct/return-item-properties/template.json b/examples/scala-parallel-similarproduct/return-item-properties/template.json
new file mode 100644
index 0000000..d076ec5
--- /dev/null
+++ b/examples/scala-parallel-similarproduct/return-item-properties/template.json
@@ -0,0 +1 @@
+{"pio": {"version": { "min": "0.10.0-incubating" }}}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-similarproduct/rid-user-set-event/.gitignore
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-similarproduct/rid-user-set-event/.gitignore b/examples/scala-parallel-similarproduct/rid-user-set-event/.gitignore
new file mode 100644
index 0000000..5dbe602
--- /dev/null
+++ b/examples/scala-parallel-similarproduct/rid-user-set-event/.gitignore
@@ -0,0 +1,4 @@
+manifest.json
+target/
+pio.log
+/pio.sbt
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-similarproduct/rid-user-set-event/build.sbt
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-similarproduct/rid-user-set-event/build.sbt b/examples/scala-parallel-similarproduct/rid-user-set-event/build.sbt
new file mode 100644
index 0000000..1daded6
--- /dev/null
+++ b/examples/scala-parallel-similarproduct/rid-user-set-event/build.sbt
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+name := "template-scala-parallel-similarproduct"
+
+organization := "org.apache.predictionio"
+scalaVersion := "2.11.8"
+libraryDependencies ++= Seq(
+  "org.apache.predictionio" %% "apache-predictionio-core" % "0.11.0-incubating" % "provided",
+  "org.apache.spark"        %% "spark-mllib"              % "2.1.1" % "provided")

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-similarproduct/rid-user-set-event/data/import_eventserver.py
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-similarproduct/rid-user-set-event/data/import_eventserver.py b/examples/scala-parallel-similarproduct/rid-user-set-event/data/import_eventserver.py
new file mode 100644
index 0000000..903dbd5
--- /dev/null
+++ b/examples/scala-parallel-similarproduct/rid-user-set-event/data/import_eventserver.py
@@ -0,0 +1,82 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+Import sample data for similar product engine
+"""
+
+import predictionio
+import argparse
+import random
+
+SEED = 3
+
+def import_events(client):
+  random.seed(SEED)
+  count = 0
+  print(client.get_status())
+  print("Importing data...")
+
+  # generate 10 users, with user ids u1,u2,....,u10
+  user_ids = ["u%s" % i for i in range(1, 11)]
+
+  # generate 50 items, with item ids i1,i2,....,i50
+  # randomly assign 1 to 4 categories among c1-c6 to each item
+  categories = ["c%s" % i for i in range(1, 7)]
+  item_ids = ["i%s" % i for i in range(1, 51)]
+  for item_id in item_ids:
+    print("Set item", item_id)
+    client.create_event(
+      event="$set",
+      entity_type="item",
+      entity_id=item_id,
+      properties={
+        "categories" : random.sample(categories, random.randint(1, 4))
+      }
+    )
+    count += 1
+
+  # each user randomly viewed 10 items
+  for user_id in user_ids:
+    for viewed_item in random.sample(item_ids, 10):
+      print("User", user_id ,"views item", viewed_item)
+      client.create_event(
+        event="view",
+        entity_type="user",
+        entity_id=user_id,
+        target_entity_type="item",
+        target_entity_id=viewed_item
+      )
+      count += 1
+
+  print("%s events are imported." % count)
+
+if __name__ == '__main__':
+  parser = argparse.ArgumentParser(
+    description="Import sample data for similar product engine")
+  parser.add_argument('--access_key', default='invald_access_key')
+  parser.add_argument('--url', default="http://localhost:7070")
+
+  args = parser.parse_args()
+  print(args)
+
+  client = predictionio.EventClient(
+    access_key=args.access_key,
+    url=args.url,
+    threads=5,
+    qsize=500)
+  import_events(client)

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-similarproduct/rid-user-set-event/data/send_query.py
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-similarproduct/rid-user-set-event/data/send_query.py b/examples/scala-parallel-similarproduct/rid-user-set-event/data/send_query.py
new file mode 100644
index 0000000..0a70f28
--- /dev/null
+++ b/examples/scala-parallel-similarproduct/rid-user-set-event/data/send_query.py
@@ -0,0 +1,24 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+Send sample query to prediction engine
+"""
+
+import predictionio
+engine_client = predictionio.EngineClient(url="http://localhost:8000")
+print(engine_client.send_query({"items": ["i1", "i3"], "num": 4}))

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-similarproduct/rid-user-set-event/engine-cooccurrence.json
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-similarproduct/rid-user-set-event/engine-cooccurrence.json b/examples/scala-parallel-similarproduct/rid-user-set-event/engine-cooccurrence.json
new file mode 100644
index 0000000..c31b88e
--- /dev/null
+++ b/examples/scala-parallel-similarproduct/rid-user-set-event/engine-cooccurrence.json
@@ -0,0 +1,18 @@
+{
+  "id": "default",
+  "description": "Default settings",
+  "engineFactory": "org.apache.predictionio.examples.similarproduct.SimilarProductEngine",
+  "datasource": {
+    "params" : {
+      "appName": "MyApp1"
+    }
+  },
+  "algorithms": [
+    {
+      "name": "cooccurrence",
+      "params": {
+        "n": 20
+      }
+    }
+  ]
+}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-similarproduct/rid-user-set-event/engine.json
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-similarproduct/rid-user-set-event/engine.json b/examples/scala-parallel-similarproduct/rid-user-set-event/engine.json
new file mode 100644
index 0000000..a652ec4
--- /dev/null
+++ b/examples/scala-parallel-similarproduct/rid-user-set-event/engine.json
@@ -0,0 +1,21 @@
+{
+  "id": "default",
+  "description": "Default settings",
+  "engineFactory": "org.apache.predictionio.examples.similarproduct.SimilarProductEngine",
+  "datasource": {
+    "params" : {
+      "appName": "MyApp1"
+    }
+  },
+  "algorithms": [
+    {
+      "name": "als",
+      "params": {
+        "rank": 10,
+        "numIterations" : 20,
+        "lambda": 0.01,
+        "seed": 3
+      }
+    }
+  ]
+}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-similarproduct/rid-user-set-event/project/assembly.sbt
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-similarproduct/rid-user-set-event/project/assembly.sbt b/examples/scala-parallel-similarproduct/rid-user-set-event/project/assembly.sbt
new file mode 100644
index 0000000..e17409e
--- /dev/null
+++ b/examples/scala-parallel-similarproduct/rid-user-set-event/project/assembly.sbt
@@ -0,0 +1 @@
+addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.4")

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-similarproduct/rid-user-set-event/project/build.properties
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-similarproduct/rid-user-set-event/project/build.properties b/examples/scala-parallel-similarproduct/rid-user-set-event/project/build.properties
new file mode 100644
index 0000000..64317fd
--- /dev/null
+++ b/examples/scala-parallel-similarproduct/rid-user-set-event/project/build.properties
@@ -0,0 +1 @@
+sbt.version=0.13.15

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-similarproduct/rid-user-set-event/src/main/scala/ALSAlgorithm.scala
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-similarproduct/rid-user-set-event/src/main/scala/ALSAlgorithm.scala b/examples/scala-parallel-similarproduct/rid-user-set-event/src/main/scala/ALSAlgorithm.scala
new file mode 100644
index 0000000..b413c08
--- /dev/null
+++ b/examples/scala-parallel-similarproduct/rid-user-set-event/src/main/scala/ALSAlgorithm.scala
@@ -0,0 +1,259 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.predictionio.examples.similarproduct
+
+import org.apache.predictionio.controller.P2LAlgorithm
+import org.apache.predictionio.controller.Params
+import org.apache.predictionio.data.storage.BiMap
+
+import org.apache.spark.SparkContext
+import org.apache.spark.SparkContext._
+import org.apache.spark.mllib.recommendation.ALS
+import org.apache.spark.mllib.recommendation.{Rating => MLlibRating}
+
+import grizzled.slf4j.Logger
+
+import scala.collection.mutable.PriorityQueue
+
+case class ALSAlgorithmParams(
+  rank: Int,
+  numIterations: Int,
+  lambda: Double,
+  seed: Option[Long]) extends Params
+
+class ALSModel(
+  val productFeatures: Map[Int, Array[Double]],
+  val itemStringIntMap: BiMap[String, Int],
+  val items: Map[Int, Item]
+) extends Serializable {
+
+  @transient lazy val itemIntStringMap = itemStringIntMap.inverse
+
+  override def toString = {
+    s" productFeatures: [${productFeatures.size}]" +
+    s"(${productFeatures.take(2).toList}...)" +
+    s" itemStringIntMap: [${itemStringIntMap.size}]" +
+    s"(${itemStringIntMap.take(2).toString}...)]" +
+    s" items: [${items.size}]" +
+    s"(${items.take(2).toString}...)]"
+  }
+}
+
+/**
+  * Use ALS to build item x feature matrix
+  */
+class ALSAlgorithm(val ap: ALSAlgorithmParams)
+  extends P2LAlgorithm[PreparedData, ALSModel, Query, PredictedResult] {
+
+  @transient lazy val logger = Logger[this.type]
+
+  /**
+    * Trains an implicit-feedback ALS model from the view events in `data`.
+    * Fails via `require` if the view events, items or derived ratings are empty.
+    */
+  def train(sc: SparkContext, data: PreparedData): ALSModel = {
+    require(!data.viewEvents.take(1).isEmpty,
+      "viewEvents in PreparedData cannot be empty." +
+      " Please check if DataSource generates TrainingData" +
+      " and Preparator generates PreparedData correctly.")
+    require(!data.items.take(1).isEmpty,
+      "items in PreparedData cannot be empty." +
+      " Please check if DataSource generates TrainingData" +
+      " and Preparator generates PreparedData correctly.")
+    // create BiMaps from user and item String IDs to integer indices
+    val userStringIntMap = BiMap.stringInt(data.viewEvents.map(_.user)) // MODIFIED
+    val itemStringIntMap = BiMap.stringInt(data.items.keys)
+
+    // collect Item as Map and convert ID to Int index
+    val items: Map[Int, Item] = data.items.map { case (id, item) =>
+      (itemStringIntMap(id), item)
+    }.collectAsMap.toMap
+
+    val mllibRatings = data.viewEvents
+      .map { r =>
+        // Convert user and item String IDs to Int index for MLlib
+        val uindex = userStringIntMap.getOrElse(r.user, -1)
+        val iindex = itemStringIntMap.getOrElse(r.item, -1)
+
+        if (uindex == -1)
+          logger.info(s"Couldn't convert nonexistent user ID ${r.user} to Int index.")
+
+        if (iindex == -1)
+          logger.info(s"Couldn't convert nonexistent item ID ${r.item} to Int index.")
+
+        ((uindex, iindex), 1)
+      }.filter { case ((u, i), v) =>
+        // keep events with valid user and item index
+        (u != -1) && (i != -1)
+      }.reduceByKey(_ + _) // aggregate all view events of same user-item pair
+      .map { case ((u, i), v) =>
+        // MLlibRating requires integer index for user and item
+        MLlibRating(u, i, v)
+      }.cache()
+
+    // MLLib ALS cannot handle empty training data.
+    require(!mllibRatings.take(1).isEmpty,
+      "mllibRatings cannot be empty." +
+      " Please check if your events contain valid user and item ID.")
+
+    // seed for MLlib ALS
+    val seed = ap.seed.getOrElse(System.nanoTime)
+
+    val m = ALS.trainImplicit(
+      ratings = mllibRatings,
+      rank = ap.rank,
+      iterations = ap.numIterations,
+      lambda = ap.lambda,
+      blocks = -1,
+      alpha = 1.0,
+      seed = seed)
+
+    new ALSModel(
+      productFeatures = m.productFeatures.collectAsMap.toMap,
+      itemStringIntMap = itemStringIntMap,
+      items = items)
+  }
+
+  def predict(model: ALSModel, query: Query): PredictedResult = {
+
+    val productFeatures = model.productFeatures
+
+    // convert items to Int index
+    val queryList: Set[Int] = query.items.map(model.itemStringIntMap.get(_))
+      .flatten.toSet
+
+    val queryFeatures: Vector[Array[Double]] = queryList.toVector
+      // productFeatures may not contain the requested item
+      .map { item => productFeatures.get(item) }
+      .flatten
+
+    val whiteList: Option[Set[Int]] = query.whiteList.map( set =>
+      set.map(model.itemStringIntMap.get(_)).flatten
+    )
+    val blackList: Option[Set[Int]] = query.blackList.map ( set =>
+      set.map(model.itemStringIntMap.get(_)).flatten
+    )
+
+    val ord = Ordering.by[(Int, Double), Double](_._2).reverse
+
+    val indexScores: Array[(Int, Double)] = if (queryFeatures.isEmpty) {
+      logger.info(s"No productFeatures vector for query items ${query.items}.")
+      Array[(Int, Double)]()
+    } else {
+      productFeatures.par // convert to parallel collection
+        .mapValues { f =>
+          queryFeatures.map{ qf =>
+            cosine(qf, f)
+          }.reduce(_ + _)
+        }
+        .filter(_._2 > 0) // keep items with score > 0
+        .seq // convert back to sequential collection
+        .toArray
+    }
+
+    val filteredScore = indexScores.view.filter { case (i, v) =>
+      isCandidateItem(
+        i = i,
+        items = model.items,
+        categories = query.categories,
+        categoryBlackList = query.categoryBlackList,
+        queryList = queryList,
+        whiteList = whiteList,
+        blackList = blackList
+      )
+    }
+
+    val topScores = getTopN(filteredScore, query.num)(ord).toArray
+
+    val itemScores = topScores.map { case (i, s) =>
+      new ItemScore(
+        item = model.itemIntStringMap(i),
+        score = s
+      )
+    }
+
+    new PredictedResult(itemScores)
+  }
+
+  private
+  def getTopN[T](s: Seq[T], n: Int)(implicit ord: Ordering[T]): Seq[T] = {
+
+    val q = PriorityQueue()
+
+    for (x <- s) {
+      if (q.size < n)
+        q.enqueue(x)
+      else {
+        // q is full
+        if (ord.compare(x, q.head) < 0) {
+          q.dequeue()
+          q.enqueue(x)
+        }
+      }
+    }
+
+    q.dequeueAll.toSeq.reverse
+  }
+
+  private
+  def cosine(v1: Array[Double], v2: Array[Double]): Double = {
+    val size = v1.size
+    var i = 0
+    var n1: Double = 0
+    var n2: Double = 0
+    var d: Double = 0
+    while (i < size) {
+      n1 += v1(i) * v1(i)
+      n2 += v2(i) * v2(i)
+      d += v1(i) * v2(i)
+      i += 1
+    }
+    val n1n2 = (math.sqrt(n1) * math.sqrt(n2))
+    if (n1n2 == 0) 0 else (d / n1n2)
+  }
+
+  private
+  def isCandidateItem(
+    i: Int,
+    items: Map[Int, Item],
+    categories: Option[Set[String]],
+    categoryBlackList: Option[Set[String]],
+    queryList: Set[Int],
+    whiteList: Option[Set[Int]],
+    blackList: Option[Set[Int]]
+  ): Boolean = {
+    whiteList.map(_.contains(i)).getOrElse(true) &&
+    blackList.map(!_.contains(i)).getOrElse(true) &&
+    // discard items in query as well
+    (!queryList.contains(i)) &&
+    // filter categories
+    categories.map { cat =>
+      items(i).categories.map { itemCat =>
+        // keep this item if its categories overlap with the query's
+        !(itemCat.toSet.intersect(cat).isEmpty)
+      }.getOrElse(false) // discard this item if it has no categories
+    }.getOrElse(true) &&
+    categoryBlackList.map { cat =>
+      items(i).categories.map { itemCat =>
+        // discard this item if its categories overlap the category blacklist
+        (itemCat.toSet.intersect(cat).isEmpty)
+      }.getOrElse(true) // keep this item if it has no categories
+    }.getOrElse(true)
+  }
+
+}