You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@predictionio.apache.org by sh...@apache.org on 2017/07/10 04:10:06 UTC

[03/11] incubator-predictionio git commit: [PIO-97] Fixes examples of the official templates for v0.11.0-incubating.

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-similarproduct/multi-events-multi-algos/src/main/scala/DataSource.scala
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-similarproduct/multi-events-multi-algos/src/main/scala/DataSource.scala b/examples/scala-parallel-similarproduct/multi-events-multi-algos/src/main/scala/DataSource.scala
new file mode 100644
index 0000000..44834b3
--- /dev/null
+++ b/examples/scala-parallel-similarproduct/multi-events-multi-algos/src/main/scala/DataSource.scala
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.predictionio.examples.similarproduct
+
+import org.apache.predictionio.controller.PDataSource
+import org.apache.predictionio.controller.EmptyEvaluationInfo
+import org.apache.predictionio.controller.EmptyActualResult
+import org.apache.predictionio.controller.Params
+import org.apache.predictionio.data.storage.Event
+import org.apache.predictionio.data.store.PEventStore
+
+import org.apache.spark.SparkContext
+import org.apache.spark.SparkContext._
+import org.apache.spark.rdd.RDD
+
+import grizzled.slf4j.Logger
+
+case class DataSourceParams(appName: String) extends Params
+
+class DataSource(val dsp: DataSourceParams)
+  extends PDataSource[TrainingData,
+      EmptyEvaluationInfo, Query, EmptyActualResult] {
+
+  @transient lazy val logger = Logger[this.type]
+
+  override
+  def readTraining(sc: SparkContext): TrainingData = {
+
+    // create a RDD of (entityID, User)
+    val usersRDD: RDD[(String, User)] = PEventStore.aggregateProperties(
+      appName = dsp.appName,
+      entityType = "user"
+    )(sc).map { case (entityId, properties) =>
+      val user = try {
+        User()
+      } catch {
+        case e: Exception => {
+          logger.error(s"Failed to get properties ${properties} of" +
+            s" user ${entityId}. Exception: ${e}.")
+          throw e
+        }
+      }
+      (entityId, user)
+    }.cache()
+
+    // create a RDD of (entityID, Item)
+    val itemsRDD: RDD[(String, Item)] = PEventStore.aggregateProperties(
+      appName = dsp.appName,
+      entityType = "item"
+    )(sc).map { case (entityId, properties) =>
+      val item = try {
+        // Assume categories is optional property of item.
+        Item(categories = properties.getOpt[List[String]]("categories"))
+      } catch {
+        case e: Exception => {
+          logger.error(s"Failed to get properties ${properties} of" +
+            s" item ${entityId}. Exception: ${e}.")
+          throw e
+        }
+      }
+      (entityId, item)
+    }.cache()
+
+    // get all "user" "view" "item" events
+    val viewEventsRDD: RDD[ViewEvent] = PEventStore.find(
+      appName = dsp.appName,
+      entityType = Some("user"),
+      eventNames = Some(List("view")),
+      // targetEntityType is optional field of an event.
+      targetEntityType = Some(Some("item")))(sc)
+      // eventsDb.find() returns RDD[Event]
+      .map { event =>
+        val viewEvent = try {
+          event.event match {
+            case "view" => ViewEvent(
+              user = event.entityId,
+              item = event.targetEntityId.get,
+              t = event.eventTime.getMillis)
+            case _ => throw new Exception(s"Unexpected event ${event} is read.")
+          }
+        } catch {
+          case e: Exception => {
+            logger.error(s"Cannot convert ${event} to ViewEvent." +
+              s" Exception: ${e}.")
+            throw e
+          }
+        }
+        viewEvent
+      }.cache()
+
+    // ADDED
+    // get all "user" "like" and "dislike" "item" events
+    val likeEventsRDD: RDD[LikeEvent] = PEventStore.find(
+      appName = dsp.appName,
+      entityType = Some("user"),
+      eventNames = Some(List("like", "dislike")),
+      // targetEntityType is optional field of an event.
+      targetEntityType = Some(Some("item")))(sc)
+      // eventsDb.find() returns RDD[Event]
+      .map { event =>
+        val likeEvent = try {
+          event.event match {
+            case "like" | "dislike" => LikeEvent(
+              user = event.entityId,
+              item = event.targetEntityId.get,
+              t = event.eventTime.getMillis,
+              like = (event.event == "like"))
+            case _ => throw new Exception(s"Unexpected event ${event} is read.")
+          }
+        } catch {
+          case e: Exception => {
+            logger.error(s"Cannot convert ${event} to LikeEvent." +
+              s" Exception: ${e}.")
+            throw e
+          }
+        }
+        likeEvent
+      }.cache()
+
+    new TrainingData(
+      users = usersRDD,
+      items = itemsRDD,
+      viewEvents = viewEventsRDD,
+      likeEvents = likeEventsRDD // ADDED
+    )
+  }
+}
+
+case class User()
+
+case class Item(categories: Option[List[String]])
+
+case class ViewEvent(user: String, item: String, t: Long)
+
+case class LikeEvent( // ADDED
+  user: String,
+  item: String,
+  t: Long,
+  like: Boolean // true: like. false: dislike
+)
+
+class TrainingData(
+  val users: RDD[(String, User)],
+  val items: RDD[(String, Item)],
+  val viewEvents: RDD[ViewEvent],
+  val likeEvents: RDD[LikeEvent] // ADDED
+) extends Serializable {
+  override def toString = {
+    s"users: [${users.count()} (${users.take(2).toList}...)]" +
+    s"items: [${items.count()} (${items.take(2).toList}...)]" +
+    s"viewEvents: [${viewEvents.count()}] (${viewEvents.take(2).toList}...)" +
+    // ADDED
+    s"likeEvents: [${likeEvents.count()}] (${likeEvents.take(2).toList}...)"
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-similarproduct/multi-events-multi-algos/src/main/scala/Engine.scala
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-similarproduct/multi-events-multi-algos/src/main/scala/Engine.scala b/examples/scala-parallel-similarproduct/multi-events-multi-algos/src/main/scala/Engine.scala
new file mode 100644
index 0000000..dcfee6f
--- /dev/null
+++ b/examples/scala-parallel-similarproduct/multi-events-multi-algos/src/main/scala/Engine.scala
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.predictionio.examples.similarproduct
+
+import org.apache.predictionio.controller.EngineFactory
+import org.apache.predictionio.controller.Engine
+
+case class Query(
+  items: List[String],
+  num: Int,
+  categories: Option[Set[String]],
+  categoryBlackList: Option[Set[String]],
+  whiteList: Option[Set[String]],
+  blackList: Option[Set[String]]
+) extends Serializable
+
+case class PredictedResult(
+  itemScores: Array[ItemScore]
+) extends Serializable {
+  override def toString: String = itemScores.mkString(",")
+}
+
+case class ItemScore(
+  item: String,
+  score: Double
+) extends Serializable
+
+object SimilarProductEngine extends EngineFactory {
+  def apply() = {
+    new Engine(
+      classOf[DataSource],
+      classOf[Preparator],
+      Map(
+        "als" -> classOf[ALSAlgorithm],
+        "cooccurrence" -> classOf[CooccurrenceAlgorithm],
+        "likealgo" -> classOf[LikeAlgorithm]), // ADDED
+      classOf[Serving])
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-similarproduct/multi-events-multi-algos/src/main/scala/LikeAlgorithm.scala
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-similarproduct/multi-events-multi-algos/src/main/scala/LikeAlgorithm.scala b/examples/scala-parallel-similarproduct/multi-events-multi-algos/src/main/scala/LikeAlgorithm.scala
new file mode 100644
index 0000000..527d9c2
--- /dev/null
+++ b/examples/scala-parallel-similarproduct/multi-events-multi-algos/src/main/scala/LikeAlgorithm.scala
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.predictionio.examples.similarproduct
+
+import org.apache.predictionio.data.storage.BiMap
+
+import org.apache.spark.SparkContext
+import org.apache.spark.SparkContext._
+import org.apache.spark.mllib.recommendation.ALS
+import org.apache.spark.mllib.recommendation.{Rating => MLlibRating}
+
+import grizzled.slf4j.Logger
+
+// ADDED
+// Extend original ALSAlgorithm and override train() function to handle
+// like and dislike events
+class LikeAlgorithm(ap: ALSAlgorithmParams) extends ALSAlgorithm(ap) {
+
+  @transient lazy override val logger = Logger[this.type]
+
+  override
+  def train(sc: SparkContext, data: PreparedData): ALSModel = {
+    require(!data.likeEvents.take(1).isEmpty,
+      s"likeEvents in PreparedData cannot be empty." +
+      " Please check if DataSource generates TrainingData" +
+      " and Preprator generates PreparedData correctly.")
+    require(!data.users.take(1).isEmpty,
+      s"users in PreparedData cannot be empty." +
+      " Please check if DataSource generates TrainingData" +
+      " and Preprator generates PreparedData correctly.")
+    require(!data.items.take(1).isEmpty,
+      s"items in PreparedData cannot be empty." +
+      " Please check if DataSource generates TrainingData" +
+      " and Preprator generates PreparedData correctly.")
+    // create User and item's String ID to integer index BiMap
+    val userStringIntMap = BiMap.stringInt(data.users.keys)
+    val itemStringIntMap = BiMap.stringInt(data.items.keys)
+
+    // collect Item as Map and convert ID to Int index
+    val items: Map[Int, Item] = data.items.map { case (id, item) =>
+      (itemStringIntMap(id), item)
+    }.collectAsMap.toMap
+
+    val mllibRatings = data.likeEvents
+      .map { r =>
+        // Convert user and item String IDs to Int index for MLlib
+        val uindex = userStringIntMap.getOrElse(r.user, -1)
+        val iindex = itemStringIntMap.getOrElse(r.item, -1)
+
+        if (uindex == -1)
+          logger.info(s"Couldn't convert nonexistent user ID ${r.user}"
+            + " to Int index.")
+
+        if (iindex == -1)
+          logger.info(s"Couldn't convert nonexistent item ID ${r.item}"
+            + " to Int index.")
+
+        // key is (uindex, iindex) tuple, value is (like, t) tuple
+        ((uindex, iindex), (r.like, r.t))
+      }.filter { case ((u, i), v) =>
+        // keep events with valid user and item index
+        (u != -1) && (i != -1)
+      }.reduceByKey { case (v1, v2) => // MODIFIED
+        // An user may like an item and change to dislike it later,
+        // or vice versa. Use the latest value for this case.
+        val (like1, t1) = v1
+        val (like2, t2) = v2
+        // keep the latest value
+        if (t1 > t2) v1 else v2
+      }.map { case ((u, i), (like, t)) => // MODIFIED
+        // With ALS.trainImplicit(), we can use negative value to indicate
+        // nagative siginal (ie. dislike)
+        val r = if (like) 1 else -1
+        // MLlibRating requires integer index for user and item
+        MLlibRating(u, i, r)
+      }
+      .cache()
+
+    // MLLib ALS cannot handle empty training data.
+    require(!mllibRatings.take(1).isEmpty,
+      s"mllibRatings cannot be empty." +
+      " Please check if your events contain valid user and item ID.")
+    // seed for MLlib ALS
+    val seed = ap.seed.getOrElse(System.nanoTime)
+
+    val m = ALS.trainImplicit(
+      ratings = mllibRatings,
+      rank = ap.rank,
+      iterations = ap.numIterations,
+      lambda = ap.lambda,
+      blocks = -1,
+      alpha = 1.0,
+      seed = seed)
+
+    new ALSModel(
+      productFeatures = m.productFeatures.collectAsMap.toMap,
+      itemStringIntMap = itemStringIntMap,
+      items = items
+    )
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-similarproduct/multi-events-multi-algos/src/main/scala/Preparator.scala
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-similarproduct/multi-events-multi-algos/src/main/scala/Preparator.scala b/examples/scala-parallel-similarproduct/multi-events-multi-algos/src/main/scala/Preparator.scala
new file mode 100644
index 0000000..f2e3fa5
--- /dev/null
+++ b/examples/scala-parallel-similarproduct/multi-events-multi-algos/src/main/scala/Preparator.scala
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.predictionio.examples.similarproduct
+
+import org.apache.predictionio.controller.PPreparator
+
+import org.apache.spark.SparkContext
+import org.apache.spark.SparkContext._
+import org.apache.spark.rdd.RDD
+
+class Preparator
+  extends PPreparator[TrainingData, PreparedData] {
+
+  def prepare(sc: SparkContext, trainingData: TrainingData): PreparedData = {
+    new PreparedData(
+      users = trainingData.users,
+      items = trainingData.items,
+      viewEvents = trainingData.viewEvents,
+      likeEvents = trainingData.likeEvents) // ADDED
+  }
+}
+
+class PreparedData(
+  val users: RDD[(String, User)],
+  val items: RDD[(String, Item)],
+  val viewEvents: RDD[ViewEvent],
+  val likeEvents: RDD[LikeEvent] // ADDED
+) extends Serializable

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-similarproduct/multi-events-multi-algos/src/main/scala/Serving.scala
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-similarproduct/multi-events-multi-algos/src/main/scala/Serving.scala b/examples/scala-parallel-similarproduct/multi-events-multi-algos/src/main/scala/Serving.scala
new file mode 100644
index 0000000..79b33a2
--- /dev/null
+++ b/examples/scala-parallel-similarproduct/multi-events-multi-algos/src/main/scala/Serving.scala
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.predictionio.examples.similarproduct
+
+import org.apache.predictionio.controller.LServing
+
+import breeze.stats.meanAndVariance
+import breeze.stats.MeanAndVariance
+
+class Serving
+  extends LServing[Query, PredictedResult] {
+
+  override
+  def serve(query: Query,
+    predictedResults: Seq[PredictedResult]): PredictedResult = {
+
+    // MODFIED
+    val standard: Seq[Array[ItemScore]] = if (query.num == 1) {
+      // if query 1 item, don't standardize
+      predictedResults.map(_.itemScores)
+    } else {
+      // Standardize the score before combine
+      val mvList: Seq[MeanAndVariance] = predictedResults.map { pr =>
+        meanAndVariance(pr.itemScores.map(_.score))
+      }
+
+      predictedResults.zipWithIndex
+        .map { case (pr, i) =>
+          pr.itemScores.map { is =>
+            // standardize score (z-score)
+            // if standard deviation is 0 (when all items have the same score,
+            // meaning all items are ranked equally), return 0.
+            val score = if (mvList(i).stdDev == 0) {
+              0
+            } else {
+              (is.score - mvList(i).mean) / mvList(i).stdDev
+            }
+
+            ItemScore(is.item, score)
+          }
+        }
+    }
+
+    // sum the standardized score if same item
+    val combined = standard.flatten // Array of ItemScore
+      .groupBy(_.item) // groupBy item id
+      .mapValues(itemScores => itemScores.map(_.score).reduce(_ + _))
+      .toArray // array of (item id, score)
+      .sortBy(_._2)(Ordering.Double.reverse)
+      .take(query.num)
+      .map { case (k,v) => ItemScore(k, v) }
+
+    new PredictedResult(combined)
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-similarproduct/multi-events-multi-algos/template.json
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-similarproduct/multi-events-multi-algos/template.json b/examples/scala-parallel-similarproduct/multi-events-multi-algos/template.json
new file mode 100644
index 0000000..d076ec5
--- /dev/null
+++ b/examples/scala-parallel-similarproduct/multi-events-multi-algos/template.json
@@ -0,0 +1 @@
+{"pio": {"version": { "min": "0.10.0-incubating" }}}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-similarproduct/multi/.gitignore
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-similarproduct/multi/.gitignore b/examples/scala-parallel-similarproduct/multi/.gitignore
deleted file mode 100644
index 37f9402..0000000
--- a/examples/scala-parallel-similarproduct/multi/.gitignore
+++ /dev/null
@@ -1,5 +0,0 @@
-data/sample_movielens_data.txt
-manifest.json
-pio.log
-/pio.sbt
-target/

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-similarproduct/multi/README.md
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-similarproduct/multi/README.md b/examples/scala-parallel-similarproduct/multi/README.md
deleted file mode 100644
index ea2d28a..0000000
--- a/examples/scala-parallel-similarproduct/multi/README.md
+++ /dev/null
@@ -1,20 +0,0 @@
-<!--
-Licensed to the Apache Software Foundation (ASF) under one or more
-contributor license agreements.  See the NOTICE file distributed with
-this work for additional information regarding copyright ownership.
-The ASF licenses this file to You under the Apache License, Version 2.0
-(the "License"); you may not use this file except in compliance with
-the License.  You may obtain a copy of the License at
-
-    http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
--->
-
-This is based on Similar Product Template v0.1.0
-
-Please refer to http://predictionio.incubator.apache.org/templates/similarproduct/multi-events-multi-algos/

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-similarproduct/multi/build.sbt
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-similarproduct/multi/build.sbt b/examples/scala-parallel-similarproduct/multi/build.sbt
deleted file mode 100644
index da8b97a..0000000
--- a/examples/scala-parallel-similarproduct/multi/build.sbt
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-import AssemblyKeys._
-
-assemblySettings
-
-name := "template-scala-parallel-similarproduct-multi"
-
-organization := "org.apache.predictionio"
-
-libraryDependencies ++= Seq(
-  "org.apache.predictionio"    %% "core"          % pioVersion.value % "provided",
-  "org.apache.spark" %% "spark-core"    % "1.2.0" % "provided",
-  "org.apache.spark" %% "spark-mllib"   % "1.2.0" % "provided")

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-similarproduct/multi/data/import_eventserver.py
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-similarproduct/multi/data/import_eventserver.py b/examples/scala-parallel-similarproduct/multi/data/import_eventserver.py
deleted file mode 100644
index 65aeae7..0000000
--- a/examples/scala-parallel-similarproduct/multi/data/import_eventserver.py
+++ /dev/null
@@ -1,113 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-"""
-Import sample data for similar product engine
-"""
-
-import predictionio
-import argparse
-import random
-
-SEED = 3
-
-def import_events(client):
-  random.seed(SEED)
-  count = 0
-  print client.get_status()
-  print "Importing data..."
-
-  # generate 10 users, with user ids u1,u2,....,u10
-  user_ids = ["u%s" % i for i in range(1, 11)]
-  for user_id in user_ids:
-    print "Set user", user_id
-    client.create_event(
-      event="$set",
-      entity_type="user",
-      entity_id=user_id
-    )
-    count += 1
-
-  # generate 50 items, with item ids i1,i2,....,i50
-  # random assign 1 to 4 categories among c1-c6 to items
-  categories = ["c%s" % i for i in range(1, 7)]
-  item_ids = ["i%s" % i for i in range(1, 51)]
-  for item_id in item_ids:
-    print "Set item", item_id
-    client.create_event(
-      event="$set",
-      entity_type="item",
-      entity_id=item_id,
-      properties={
-        "categories" : random.sample(categories, random.randint(1, 4))
-      }
-    )
-    count += 1
-
-  # each user randomly viewed 10 items
-  for user_id in user_ids:
-    for viewed_item in random.sample(item_ids, 10):
-      print "User", user_id ,"views item", viewed_item
-      client.create_event(
-        event="view",
-        entity_type="user",
-        entity_id=user_id,
-        target_entity_type="item",
-        target_entity_id=viewed_item
-      )
-      count += 1
-
-  # each user randomly liked/disliked 10 items
-  for user_id in user_ids:
-    for viewed_item in random.sample(item_ids, 10):
-      if random.choice((False, True)) :
-        print "User", user_id ,"likes item", viewed_item
-        client.create_event(
-          event="like",
-          entity_type="user",
-          entity_id=user_id,
-          target_entity_type="item",
-          target_entity_id=viewed_item
-        )
-      else:
-        print "User", user_id ,"dislikes item", viewed_item
-        client.create_event(
-          event="dislike",
-          entity_type="user",
-          entity_id=user_id,
-          target_entity_type="item",
-          target_entity_id=viewed_item
-        )
-      count += 1
-
-  print "%s events are imported." % count
-
-if __name__ == '__main__':
-  parser = argparse.ArgumentParser(
-    description="Import sample data for similar product engine")
-  parser.add_argument('--access_key', default='invald_access_key')
-  parser.add_argument('--url', default="http://localhost:7070")
-
-  args = parser.parse_args()
-  print args
-
-  client = predictionio.EventClient(
-    access_key=args.access_key,
-    url=args.url,
-    threads=5,
-    qsize=500)
-  import_events(client)

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-similarproduct/multi/data/send_query.py
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-similarproduct/multi/data/send_query.py b/examples/scala-parallel-similarproduct/multi/data/send_query.py
deleted file mode 100644
index 8678b15..0000000
--- a/examples/scala-parallel-similarproduct/multi/data/send_query.py
+++ /dev/null
@@ -1,24 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-"""
-Send sample query to prediction engine
-"""
-
-import predictionio
-engine_client = predictionio.EngineClient(url="http://localhost:8000")
-print engine_client.send_query({"items": ["i1", "i3"], "num": 4})

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-similarproduct/multi/engine.json
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-similarproduct/multi/engine.json b/examples/scala-parallel-similarproduct/multi/engine.json
deleted file mode 100644
index 615140a..0000000
--- a/examples/scala-parallel-similarproduct/multi/engine.json
+++ /dev/null
@@ -1,30 +0,0 @@
-{
-  "id": "default",
-  "description": "Default settings",
-  "engineFactory": "org.template.similarproduct.SimilarProductEngine",
-  "datasource": {
-    "params" : {
-      "appId": 11
-    }
-  },
-  "algorithms": [
-    {
-      "name": "als",
-      "params": {
-        "rank": 10,
-        "numIterations" : 20,
-        "lambda": 0.01,
-        "seed": 3
-      }
-    },
-    {
-      "name": "likealgo",
-      "params": {
-        "rank": 8,
-        "numIterations" : 15,
-        "lambda": 0.01,
-        "seed": 3
-      }
-    }
-  ]
-}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-similarproduct/multi/project/assembly.sbt
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-similarproduct/multi/project/assembly.sbt b/examples/scala-parallel-similarproduct/multi/project/assembly.sbt
deleted file mode 100644
index 54c3252..0000000
--- a/examples/scala-parallel-similarproduct/multi/project/assembly.sbt
+++ /dev/null
@@ -1 +0,0 @@
-addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.11.2")

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-similarproduct/multi/project/pio-build.sbt
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-similarproduct/multi/project/pio-build.sbt b/examples/scala-parallel-similarproduct/multi/project/pio-build.sbt
deleted file mode 100644
index 9aed0ee..0000000
--- a/examples/scala-parallel-similarproduct/multi/project/pio-build.sbt
+++ /dev/null
@@ -1 +0,0 @@
-addSbtPlugin("org.apache.predictionio" % "pio-build" % "0.9.0")

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-similarproduct/multi/src/main/scala/ALSAlgorithm.scala
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-similarproduct/multi/src/main/scala/ALSAlgorithm.scala b/examples/scala-parallel-similarproduct/multi/src/main/scala/ALSAlgorithm.scala
deleted file mode 100644
index e0567ef..0000000
--- a/examples/scala-parallel-similarproduct/multi/src/main/scala/ALSAlgorithm.scala
+++ /dev/null
@@ -1,282 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.template.similarproduct
-
-import org.apache.predictionio.controller.PAlgorithm
-import org.apache.predictionio.controller.Params
-import org.apache.predictionio.controller.IPersistentModel
-import org.apache.predictionio.controller.IPersistentModelLoader
-import org.apache.predictionio.data.storage.BiMap
-
-import org.apache.spark.SparkContext
-import org.apache.spark.SparkContext._
-import org.apache.spark.rdd.RDD
-import org.apache.spark.mllib.recommendation.ALS
-import org.apache.spark.mllib.recommendation.{Rating => MLlibRating}
-
-import grizzled.slf4j.Logger
-
-import scala.collection.mutable.PriorityQueue
-
-case class ALSAlgorithmParams(
-  rank: Int,
-  numIterations: Int,
-  lambda: Double,
-  seed: Option[Long]) extends Params
-
-class ALSModel(
-  val productFeatures: RDD[(Int, Array[Double])],
-  val itemStringIntMap: BiMap[String, Int],
-  val items: Map[Int, Item]
-) extends IPersistentModel[ALSAlgorithmParams] with Serializable {
-
-  @transient lazy val itemIntStringMap = itemStringIntMap.inverse
-
-  def save(id: String, params: ALSAlgorithmParams,
-    sc: SparkContext): Boolean = {
-
-    productFeatures.saveAsObjectFile(s"/tmp/${id}/productFeatures")
-    sc.parallelize(Seq(itemStringIntMap))
-      .saveAsObjectFile(s"/tmp/${id}/itemStringIntMap")
-    sc.parallelize(Seq(items))
-      .saveAsObjectFile(s"/tmp/${id}/items")
-    true
-  }
-
-  override def toString = {
-    s" productFeatures: [${productFeatures.count()}]" +
-    s"(${productFeatures.take(2).toList}...)" +
-    s" itemStringIntMap: [${itemStringIntMap.size}]" +
-    s"(${itemStringIntMap.take(2).toString}...)]" +
-    s" items: [${items.size}]" +
-    s"(${items.take(2).toString}...)]"
-  }
-}
-
-object ALSModel
-  extends IPersistentModelLoader[ALSAlgorithmParams, ALSModel] {
-  def apply(id: String, params: ALSAlgorithmParams,
-    sc: Option[SparkContext]) = {
-    new ALSModel(
-      productFeatures = sc.get.objectFile(s"/tmp/${id}/productFeatures"),
-      itemStringIntMap = sc.get
-        .objectFile[BiMap[String, Int]](s"/tmp/${id}/itemStringIntMap").first,
-      items = sc.get
-        .objectFile[Map[Int, Item]](s"/tmp/${id}/items").first)
-  }
-}
-
-/**
-  * Use ALS to build item x feature matrix
-  */
-class ALSAlgorithm(val ap: ALSAlgorithmParams)
-  extends PAlgorithm[PreparedData, ALSModel, Query, PredictedResult] {
-
-  @transient lazy val logger = Logger[this.type]
-
-  def train(sc: SparkContext, data: PreparedData): ALSModel = {
-    require(!data.viewEvents.take(1).isEmpty,
-      s"viewEvents in PreparedData cannot be empty." +
-      " Please check if DataSource generates TrainingData" +
-      " and Preprator generates PreparedData correctly.")
-    require(!data.users.take(1).isEmpty,
-      s"users in PreparedData cannot be empty." +
-      " Please check if DataSource generates TrainingData" +
-      " and Preprator generates PreparedData correctly.")
-    require(!data.items.take(1).isEmpty,
-      s"items in PreparedData cannot be empty." +
-      " Please check if DataSource generates TrainingData" +
-      " and Preprator generates PreparedData correctly.")
-    // create User and item's String ID to integer index BiMap
-    val userStringIntMap = BiMap.stringInt(data.users.keys)
-    val itemStringIntMap = BiMap.stringInt(data.items.keys)
-
-    // collect Item as Map and convert ID to Int index
-    val items: Map[Int, Item] = data.items.map { case (id, item) =>
-      (itemStringIntMap(id), item)
-    }.collectAsMap.toMap
-
-    val mllibRatings = data.viewEvents
-      .map { r =>
-        // Convert user and item String IDs to Int index for MLlib
-        val uindex = userStringIntMap.getOrElse(r.user, -1)
-        val iindex = itemStringIntMap.getOrElse(r.item, -1)
-
-        if (uindex == -1)
-          logger.info(s"Couldn't convert nonexistent user ID ${r.user}"
-            + " to Int index.")
-
-        if (iindex == -1)
-          logger.info(s"Couldn't convert nonexistent item ID ${r.item}"
-            + " to Int index.")
-
-        ((uindex, iindex), 1)
-      }.filter { case ((u, i), v) =>
-        // keep events with valid user and item index
-        (u != -1) && (i != -1)
-      }.reduceByKey(_ + _) // aggregate all view events of same user-item pair
-      .map { case ((u, i), v) =>
-        // MLlibRating requires integer index for user and item
-        MLlibRating(u, i, v)
-      }
-      .cache()
-
-    // MLLib ALS cannot handle empty training data.
-    require(!mllibRatings.take(1).isEmpty,
-      s"mllibRatings cannot be empty." +
-      " Please check if your events contain valid user and item ID.")
-
-    // seed for MLlib ALS
-    val seed = ap.seed.getOrElse(System.nanoTime)
-
-    val m = ALS.trainImplicit(
-      ratings = mllibRatings,
-      rank = ap.rank,
-      iterations = ap.numIterations,
-      lambda = ap.lambda,
-      blocks = -1,
-      alpha = 1.0,
-      seed = seed)
-
-    new ALSModel(
-      productFeatures = m.productFeatures,
-      itemStringIntMap = itemStringIntMap,
-      items = items
-    )
-  }
-
-  def predict(model: ALSModel, query: Query): PredictedResult = {
-
-    // convert items to Int index
-    val queryList: Set[Int] = query.items.map(model.itemStringIntMap.get(_))
-      .flatten.toSet
-
-    val queryFeatures: Vector[Array[Double]] = queryList.toVector.par
-      .map { item =>
-        // productFeatures may not contain the requested item
-        val qf: Option[Array[Double]] = model.productFeatures
-          .lookup(item).headOption
-        qf
-      }.seq.flatten
-
-    val whiteList: Option[Set[Int]] = query.whiteList.map( set =>
-      set.map(model.itemStringIntMap.get(_)).flatten
-    )
-    val blackList: Option[Set[Int]] = query.blackList.map ( set =>
-      set.map(model.itemStringIntMap.get(_)).flatten
-    )
-
-    val ord = Ordering.by[(Int, Double), Double](_._2).reverse
-
-    val indexScores: Array[(Int, Double)] = if (queryFeatures.isEmpty) {
-      logger.info(s"No productFeatures vector for query items ${query.items}.")
-      Array[(Int, Double)]()
-    } else {
-      model.productFeatures
-        .mapValues { f =>
-          queryFeatures.map{ qf =>
-            cosine(qf, f)
-          }.reduce(_ + _)
-        }
-        .filter(_._2 > 0) // keep items with score > 0
-        .collect()
-    }
-
-    val filteredScore = indexScores.view.filter { case (i, v) =>
-      isCandidateItem(
-        i = i,
-        items = model.items,
-        categories = query.categories,
-        queryList = queryList,
-        whiteList = whiteList,
-        blackList = blackList
-      )
-    }
-
-    val topScores = getTopN(filteredScore, query.num)(ord).toArray
-
-    val itemScores = topScores.map { case (i, s) =>
-      new ItemScore(
-        item = model.itemIntStringMap(i),
-        score = s
-      )
-    }
-
-    new PredictedResult(itemScores)
-  }
-
-  private
-  def getTopN[T](s: Seq[T], n: Int)(implicit ord: Ordering[T]): Seq[T] = {
-
-    val q = PriorityQueue()
-
-    for (x <- s) {
-      if (q.size < n)
-        q.enqueue(x)
-      else {
-        // q is full
-        if (ord.compare(x, q.head) < 0) {
-          q.dequeue()
-          q.enqueue(x)
-        }
-      }
-    }
-
-    q.dequeueAll.toSeq.reverse
-  }
-
-  private
-  def cosine(v1: Array[Double], v2: Array[Double]): Double = {
-    val size = v1.size
-    var i = 0
-    var n1: Double = 0
-    var n2: Double = 0
-    var d: Double = 0
-    while (i < size) {
-      n1 += v1(i) * v1(i)
-      n2 += v2(i) * v2(i)
-      d += v1(i) * v2(i)
-      i += 1
-    }
-    val n1n2 = (math.sqrt(n1) * math.sqrt(n2))
-    if (n1n2 == 0) 0 else (d / n1n2)
-  }
-
-  private
-  def isCandidateItem(
-    i: Int,
-    items: Map[Int, Item],
-    categories: Option[Set[String]],
-    queryList: Set[Int],
-    whiteList: Option[Set[Int]],
-    blackList: Option[Set[Int]]
-  ): Boolean = {
-    whiteList.map(_.contains(i)).getOrElse(true) &&
-    blackList.map(!_.contains(i)).getOrElse(true) &&
-    // discard items in query as well
-    (!queryList.contains(i)) &&
-    // filter categories
-    categories.map { cat =>
-      items(i).categories.map { itemCat =>
-        // keep this item if has ovelap categories with the query
-        !(itemCat.toSet.intersect(cat).isEmpty)
-      }.getOrElse(false) // discard this item if it has no categories
-    }.getOrElse(true)
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-similarproduct/multi/src/main/scala/DataSource.scala
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-similarproduct/multi/src/main/scala/DataSource.scala b/examples/scala-parallel-similarproduct/multi/src/main/scala/DataSource.scala
deleted file mode 100644
index 38916c1..0000000
--- a/examples/scala-parallel-similarproduct/multi/src/main/scala/DataSource.scala
+++ /dev/null
@@ -1,171 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.template.similarproduct
-
-import org.apache.predictionio.controller.PDataSource
-import org.apache.predictionio.controller.EmptyEvaluationInfo
-import org.apache.predictionio.controller.EmptyActualResult
-import org.apache.predictionio.controller.Params
-import org.apache.predictionio.data.storage.Event
-import org.apache.predictionio.data.storage.Storage
-
-import org.apache.spark.SparkContext
-import org.apache.spark.SparkContext._
-import org.apache.spark.rdd.RDD
-
-import grizzled.slf4j.Logger
-
-case class DataSourceParams(appId: Int) extends Params
-
-class DataSource(val dsp: DataSourceParams)
-  extends PDataSource[TrainingData,
-      EmptyEvaluationInfo, Query, EmptyActualResult] {
-
-  @transient lazy val logger = Logger[this.type]
-
-  override
-  def readTraining(sc: SparkContext): TrainingData = {
-    val eventsDb = Storage.getPEvents()
-
-    // create a RDD of (entityID, User)
-    val usersRDD: RDD[(String, User)] = eventsDb.aggregateProperties(
-      appId = dsp.appId,
-      entityType = "user"
-    )(sc).map { case (entityId, properties) =>
-      val user = try {
-        User()
-      } catch {
-        case e: Exception => {
-          logger.error(s"Failed to get properties ${properties} of" +
-            s" user ${entityId}. Exception: ${e}.")
-          throw e
-        }
-      }
-      (entityId, user)
-    }
-
-    // create a RDD of (entityID, Item)
-    val itemsRDD: RDD[(String, Item)] = eventsDb.aggregateProperties(
-      appId = dsp.appId,
-      entityType = "item"
-    )(sc).map { case (entityId, properties) =>
-      val item = try {
-        // Assume categories is optional property of item.
-        Item(categories = properties.getOpt[List[String]]("categories"))
-      } catch {
-        case e: Exception => {
-          logger.error(s"Failed to get properties ${properties} of" +
-            s" item ${entityId}. Exception: ${e}.")
-          throw e
-        }
-      }
-      (entityId, item)
-    }
-
-    // get all "user" "view" "item" events
-    val viewEventsRDD: RDD[ViewEvent] = eventsDb.find(
-      appId = dsp.appId,
-      entityType = Some("user"),
-      eventNames = Some(List("view")),
-      // targetEntityType is optional field of an event.
-      targetEntityType = Some(Some("item")))(sc)
-      // eventsDb.find() returns RDD[Event]
-      .map { event =>
-        val viewEvent = try {
-          event.event match {
-            case "view" => ViewEvent(
-              user = event.entityId,
-              item = event.targetEntityId.get,
-              t = event.eventTime.getMillis)
-            case _ => throw new Exception(s"Unexpected event ${event} is read.")
-          }
-        } catch {
-          case e: Exception => {
-            logger.error(s"Cannot convert ${event} to ViewEvent." +
-              s" Exception: ${e}.")
-            throw e
-          }
-        }
-        viewEvent
-      }
-
-    // ADDED
-    // get all "user" "like" and "dislike" "item" events
-    val likeEventsRDD: RDD[LikeEvent] = eventsDb.find(
-      appId = dsp.appId,
-      entityType = Some("user"),
-      eventNames = Some(List("like", "dislike")),
-      // targetEntityType is optional field of an event.
-      targetEntityType = Some(Some("item")))(sc)
-      // eventsDb.find() returns RDD[Event]
-      .map { event =>
-        val likeEvent = try {
-          event.event match {
-            case "like" | "dislike" => LikeEvent(
-              user = event.entityId,
-              item = event.targetEntityId.get,
-              t = event.eventTime.getMillis,
-              like = (event.event == "like"))
-            case _ => throw new Exception(s"Unexpected event ${event} is read.")
-          }
-        } catch {
-          case e: Exception => {
-            logger.error(s"Cannot convert ${event} to LikeEvent." +
-              s" Exception: ${e}.")
-            throw e
-          }
-        }
-        likeEvent
-      }
-
-    new TrainingData(
-      users = usersRDD,
-      items = itemsRDD,
-      viewEvents = viewEventsRDD,
-      likeEvents = likeEventsRDD // ADDED
-    )
-  }
-}
-
-case class User()
-
-case class Item(categories: Option[List[String]])
-
-case class ViewEvent(user: String, item: String, t: Long)
-
-case class LikeEvent( // ADDED
-  user: String,
-  item: String,
-  t: Long,
-  like: Boolean // true: like. false: dislike
-)
-
-class TrainingData(
-  val users: RDD[(String, User)],
-  val items: RDD[(String, Item)],
-  val viewEvents: RDD[ViewEvent],
-  val likeEvents: RDD[LikeEvent] // ADDED
-) extends Serializable {
-  override def toString = {
-    s"users: [${users.count()} (${users.take(2).toList}...)]" +
-    s"items: [${items.count()} (${items.take(2).toList}...)]" +
-    s"viewEvents: [${viewEvents.count()}] (${viewEvents.take(2).toList}...)" +
-    // ADDED
-    s"likeEvents: [${likeEvents.count()}] (${likeEvents.take(2).toList}...)"
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-similarproduct/multi/src/main/scala/Engine.scala
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-similarproduct/multi/src/main/scala/Engine.scala b/examples/scala-parallel-similarproduct/multi/src/main/scala/Engine.scala
deleted file mode 100644
index c63ecc5..0000000
--- a/examples/scala-parallel-similarproduct/multi/src/main/scala/Engine.scala
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.template.similarproduct
-
-import org.apache.predictionio.controller.IEngineFactory
-import org.apache.predictionio.controller.Engine
-
-case class Query(
-  items: List[String],
-  num: Int,
-  categories: Option[Set[String]],
-  whiteList: Option[Set[String]],
-  blackList: Option[Set[String]]
-)
-
-case class PredictedResult(
-  itemScores: Array[ItemScore]
-) {
-  override def toString = itemScores.mkString(",")
-}
-
-case class ItemScore(
-  item: String,
-  score: Double
-)
-
-object SimilarProductEngine extends IEngineFactory {
-  def apply() = {
-    new Engine(
-      classOf[DataSource],
-      classOf[Preparator],
-      Map("als" -> classOf[ALSAlgorithm],
-        "likealgo" -> classOf[LikeAlgorithm]), // ADDED
-      classOf[Serving])
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-similarproduct/multi/src/main/scala/LikeAlgorithm.scala
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-similarproduct/multi/src/main/scala/LikeAlgorithm.scala b/examples/scala-parallel-similarproduct/multi/src/main/scala/LikeAlgorithm.scala
deleted file mode 100644
index 5b1eccb..0000000
--- a/examples/scala-parallel-similarproduct/multi/src/main/scala/LikeAlgorithm.scala
+++ /dev/null
@@ -1,119 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.template.similarproduct
-
-import org.apache.predictionio.data.storage.BiMap
-
-import org.apache.spark.SparkContext
-import org.apache.spark.SparkContext._
-import org.apache.spark.rdd.RDD
-import org.apache.spark.mllib.recommendation.ALS
-import org.apache.spark.mllib.recommendation.{Rating => MLlibRating}
-
-import grizzled.slf4j.Logger
-
-// ADDED
-// Extend original ALSAlgorithm and override train() function to handle
-// like and dislike events
-class LikeAlgorithm(ap: ALSAlgorithmParams) extends ALSAlgorithm(ap) {
-
-  @transient lazy override val logger = Logger[this.type]
-
-  override
-  def train(sc: SparkContext, data: PreparedData): ALSModel = {
-    require(!data.likeEvents.take(1).isEmpty,
-      s"likeEvents in PreparedData cannot be empty." +
-      " Please check if DataSource generates TrainingData" +
-      " and Preprator generates PreparedData correctly.")
-    require(!data.users.take(1).isEmpty,
-      s"users in PreparedData cannot be empty." +
-      " Please check if DataSource generates TrainingData" +
-      " and Preprator generates PreparedData correctly.")
-    require(!data.items.take(1).isEmpty,
-      s"items in PreparedData cannot be empty." +
-      " Please check if DataSource generates TrainingData" +
-      " and Preprator generates PreparedData correctly.")
-    // create User and item's String ID to integer index BiMap
-    val userStringIntMap = BiMap.stringInt(data.users.keys)
-    val itemStringIntMap = BiMap.stringInt(data.items.keys)
-
-    // collect Item as Map and convert ID to Int index
-    val items: Map[Int, Item] = data.items.map { case (id, item) =>
-      (itemStringIntMap(id), item)
-    }.collectAsMap.toMap
-
-    val mllibRatings = data.likeEvents
-      .map { r =>
-        // Convert user and item String IDs to Int index for MLlib
-        val uindex = userStringIntMap.getOrElse(r.user, -1)
-        val iindex = itemStringIntMap.getOrElse(r.item, -1)
-
-        if (uindex == -1)
-          logger.info(s"Couldn't convert nonexistent user ID ${r.user}"
-            + " to Int index.")
-
-        if (iindex == -1)
-          logger.info(s"Couldn't convert nonexistent item ID ${r.item}"
-            + " to Int index.")
-
-        // key is (uindex, iindex) tuple, value is (like, t) tuple
-        ((uindex, iindex), (r.like, r.t))
-      }.filter { case ((u, i), v) =>
-        //val  = d
-        // keep events with valid user and item index
-        (u != -1) && (i != -1)
-      }.reduceByKey { case (v1, v2) => // MODIFIED
-        // An user may like an item and change to dislike it later,
-        // or vice versa. Use the latest value for this case.
-        val (like1, t1) = v1
-        val (like2, t2) = v2
-        // keep the latest value
-        if (t1 > t2) v1 else v2
-      }.map { case ((u, i), (like, t)) => // MODIFIED
-        // With ALS.trainImplicit(), we can use negative value to indicate
-        // nagative siginal (ie. dislike)
-        val r = if (like) 1 else -1
-        // MLlibRating requires integer index for user and item
-        MLlibRating(u, i, r)
-      }
-      .cache()
-
-    // MLLib ALS cannot handle empty training data.
-    require(!mllibRatings.take(1).isEmpty,
-      s"mllibRatings cannot be empty." +
-      " Please check if your events contain valid user and item ID.")
-    // seed for MLlib ALS
-    val seed = ap.seed.getOrElse(System.nanoTime)
-
-    val m = ALS.trainImplicit(
-      ratings = mllibRatings,
-      rank = ap.rank,
-      iterations = ap.numIterations,
-      lambda = ap.lambda,
-      blocks = -1,
-      alpha = 1.0,
-      seed = seed)
-
-    new ALSModel(
-      productFeatures = m.productFeatures,
-      itemStringIntMap = itemStringIntMap,
-      items = items
-    )
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-similarproduct/multi/src/main/scala/Preparator.scala
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-similarproduct/multi/src/main/scala/Preparator.scala b/examples/scala-parallel-similarproduct/multi/src/main/scala/Preparator.scala
deleted file mode 100644
index b45cf49..0000000
--- a/examples/scala-parallel-similarproduct/multi/src/main/scala/Preparator.scala
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.template.similarproduct
-
-import org.apache.predictionio.controller.PPreparator
-
-import org.apache.spark.SparkContext
-import org.apache.spark.SparkContext._
-import org.apache.spark.rdd.RDD
-
-class Preparator
-  extends PPreparator[TrainingData, PreparedData] {
-
-  def prepare(sc: SparkContext, trainingData: TrainingData): PreparedData = {
-    new PreparedData(
-      users = trainingData.users,
-      items = trainingData.items,
-      viewEvents = trainingData.viewEvents,
-      likeEvents = trainingData.likeEvents) // ADDED
-  }
-}
-
-class PreparedData(
-  val users: RDD[(String, User)],
-  val items: RDD[(String, Item)],
-  val viewEvents: RDD[ViewEvent],
-  val likeEvents: RDD[LikeEvent] // ADDED
-) extends Serializable

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-similarproduct/multi/src/main/scala/Serving.scala
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-similarproduct/multi/src/main/scala/Serving.scala b/examples/scala-parallel-similarproduct/multi/src/main/scala/Serving.scala
deleted file mode 100644
index 9159566..0000000
--- a/examples/scala-parallel-similarproduct/multi/src/main/scala/Serving.scala
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.template.similarproduct
-
-import org.apache.predictionio.controller.LServing
-
-import breeze.stats.mean
-import breeze.stats.meanAndVariance
-import breeze.stats.MeanAndVariance
-
-class Serving
-  extends LServing[Query, PredictedResult] {
-
-  override def serve(query: Query,
-    predictedResults: Seq[PredictedResult]): PredictedResult = {
-
-    // MODFIED
-    val standard: Seq[Array[ItemScore]] = if (query.num == 1) {
-      // if query 1 item, don't standardize
-      predictedResults.map(_.itemScores)
-    } else {
-      // Standardize the score before combine
-      val mvList: Seq[MeanAndVariance] = predictedResults.map { pr =>
-        meanAndVariance(pr.itemScores.map(_.score))
-      }
-
-      predictedResults.zipWithIndex
-        .map { case (pr, i) =>
-          pr.itemScores.map { is =>
-            // standardize score (z-score)
-            // if standard deviation is 0 (when all items have the same score,
-            // meaning all items are ranked equally), return 0.
-            val score = if (mvList(i).stdDev == 0) {
-              0
-            } else {
-              (is.score - mvList(i).mean) / mvList(i).stdDev
-            }
-
-            ItemScore(is.item, score)
-          }
-        }
-    }
-
-    // sum the standardized score if same item
-    val combined = standard.flatten // Array of ItemScore
-      .groupBy(_.item) // groupBy item id
-      .mapValues(itemScores => itemScores.map(_.score).reduce(_ + _))
-      .toArray // array of (item id, score)
-      .sortBy(_._2)(Ordering.Double.reverse)
-      .take(query.num)
-      .map { case (k,v) => ItemScore(k, v) }
-
-    new PredictedResult(combined)
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-similarproduct/no-set-user/.gitignore
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-similarproduct/no-set-user/.gitignore b/examples/scala-parallel-similarproduct/no-set-user/.gitignore
deleted file mode 100644
index 57841c6..0000000
--- a/examples/scala-parallel-similarproduct/no-set-user/.gitignore
+++ /dev/null
@@ -1,4 +0,0 @@
-manifest.json
-target/
-pio.log
-/pio.sbt

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-similarproduct/no-set-user/README.md
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-similarproduct/no-set-user/README.md b/examples/scala-parallel-similarproduct/no-set-user/README.md
deleted file mode 100644
index dbce1af..0000000
--- a/examples/scala-parallel-similarproduct/no-set-user/README.md
+++ /dev/null
@@ -1,141 +0,0 @@
-<!--
-Licensed to the Apache Software Foundation (ASF) under one or more
-contributor license agreements.  See the NOTICE file distributed with
-this work for additional information regarding copyright ownership.
-The ASF licenses this file to You under the Apache License, Version 2.0
-(the "License"); you may not use this file except in compliance with
-the License.  You may obtain a copy of the License at
-
-    http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
--->
-
-# Similar Product Template without sending '$set' events for users
-
-In some cases, if you don't need to keep track the user ID being created/deleted or user properties changes with events, then you can simplify the template as described in this example to get rid of sending '$set' events for users. The user Id can be extracted from the user-to-item events (eg. view events). You can find the complete source code in src/ directory.
-
-This example engine is based on Similar Product Template version v0.1.3.
-
-## Documentation
-
-Please refer to http://predictionio.incubator.apache.org/templates/similarproduct/quickstart/
-
-## Development Notes
-
-There you can find several steps that we have done to modify original Similar Product template
-
-### Changes to import_eventserver.py
-
-For importing sample data, you can remove the following code that generates '$set' events for entities with type 'user'
-
-```python
-  for user_id in user_ids:
-    print "Set user", user_id
-    client.create_event(
-      event="$set",
-      entity_type="user",
-      entity_id=user_id
-    )
-    count += 1
-```
-
-### Changes to DataSource.scala
-
-Remove the following code
-
-```scala
-    // create a RDD of (entityID, User)
-    val usersRDD: RDD[(String, User)] = eventsDb.aggregateProperties(
-      appId = dsp.appId,
-      entityType = "user"
-    )(sc).map { case (entityId, properties) =>
-      val user = try {
-        User()
-      } catch {
-        case e: Exception => {
-          logger.error(s"Failed to get properties ${properties} of" +
-            s" user ${entityId}. Exception: ${e}.")
-          throw e
-        }
-      }
-      (entityId, user)
-    }.cache()
-```
-
-Modify class 'TrainingData' that should looks like the following code:
-
-```scala
-class TrainingData(
-  val items: RDD[(String, Item)],
-  val viewEvents: RDD[ViewEvent]
-) extends Serializable {
-  override def toString = {
-    s"items: [${items.count()} (${items.take(2).toList}...)]" +
-    s"viewEvents: [${viewEvents.count()}] (${viewEvents.take(2).toList}...)"
-  }
-}
-```
-
-Then fix compilation issue in invocation 'TrainingData' constructor that should looks like following:
-
-```
-    new TrainingData(
-      items = itemsRDD,
-      viewEvents = viewEventsRDD
-    )
-```
-
-### Changes to Preparator.scala
-
-We removed RDD with users from class 'TrainingData' at the previous step.
-So, we should apply the same changes for class 'PreparedData'.
-
-The file Preparator.scala should looks like the following code:
-
-```
-class Preparator
-  extends PPreparator[TrainingData, PreparedData] {
-  def prepare(sc: SparkContext, trainingData: TrainingData): PreparedData = {
-    new PreparedData(
-      items = trainingData.items,
-      viewEvents = trainingData.viewEvents)
-  }
-}
-class PreparedData(
-  val items: RDD[(String, Item)],
-  val viewEvents: RDD[ViewEvent]
-) extends Serializable
-```
-
-### Changes to ALSAlgorithm.scala
-
-Remove the following code:
-
-```scala
-    require(!data.users.take(1).isEmpty,
-      s"users in PreparedData cannot be empty." +
-      " Please check if DataSource generates TrainingData" +
-      " and Preprator generates PreparedData correctly.")
-```
-
-and modify the line where 'userStringIntMap' value is defined to extract the user ID from the viewEvents and create user String ID to Int Index BiMap. It should look like the following code:
-
-```scala
-val userStringIntMap = BiMap.stringInt(data.viewEvents.map(_.user))
-```
-
-## Test the Changes
-
-```
-$ pio build
-$ pio train
-$ pio deploy
-```
-
-After that you can try to make the same queries that were listed in the quickstart.
-The result will be the same.

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-similarproduct/no-set-user/build.sbt
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-similarproduct/no-set-user/build.sbt b/examples/scala-parallel-similarproduct/no-set-user/build.sbt
deleted file mode 100644
index ef66b2f..0000000
--- a/examples/scala-parallel-similarproduct/no-set-user/build.sbt
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-import AssemblyKeys._
-
-assemblySettings
-
-name := "template-scala-parallel-similarproduct"
-
-organization := "org.apache.predictionio"
-
-libraryDependencies ++= Seq(
-  "org.apache.predictionio"    %% "core"          % pioVersion.value % "provided",
-  "org.apache.spark" %% "spark-core"    % "1.2.0" % "provided",
-  "org.apache.spark" %% "spark-mllib"   % "1.2.0" % "provided")

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-similarproduct/no-set-user/data/import_eventserver.py
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-similarproduct/no-set-user/data/import_eventserver.py b/examples/scala-parallel-similarproduct/no-set-user/data/import_eventserver.py
deleted file mode 100644
index 1ac4b67..0000000
--- a/examples/scala-parallel-similarproduct/no-set-user/data/import_eventserver.py
+++ /dev/null
@@ -1,82 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-"""
-Import sample data for similar product engine
-"""
-
-import predictionio
-import argparse
-import random
-
-SEED = 3
-
-def import_events(client):
-  random.seed(SEED)
-  count = 0
-  print client.get_status()
-  print "Importing data..."
-
-  # generate 10 users, with user ids u1,u2,....,u10
-  user_ids = ["u%s" % i for i in range(1, 11)]
-
-  # generate 50 items, with item ids i1,i2,....,i50
-  # random assign 1 to 4 categories among c1-c6 to items
-  categories = ["c%s" % i for i in range(1, 7)]
-  item_ids = ["i%s" % i for i in range(1, 51)]
-  for item_id in item_ids:
-    print "Set item", item_id
-    client.create_event(
-      event="$set",
-      entity_type="item",
-      entity_id=item_id,
-      properties={
-        "categories" : random.sample(categories, random.randint(1, 4))
-      }
-    )
-    count += 1
-
-  # each user randomly viewed 10 items
-  for user_id in user_ids:
-    for viewed_item in random.sample(item_ids, 10):
-      print "User", user_id ,"views item", viewed_item
-      client.create_event(
-        event="view",
-        entity_type="user",
-        entity_id=user_id,
-        target_entity_type="item",
-        target_entity_id=viewed_item
-      )
-      count += 1
-
-  print "%s events are imported." % count
-
-if __name__ == '__main__':
-  parser = argparse.ArgumentParser(
-    description="Import sample data for similar product engine")
-  parser.add_argument('--access_key', default='invald_access_key')
-  parser.add_argument('--url', default="http://localhost:7070")
-
-  args = parser.parse_args()
-  print args
-
-  client = predictionio.EventClient(
-    access_key=args.access_key,
-    url=args.url,
-    threads=5,
-    qsize=500)
-  import_events(client)

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-similarproduct/no-set-user/data/send_query.py
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-similarproduct/no-set-user/data/send_query.py b/examples/scala-parallel-similarproduct/no-set-user/data/send_query.py
deleted file mode 100644
index 8678b15..0000000
--- a/examples/scala-parallel-similarproduct/no-set-user/data/send_query.py
+++ /dev/null
@@ -1,24 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-"""
-Send sample query to prediction engine
-"""
-
-import predictionio
-engine_client = predictionio.EngineClient(url="http://localhost:8000")
-print engine_client.send_query({"items": ["i1", "i3"], "num": 4})

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-similarproduct/no-set-user/engine.json
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-similarproduct/no-set-user/engine.json b/examples/scala-parallel-similarproduct/no-set-user/engine.json
deleted file mode 100644
index 6f462b8..0000000
--- a/examples/scala-parallel-similarproduct/no-set-user/engine.json
+++ /dev/null
@@ -1,21 +0,0 @@
-{
-  "id": "default",
-  "description": "Default settings",
-  "engineFactory": "org.template.similarproduct.SimilarProductEngine",
-  "datasource": {
-    "params" : {
-      "appId": 3
-    }
-  },
-  "algorithms": [
-    {
-      "name": "als",
-      "params": {
-        "rank": 10,
-        "numIterations" : 20,
-        "lambda": 0.01,
-        "seed": 3
-      }
-    }
-  ]
-}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-similarproduct/no-set-user/project/assembly.sbt
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-similarproduct/no-set-user/project/assembly.sbt b/examples/scala-parallel-similarproduct/no-set-user/project/assembly.sbt
deleted file mode 100644
index 54c3252..0000000
--- a/examples/scala-parallel-similarproduct/no-set-user/project/assembly.sbt
+++ /dev/null
@@ -1 +0,0 @@
-addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.11.2")

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-similarproduct/no-set-user/project/pio-build.sbt
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-similarproduct/no-set-user/project/pio-build.sbt b/examples/scala-parallel-similarproduct/no-set-user/project/pio-build.sbt
deleted file mode 100644
index 9aed0ee..0000000
--- a/examples/scala-parallel-similarproduct/no-set-user/project/pio-build.sbt
+++ /dev/null
@@ -1 +0,0 @@
-addSbtPlugin("org.apache.predictionio" % "pio-build" % "0.9.0")

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-similarproduct/no-set-user/src/main/scala/ALSAlgorithm.scala
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-similarproduct/no-set-user/src/main/scala/ALSAlgorithm.scala b/examples/scala-parallel-similarproduct/no-set-user/src/main/scala/ALSAlgorithm.scala
deleted file mode 100644
index 5053bc1..0000000
--- a/examples/scala-parallel-similarproduct/no-set-user/src/main/scala/ALSAlgorithm.scala
+++ /dev/null
@@ -1,251 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.template.similarproduct
-
-import org.apache.predictionio.controller.P2LAlgorithm
-import org.apache.predictionio.controller.Params
-import org.apache.predictionio.data.storage.BiMap
-
-import org.apache.spark.SparkContext
-import org.apache.spark.SparkContext._
-import org.apache.spark.mllib.recommendation.ALS
-import org.apache.spark.mllib.recommendation.{Rating => MLlibRating}
-
-import grizzled.slf4j.Logger
-
-import scala.collection.mutable.PriorityQueue
-
-case class ALSAlgorithmParams(
-  rank: Int,
-  numIterations: Int,
-  lambda: Double,
-  seed: Option[Long]) extends Params
-
-class ALSModel(
-  val productFeatures: Map[Int, Array[Double]],
-  val itemStringIntMap: BiMap[String, Int],
-  val items: Map[Int, Item]
-) extends Serializable {
-
-  @transient lazy val itemIntStringMap = itemStringIntMap.inverse
-
-  override def toString = {
-    s" productFeatures: [${productFeatures.size}]" +
-    s"(${productFeatures.take(2).toList}...)" +
-    s" itemStringIntMap: [${itemStringIntMap.size}]" +
-    s"(${itemStringIntMap.take(2).toString}...)]" +
-    s" items: [${items.size}]" +
-    s"(${items.take(2).toString}...)]"
-  }
-}
-
-/**
-  * Use ALS to build item x feature matrix
-  */
-class ALSAlgorithm(val ap: ALSAlgorithmParams)
-  extends P2LAlgorithm[PreparedData, ALSModel, Query, PredictedResult] {
-
-  @transient lazy val logger = Logger[this.type]
-
-  def train(sc: SparkContext, data: PreparedData): ALSModel = {
-    require(!data.viewEvents.take(1).isEmpty,
-      s"viewEvents in PreparedData cannot be empty." +
-      " Please check if DataSource generates TrainingData" +
-      " and Preprator generates PreparedData correctly.")
-    require(!data.items.take(1).isEmpty,
-      s"items in PreparedData cannot be empty." +
-      " Please check if DataSource generates TrainingData" +
-      " and Preprator generates PreparedData correctly.")
-    // create User and item's String ID to integer index BiMap
-    val userStringIntMap = BiMap.stringInt(data.viewEvents.map(_.user))
-    val itemStringIntMap = BiMap.stringInt(data.items.keys)
-
-    // collect Item as Map and convert ID to Int index
-    val items: Map[Int, Item] = data.items.map { case (id, item) =>
-      (itemStringIntMap(id), item)
-    }.collectAsMap.toMap
-
-    val mllibRatings = data.viewEvents
-      .map { r =>
-        // Convert user and item String IDs to Int index for MLlib
-        val uindex = userStringIntMap.getOrElse(r.user, -1)
-        val iindex = itemStringIntMap.getOrElse(r.item, -1)
-
-        if (uindex == -1)
-          logger.info(s"Couldn't convert nonexistent user ID ${r.user}"
-            + " to Int index.")
-
-        if (iindex == -1)
-          logger.info(s"Couldn't convert nonexistent item ID ${r.item}"
-            + " to Int index.")
-
-        ((uindex, iindex), 1)
-      }.filter { case ((u, i), v) =>
-        // keep events with valid user and item index
-        (u != -1) && (i != -1)
-      }.reduceByKey(_ + _) // aggregate all view events of same user-item pair
-      .map { case ((u, i), v) =>
-        // MLlibRating requires integer index for user and item
-        MLlibRating(u, i, v)
-      }
-      .cache()
-
-    // MLLib ALS cannot handle empty training data.
-    require(!mllibRatings.take(1).isEmpty,
-      s"mllibRatings cannot be empty." +
-      " Please check if your events contain valid user and item ID.")
-
-    // seed for MLlib ALS
-    val seed = ap.seed.getOrElse(System.nanoTime)
-
-    val m = ALS.trainImplicit(
-      ratings = mllibRatings,
-      rank = ap.rank,
-      iterations = ap.numIterations,
-      lambda = ap.lambda,
-      blocks = -1,
-      alpha = 1.0,
-      seed = seed)
-
-    new ALSModel(
-      productFeatures = m.productFeatures.collectAsMap.toMap,
-      itemStringIntMap = itemStringIntMap,
-      items = items
-    )
-  }
-
-  def predict(model: ALSModel, query: Query): PredictedResult = {
-
-    val productFeatures = model.productFeatures
-
-    // convert items to Int index
-    val queryList: Set[Int] = query.items.map(model.itemStringIntMap.get(_))
-      .flatten.toSet
-
-    val queryFeatures: Vector[Array[Double]] = queryList.toVector
-      // productFeatures may not contain the requested item
-      .map { item => productFeatures.get(item) }
-      .flatten
-
-    val whiteList: Option[Set[Int]] = query.whiteList.map( set =>
-      set.map(model.itemStringIntMap.get(_)).flatten
-    )
-    val blackList: Option[Set[Int]] = query.blackList.map ( set =>
-      set.map(model.itemStringIntMap.get(_)).flatten
-    )
-
-    val ord = Ordering.by[(Int, Double), Double](_._2).reverse
-
-    val indexScores: Array[(Int, Double)] = if (queryFeatures.isEmpty) {
-      logger.info(s"No productFeatures vector for query items ${query.items}.")
-      Array[(Int, Double)]()
-    } else {
-      productFeatures.par // convert to parallel collection
-        .mapValues { f =>
-          queryFeatures.map{ qf =>
-            cosine(qf, f)
-          }.reduce(_ + _)
-        }
-        .filter(_._2 > 0) // keep items with score > 0
-        .seq // convert back to sequential collection
-        .toArray
-    }
-
-    val filteredScore = indexScores.view.filter { case (i, v) =>
-      isCandidateItem(
-        i = i,
-        items = model.items,
-        categories = query.categories,
-        queryList = queryList,
-        whiteList = whiteList,
-        blackList = blackList
-      )
-    }
-
-    val topScores = getTopN(filteredScore, query.num)(ord).toArray
-
-    val itemScores = topScores.map { case (i, s) =>
-      new ItemScore(
-        item = model.itemIntStringMap(i),
-        score = s
-      )
-    }
-
-    new PredictedResult(itemScores)
-  }
-
-  private
-  def getTopN[T](s: Seq[T], n: Int)(implicit ord: Ordering[T]): Seq[T] = {
-
-    val q = PriorityQueue()
-
-    for (x <- s) {
-      if (q.size < n)
-        q.enqueue(x)
-      else {
-        // q is full
-        if (ord.compare(x, q.head) < 0) {
-          q.dequeue()
-          q.enqueue(x)
-        }
-      }
-    }
-
-    q.dequeueAll.toSeq.reverse
-  }
-
-  private
-  def cosine(v1: Array[Double], v2: Array[Double]): Double = {
-    val size = v1.size
-    var i = 0
-    var n1: Double = 0
-    var n2: Double = 0
-    var d: Double = 0
-    while (i < size) {
-      n1 += v1(i) * v1(i)
-      n2 += v2(i) * v2(i)
-      d += v1(i) * v2(i)
-      i += 1
-    }
-    val n1n2 = (math.sqrt(n1) * math.sqrt(n2))
-    if (n1n2 == 0) 0 else (d / n1n2)
-  }
-
-  private
-  def isCandidateItem(
-    i: Int,
-    items: Map[Int, Item],
-    categories: Option[Set[String]],
-    queryList: Set[Int],
-    whiteList: Option[Set[Int]],
-    blackList: Option[Set[Int]]
-  ): Boolean = {
-    whiteList.map(_.contains(i)).getOrElse(true) &&
-    blackList.map(!_.contains(i)).getOrElse(true) &&
-    // discard items in query as well
-    (!queryList.contains(i)) &&
-    // filter categories
-    categories.map { cat =>
-      items(i).categories.map { itemCat =>
-        // keep this item if has ovelap categories with the query
-        !(itemCat.toSet.intersect(cat).isEmpty)
-      }.getOrElse(false) // discard this item if it has no categories
-    }.getOrElse(true)
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-similarproduct/no-set-user/src/main/scala/DataSource.scala
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-similarproduct/no-set-user/src/main/scala/DataSource.scala b/examples/scala-parallel-similarproduct/no-set-user/src/main/scala/DataSource.scala
deleted file mode 100644
index 927b20d..0000000
--- a/examples/scala-parallel-similarproduct/no-set-user/src/main/scala/DataSource.scala
+++ /dev/null
@@ -1,109 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.template.similarproduct
-
-import org.apache.predictionio.controller.PDataSource
-import org.apache.predictionio.controller.EmptyEvaluationInfo
-import org.apache.predictionio.controller.EmptyActualResult
-import org.apache.predictionio.controller.Params
-import org.apache.predictionio.data.storage.Event
-import org.apache.predictionio.data.storage.Storage
-
-import org.apache.spark.SparkContext
-import org.apache.spark.SparkContext._
-import org.apache.spark.rdd.RDD
-
-import grizzled.slf4j.Logger
-
-case class DataSourceParams(appId: Int) extends Params
-
-class DataSource(val dsp: DataSourceParams)
-  extends PDataSource[TrainingData,
-      EmptyEvaluationInfo, Query, EmptyActualResult] {
-
-  @transient lazy val logger = Logger[this.type]
-
-  override
-  def readTraining(sc: SparkContext): TrainingData = {
-    val eventsDb = Storage.getPEvents()
-
-    // create a RDD of (entityID, Item)
-    val itemsRDD: RDD[(String, Item)] = eventsDb.aggregateProperties(
-      appId = dsp.appId,
-      entityType = "item"
-    )(sc).map { case (entityId, properties) =>
-      val item = try {
-        // Assume categories is optional property of item.
-        Item(categories = properties.getOpt[List[String]]("categories"))
-      } catch {
-        case e: Exception => {
-          logger.error(s"Failed to get properties ${properties} of" +
-            s" item ${entityId}. Exception: ${e}.")
-          throw e
-        }
-      }
-      (entityId, item)
-    }.cache()
-
-    // get all "user" "view" "item" events
-    val viewEventsRDD: RDD[ViewEvent] = eventsDb.find(
-      appId = dsp.appId,
-      entityType = Some("user"),
-      eventNames = Some(List("view")),
-      // targetEntityType is optional field of an event.
-      targetEntityType = Some(Some("item")))(sc)
-      // eventsDb.find() returns RDD[Event]
-      .map { event =>
-        val viewEvent = try {
-          event.event match {
-            case "view" => ViewEvent(
-              user = event.entityId,
-              item = event.targetEntityId.get,
-              t = event.eventTime.getMillis)
-            case _ => throw new Exception(s"Unexpected event ${event} is read.")
-          }
-        } catch {
-          case e: Exception => {
-            logger.error(s"Cannot convert ${event} to ViewEvent." +
-              s" Exception: ${e}.")
-            throw e
-          }
-        }
-        viewEvent
-      }.cache()
-
-    new TrainingData(
-      items = itemsRDD,
-      viewEvents = viewEventsRDD
-    )
-  }
-}
-
-case class Item(categories: Option[List[String]])
-
-case class ViewEvent(user: String, item: String, t: Long)
-
-class TrainingData(
-  val items: RDD[(String, Item)],
-  val viewEvents: RDD[ViewEvent]
-) extends Serializable {
-  override def toString = {
-    s"items: [${items.count()} (${items.take(2).toList}...)]" +
-    s"viewEvents: [${viewEvents.count()}] (${viewEvents.take(2).toList}...)"
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-similarproduct/no-set-user/src/main/scala/Engine.scala
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-similarproduct/no-set-user/src/main/scala/Engine.scala b/examples/scala-parallel-similarproduct/no-set-user/src/main/scala/Engine.scala
deleted file mode 100644
index 52b19fe..0000000
--- a/examples/scala-parallel-similarproduct/no-set-user/src/main/scala/Engine.scala
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.template.similarproduct
-
-import org.apache.predictionio.controller.IEngineFactory
-import org.apache.predictionio.controller.Engine
-
-case class Query(
-  items: List[String],
-  num: Int,
-  categories: Option[Set[String]],
-  whiteList: Option[Set[String]],
-  blackList: Option[Set[String]]
-)
-
-case class PredictedResult(
-  itemScores: Array[ItemScore]
-)
-
-case class ItemScore(
-  item: String,
-  score: Double
-)
-
-object SimilarProductEngine extends IEngineFactory {
-  def apply() = {
-    new Engine(
-      classOf[DataSource],
-      classOf[Preparator],
-      Map("als" -> classOf[ALSAlgorithm]),
-      classOf[Serving])
-  }
-}