You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@predictionio.apache.org by ch...@apache.org on 2017/09/28 15:54:50 UTC

[09/57] [abbrv] incubator-predictionio git commit: [PIO-97] Fixes examples of the official templates for v0.11.0-incubating.

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-ecommercerecommendation/train-with-rate-event/src/main/scala/ALSAlgorithm.scala
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-ecommercerecommendation/train-with-rate-event/src/main/scala/ALSAlgorithm.scala b/examples/scala-parallel-ecommercerecommendation/train-with-rate-event/src/main/scala/ALSAlgorithm.scala
deleted file mode 100644
index beade42..0000000
--- a/examples/scala-parallel-ecommercerecommendation/train-with-rate-event/src/main/scala/ALSAlgorithm.scala
+++ /dev/null
@@ -1,453 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.template.ecommercerecommendation
-
-import org.apache.predictionio.controller.P2LAlgorithm
-import org.apache.predictionio.controller.Params
-import org.apache.predictionio.data.storage.BiMap
-import org.apache.predictionio.data.storage.Event
-import org.apache.predictionio.data.storage.Storage
-
-import org.apache.spark.SparkContext
-import org.apache.spark.SparkContext._
-import org.apache.spark.mllib.recommendation.ALS
-import org.apache.spark.mllib.recommendation.{Rating => MLlibRating}
-
-import grizzled.slf4j.Logger
-
-import scala.collection.mutable.PriorityQueue
-import scala.concurrent.duration.Duration
-import scala.concurrent.ExecutionContext.Implicits.global
-
-case class ALSAlgorithmParams(
-  appId: Int,
-  unseenOnly: Boolean,
-  seenEvents: List[String],
-  rank: Int,
-  numIterations: Int,
-  lambda: Double,
-  seed: Option[Long]
-) extends Params
-
-class ALSModel(
-  val rank: Int,
-  val userFeatures: Map[Int, Array[Double]],
-  val productFeatures: Map[Int, (Item, Option[Array[Double]])],
-  val userStringIntMap: BiMap[String, Int],
-  val itemStringIntMap: BiMap[String, Int]
-) extends Serializable {
-
-  @transient lazy val itemIntStringMap = itemStringIntMap.inverse
-
-  override def toString = {
-    s" rank: ${rank}" +
-    s" userFeatures: [${userFeatures.size}]" +
-    s"(${userFeatures.take(2).toList}...)" +
-    s" productFeatures: [${productFeatures.size}]" +
-    s"(${productFeatures.take(2).toList}...)" +
-    s" userStringIntMap: [${userStringIntMap.size}]" +
-    s"(${userStringIntMap.take(2).toString}...)]" +
-    s" itemStringIntMap: [${itemStringIntMap.size}]" +
-    s"(${itemStringIntMap.take(2).toString}...)]"
-  }
-}
-
-/**
-  * Use ALS to build item x feature matrix
-  */
-class ALSAlgorithm(val ap: ALSAlgorithmParams)
-  extends P2LAlgorithm[PreparedData, ALSModel, Query, PredictedResult] {
-
-  @transient lazy val logger = Logger[this.type]
-  // NOTE: use getLEvents() for local access
-  @transient lazy val lEventsDb = Storage.getLEvents()
-
-  def train(sc: SparkContext, data: PreparedData): ALSModel = {
-    require(!data.rateEvents.take(1).isEmpty, // MODIFIED
-      s"rateEvents in PreparedData cannot be empty." +
-      " Please check if DataSource generates TrainingData" +
-      " and Preprator generates PreparedData correctly.")
-    require(!data.users.take(1).isEmpty,
-      s"users in PreparedData cannot be empty." +
-      " Please check if DataSource generates TrainingData" +
-      " and Preprator generates PreparedData correctly.")
-    require(!data.items.take(1).isEmpty,
-      s"items in PreparedData cannot be empty." +
-      " Please check if DataSource generates TrainingData" +
-      " and Preprator generates PreparedData correctly.")
-    // create User and item's String ID to integer index BiMap
-    val userStringIntMap = BiMap.stringInt(data.users.keys)
-    val itemStringIntMap = BiMap.stringInt(data.items.keys)
-
-    val mllibRatings = data.rateEvents // MODIFIED
-      .map { r =>
-        // Convert user and item String IDs to Int index for MLlib
-        val uindex = userStringIntMap.getOrElse(r.user, -1)
-        val iindex = itemStringIntMap.getOrElse(r.item, -1)
-
-        if (uindex == -1)
-          logger.info(s"Couldn't convert nonexistent user ID ${r.user}"
-            + " to Int index.")
-
-        if (iindex == -1)
-          logger.info(s"Couldn't convert nonexistent item ID ${r.item}"
-            + " to Int index.")
-
-        ((uindex, iindex), (r.rating, r.t)) // MODIFIED
-      }.filter { case ((u, i), v) =>
-        // keep events with valid user and item index
-        (u != -1) && (i != -1)
-      }.reduceByKey { case (v1, v2) => // MODIFIED
-        // if a user may rate same item with different value at different times,
-        // use the latest value for this case.
-        // Can remove this reduceByKey() if no need to support this case.
-        val (rating1, t1) = v1
-        val (rating2, t2) = v2
-        // keep the latest value
-        if (t1 > t2) v1 else v2
-      }
-      .map { case ((u, i), (rating, t)) => // MODIFIED
-        // MLlibRating requires integer index for user and item
-        MLlibRating(u, i, rating) // MODIFIED
-      }.cache()
-
-    // MLLib ALS cannot handle empty training data.
-    require(!mllibRatings.take(1).isEmpty,
-      s"mllibRatings cannot be empty." +
-      " Please check if your events contain valid user and item ID.")
-
-    // seed for MLlib ALS
-    val seed = ap.seed.getOrElse(System.nanoTime)
-
-    val m = ALS.train( // MODIFIED
-      ratings = mllibRatings,
-      rank = ap.rank,
-      iterations = ap.numIterations,
-      lambda = ap.lambda,
-      blocks = -1,
-      seed = seed)
-
-    val userFeatures = m.userFeatures.collectAsMap.toMap
-
-    // convert ID to Int index
-    val items = data.items.map { case (id, item) =>
-      (itemStringIntMap(id), item)
-    }
-
-    // join item with the trained productFeatures
-    val productFeatures = items.leftOuterJoin(m.productFeatures)
-      .collectAsMap.toMap
-
-    new ALSModel(
-      rank = m.rank,
-      userFeatures = userFeatures,
-      productFeatures = productFeatures,
-      userStringIntMap = userStringIntMap,
-      itemStringIntMap = itemStringIntMap
-    )
-  }
-
-  def predict(model: ALSModel, query: Query): PredictedResult = {
-
-    val userFeatures = model.userFeatures
-    val productFeatures = model.productFeatures
-
-    // convert whiteList's string ID to integer index
-    val whiteList: Option[Set[Int]] = query.whiteList.map( set =>
-      set.map(model.itemStringIntMap.get(_)).flatten
-    )
-
-    val blackList: Set[String] = query.blackList.getOrElse(Set[String]())
-
-    // if unseenOnly is True, get all seen items
-    val seenItems: Set[String] = if (ap.unseenOnly) {
-
-      // get all user item events which are considered as "seen" events
-      val seenEvents: Iterator[Event] = lEventsDb.findSingleEntity(
-        appId = ap.appId,
-        entityType = "user",
-        entityId = query.user,
-        eventNames = Some(ap.seenEvents),
-        targetEntityType = Some(Some("item")),
-        // set time limit to avoid super long DB access
-        timeout = Duration(200, "millis")
-      ) match {
-        case Right(x) => x
-        case Left(e) => {
-          logger.error(s"Error when read seen events: ${e}")
-          Iterator[Event]()
-        }
-      }
-
-      seenEvents.map { event =>
-        try {
-          event.targetEntityId.get
-        } catch {
-          case e => {
-            logger.error(s"Can't get targetEntityId of event ${event}.")
-            throw e
-          }
-        }
-      }.toSet
-    } else {
-      Set[String]()
-    }
-
-    // get the latest constraint unavailableItems $set event
-    val unavailableItems: Set[String] = lEventsDb.findSingleEntity(
-      appId = ap.appId,
-      entityType = "constraint",
-      entityId = "unavailableItems",
-      eventNames = Some(Seq("$set")),
-      limit = Some(1),
-      latest = true,
-      timeout = Duration(200, "millis")
-    ) match {
-      case Right(x) => {
-        if (x.hasNext) {
-          x.next.properties.get[Set[String]]("items")
-        } else {
-          Set[String]()
-        }
-      }
-      case Left(e) => {
-        logger.error(s"Error when read set unavailableItems event: ${e}")
-        Set[String]()
-      }
-    }
-
-    // combine query's blackList,seenItems and unavailableItems
-    // into final blackList.
-    // convert seen Items list from String ID to interger Index
-    val finalBlackList: Set[Int] = (blackList ++ seenItems ++
-      unavailableItems).map( x => model.itemStringIntMap.get(x)).flatten
-
-    val userFeature =
-      model.userStringIntMap.get(query.user).map { userIndex =>
-        userFeatures.get(userIndex)
-      }
-      // flatten Option[Option[Array[Double]]] to Option[Array[Double]]
-      .flatten
-
-    val topScores = if (userFeature.isDefined) {
-      // the user has feature vector
-      val uf = userFeature.get
-      val indexScores: Map[Int, Double] =
-        productFeatures.par // convert to parallel collection
-          .filter { case (i, (item, feature)) =>
-            feature.isDefined &&
-            isCandidateItem(
-              i = i,
-              item = item,
-              categories = query.categories,
-              whiteList = whiteList,
-              blackList = finalBlackList
-            )
-          }
-          .map { case (i, (item, feature)) =>
-            // NOTE: feature must be defined, so can call .get
-            val s = dotProduct(uf, feature.get)
-            // Can adjust score here
-            (i, s)
-          }
-          .filter(_._2 > 0) // only keep items with score > 0
-          .seq // convert back to sequential collection
-
-      val ord = Ordering.by[(Int, Double), Double](_._2).reverse
-      val topScores = getTopN(indexScores, query.num)(ord).toArray
-
-      topScores
-
-    } else {
-      // the user doesn't have feature vector.
-      // For example, new user is created after model is trained.
-      logger.info(s"No userFeature found for user ${query.user}.")
-      predictNewUser(
-        model = model,
-        query = query,
-        whiteList = whiteList,
-        blackList = finalBlackList
-      )
-
-    }
-
-    val itemScores = topScores.map { case (i, s) =>
-      new ItemScore(
-        // convert item int index back to string ID
-        item = model.itemIntStringMap(i),
-        score = s
-      )
-    }
-
-    new PredictedResult(itemScores)
-  }
-
-  /** Get recently viewed item of the user and return top similar items */
-  private
-  def predictNewUser(
-    model: ALSModel,
-    query: Query,
-    whiteList: Option[Set[Int]],
-    blackList: Set[Int]): Array[(Int, Double)] = {
-
-    val userFeatures = model.userFeatures
-    val productFeatures = model.productFeatures
-
-    // get latest 10 user view item events
-    val recentEvents = lEventsDb.findSingleEntity(
-      appId = ap.appId,
-      // entityType and entityId is specified for fast lookup
-      entityType = "user",
-      entityId = query.user,
-      eventNames = Some(Seq("view")),
-      targetEntityType = Some(Some("item")),
-      limit = Some(10),
-      latest = true,
-      // set time limit to avoid super long DB access
-      timeout = Duration(200, "millis")
-    ) match {
-      case Right(x) => x
-      case Left(e) => {
-        logger.error(s"Error when read recent events: ${e}")
-        Iterator[Event]()
-      }
-    }
-
-    val recentItems: Set[String] = recentEvents.map { event =>
-      try {
-        event.targetEntityId.get
-      } catch {
-        case e => {
-          logger.error("Can't get targetEntityId of event ${event}.")
-          throw e
-        }
-      }
-    }.toSet
-
-    val recentList: Set[Int] = recentItems.map (x =>
-      model.itemStringIntMap.get(x)).flatten
-
-    val recentFeatures: Vector[Array[Double]] = recentList.toVector
-      // productFeatures may not contain the requested item
-      .map { i =>
-        productFeatures.get(i).map { case (item, f) => f }.flatten
-      }.flatten
-
-    val indexScores: Map[Int, Double] = if (recentFeatures.isEmpty) {
-      logger.info(s"No productFeatures vector for recent items ${recentItems}.")
-      Map[Int, Double]()
-    } else {
-      productFeatures.par // convert to parallel collection
-        .filter { case (i, (item, feature)) =>
-          feature.isDefined &&
-          isCandidateItem(
-            i = i,
-            item = item,
-            categories = query.categories,
-            whiteList = whiteList,
-            blackList = blackList
-          )
-        }
-        .map { case (i, (item, feature)) =>
-          val s = recentFeatures.map{ rf =>
-            cosine(rf, feature.get) // feature is defined
-          }.reduce(_ + _)
-          // Can adjust score here
-          (i, s)
-        }
-        .filter(_._2 > 0) // keep items with score > 0
-        .seq // convert back to sequential collection
-    }
-
-    val ord = Ordering.by[(Int, Double), Double](_._2).reverse
-    val topScores = getTopN(indexScores, query.num)(ord).toArray
-
-    topScores
-  }
-
-  private
-  def getTopN[T](s: Iterable[T], n: Int)(implicit ord: Ordering[T]): Seq[T] = {
-
-    val q = PriorityQueue()
-
-    for (x <- s) {
-      if (q.size < n)
-        q.enqueue(x)
-      else {
-        // q is full
-        if (ord.compare(x, q.head) < 0) {
-          q.dequeue()
-          q.enqueue(x)
-        }
-      }
-    }
-
-    q.dequeueAll.toSeq.reverse
-  }
-
-  private
-  def dotProduct(v1: Array[Double], v2: Array[Double]): Double = {
-    val size = v1.size
-    var i = 0
-    var d: Double = 0
-    while (i < size) {
-      d += v1(i) * v2(i)
-      i += 1
-    }
-    d
-  }
-
-  private
-  def cosine(v1: Array[Double], v2: Array[Double]): Double = {
-    val size = v1.size
-    var i = 0
-    var n1: Double = 0
-    var n2: Double = 0
-    var d: Double = 0
-    while (i < size) {
-      n1 += v1(i) * v1(i)
-      n2 += v2(i) * v2(i)
-      d += v1(i) * v2(i)
-      i += 1
-    }
-    val n1n2 = (math.sqrt(n1) * math.sqrt(n2))
-    if (n1n2 == 0) 0 else (d / n1n2)
-  }
-
-  private
-  def isCandidateItem(
-    i: Int,
-    item: Item,
-    categories: Option[Set[String]],
-    whiteList: Option[Set[Int]],
-    blackList: Set[Int]
-  ): Boolean = {
-    // can add other custom filtering here
-    whiteList.map(_.contains(i)).getOrElse(true) &&
-    !blackList.contains(i) &&
-    // filter categories
-    categories.map { cat =>
-      item.categories.map { itemCat =>
-        // keep this item if has ovelap categories with the query
-        !(itemCat.toSet.intersect(cat).isEmpty)
-      }.getOrElse(false) // discard this item if it has no categories
-    }.getOrElse(true)
-
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-ecommercerecommendation/train-with-rate-event/src/main/scala/DataSource.scala
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-ecommercerecommendation/train-with-rate-event/src/main/scala/DataSource.scala b/examples/scala-parallel-ecommercerecommendation/train-with-rate-event/src/main/scala/DataSource.scala
index 3600ca5..c0a735c 100644
--- a/examples/scala-parallel-ecommercerecommendation/train-with-rate-event/src/main/scala/DataSource.scala
+++ b/examples/scala-parallel-ecommercerecommendation/train-with-rate-event/src/main/scala/DataSource.scala
@@ -15,14 +15,14 @@
  * limitations under the License.
  */
 
-package org.template.ecommercerecommendation
+package org.apache.predictionio.examples.ecommercerecommendation
 
 import org.apache.predictionio.controller.PDataSource
 import org.apache.predictionio.controller.EmptyEvaluationInfo
 import org.apache.predictionio.controller.EmptyActualResult
 import org.apache.predictionio.controller.Params
 import org.apache.predictionio.data.storage.Event
-import org.apache.predictionio.data.storage.Storage
+import org.apache.predictionio.data.store.PEventStore
 
 import org.apache.spark.SparkContext
 import org.apache.spark.SparkContext._
@@ -30,7 +30,7 @@ import org.apache.spark.rdd.RDD
 
 import grizzled.slf4j.Logger
 
-case class DataSourceParams(appId: Int) extends Params
+case class DataSourceParams(appName: String) extends Params
 
 class DataSource(val dsp: DataSourceParams)
   extends PDataSource[TrainingData,
@@ -40,11 +40,10 @@ class DataSource(val dsp: DataSourceParams)
 
   override
   def readTraining(sc: SparkContext): TrainingData = {
-    val eventsDb = Storage.getPEvents()
 
     // create a RDD of (entityID, User)
-    val usersRDD: RDD[(String, User)] = eventsDb.aggregateProperties(
-      appId = dsp.appId,
+    val usersRDD: RDD[(String, User)] = PEventStore.aggregateProperties(
+      appName = dsp.appName,
       entityType = "user"
     )(sc).map { case (entityId, properties) =>
       val user = try {
@@ -60,8 +59,8 @@ class DataSource(val dsp: DataSourceParams)
     }.cache()
 
     // create a RDD of (entityID, Item)
-    val itemsRDD: RDD[(String, Item)] = eventsDb.aggregateProperties(
-      appId = dsp.appId,
+    val itemsRDD: RDD[(String, Item)] = PEventStore.aggregateProperties(
+      appName = dsp.appName,
       entityType = "item"
     )(sc).map { case (entityId, properties) =>
       val item = try {
@@ -77,38 +76,54 @@ class DataSource(val dsp: DataSourceParams)
       (entityId, item)
     }.cache()
 
-    // get all "user" "rate" "item" events
-    val rateEventsRDD: RDD[RateEvent] = eventsDb.find( // MODIFIED
-      appId = dsp.appId,
+    val eventsRDD: RDD[Event] = PEventStore.find(
+      appName = dsp.appName,
       entityType = Some("user"),
-      eventNames = Some(List("rate")), // MODIFIED
+      eventNames = Some(List("rate", "buy")), // MODIFIED
       // targetEntityType is optional field of an event.
       targetEntityType = Some(Some("item")))(sc)
-      // eventsDb.find() returns RDD[Event]
+      .cache()
+
+    val rateEventsRDD: RDD[RateEvent] = eventsRDD // MODIFIED
+      .filter { event => event.event == "rate" } // MODIFIED
       .map { event =>
-        val rateEvent = try {
-          event.event match {
-            case "rate" => RateEvent( // MODIFIED
-              user = event.entityId,
-              item = event.targetEntityId.get,
-              rating = event.properties.get[Double]("rating"), // ADDED
-              t = event.eventTime.getMillis)
-            case _ => throw new Exception(s"Unexpected event ${event} is read.")
-          }
+        try {
+          RateEvent( // MODIFIED
+            user = event.entityId,
+            item = event.targetEntityId.get,
+            rating = event.properties.get[Double]("rating"), // ADDED
+            t = event.eventTime.getMillis
+          )
         } catch {
-          case e: Exception => {
+          case e: Exception =>
             logger.error(s"Cannot convert ${event} to RateEvent." + // MODIFIED
               s" Exception: ${e}.")
             throw e
-          }
         }
-        rateEvent
-      }.cache()
+      }
+
+    val buyEventsRDD: RDD[BuyEvent] = eventsRDD
+      .filter { event => event.event == "buy" }
+      .map { event =>
+        try {
+          BuyEvent(
+            user = event.entityId,
+            item = event.targetEntityId.get,
+            t = event.eventTime.getMillis
+          )
+        } catch {
+          case e: Exception =>
+            logger.error(s"Cannot convert ${event} to BuyEvent." +
+              s" Exception: ${e}.")
+            throw e
+        }
+      }
 
     new TrainingData(
       users = usersRDD,
       items = itemsRDD,
-      rateEvents = rateEventsRDD // MODIFIED
+      rateEvents = rateEventsRDD, // MODIFIED
+      buyEvents = buyEventsRDD
     )
   }
 }
@@ -120,15 +135,19 @@ case class Item(categories: Option[List[String]])
 // MODIFIED
 case class RateEvent(user: String, item: String, rating: Double, t: Long)
 
+case class BuyEvent(user: String, item: String, t: Long)
+
 class TrainingData(
   val users: RDD[(String, User)],
   val items: RDD[(String, Item)],
-  val rateEvents: RDD[RateEvent] // MODIFIED
+  val rateEvents: RDD[RateEvent], // MODIFIED
+  val buyEvents: RDD[BuyEvent]
 ) extends Serializable {
   override def toString = {
     s"users: [${users.count()} (${users.take(2).toList}...)]" +
     s"items: [${items.count()} (${items.take(2).toList}...)]" +
     // MODIFIED
-    s"rateEvents: [${rateEvents.count()}] (${rateEvents.take(2).toList}...)"
+    s"rateEvents: [${rateEvents.count()}] (${rateEvents.take(2).toList}...)" +
+    s"buyEvents: [${buyEvents.count()}] (${buyEvents.take(2).toList}...)"
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-ecommercerecommendation/train-with-rate-event/src/main/scala/ECommAlgorithm.scala
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-ecommercerecommendation/train-with-rate-event/src/main/scala/ECommAlgorithm.scala b/examples/scala-parallel-ecommercerecommendation/train-with-rate-event/src/main/scala/ECommAlgorithm.scala
new file mode 100644
index 0000000..597bb50
--- /dev/null
+++ b/examples/scala-parallel-ecommercerecommendation/train-with-rate-event/src/main/scala/ECommAlgorithm.scala
@@ -0,0 +1,597 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.predictionio.examples.ecommercerecommendation
+
+import org.apache.predictionio.controller.P2LAlgorithm
+import org.apache.predictionio.controller.Params
+import org.apache.predictionio.data.storage.BiMap
+import org.apache.predictionio.data.storage.Event
+import org.apache.predictionio.data.store.LEventStore
+
+import org.apache.spark.SparkContext
+import org.apache.spark.SparkContext._
+import org.apache.spark.mllib.recommendation.ALS
+import org.apache.spark.mllib.recommendation.{Rating => MLlibRating}
+import org.apache.spark.rdd.RDD
+
+import grizzled.slf4j.Logger
+
+import scala.collection.mutable.PriorityQueue
+import scala.concurrent.duration.Duration
+import scala.concurrent.ExecutionContext.Implicits.global
+
+case class ECommAlgorithmParams(
+  appName: String,
+  unseenOnly: Boolean,
+  seenEvents: List[String],
+  similarEvents: List[String],
+  rank: Int,
+  numIterations: Int,
+  lambda: Double,
+  seed: Option[Long]
+) extends Params
+
+
+case class ProductModel(
+  item: Item,
+  features: Option[Array[Double]], // features by ALS
+  count: Int // popular count for default score
+)
+
+class ECommModel(
+  val rank: Int,
+  val userFeatures: Map[Int, Array[Double]],
+  val productModels: Map[Int, ProductModel],
+  val userStringIntMap: BiMap[String, Int],
+  val itemStringIntMap: BiMap[String, Int]
+) extends Serializable {
+
+  @transient lazy val itemIntStringMap = itemStringIntMap.inverse
+
+  override def toString = {
+    s" rank: ${rank}" +
+    s" userFeatures: [${userFeatures.size}]" +
+    s"(${userFeatures.take(2).toList}...)" +
+    s" productModels: [${productModels.size}]" +
+    s"(${productModels.take(2).toList}...)" +
+    s" userStringIntMap: [${userStringIntMap.size}]" +
+    s"(${userStringIntMap.take(2).toString}...)]" +
+    s" itemStringIntMap: [${itemStringIntMap.size}]" +
+    s"(${itemStringIntMap.take(2).toString}...)]"
+  }
+}
+
+class ECommAlgorithm(val ap: ECommAlgorithmParams)
+  extends P2LAlgorithm[PreparedData, ECommModel, Query, PredictedResult] {
+
+  @transient lazy val logger = Logger[this.type]
+
+  def train(sc: SparkContext, data: PreparedData): ECommModel = {
+    require(!data.rateEvents.take(1).isEmpty, // MODIFIED
+      s"rateEvents in PreparedData cannot be empty." + // MODIFIED
+      " Please check if DataSource generates TrainingData" +
+      " and Preprator generates PreparedData correctly.")
+    require(!data.users.take(1).isEmpty,
+      s"users in PreparedData cannot be empty." +
+      " Please check if DataSource generates TrainingData" +
+      " and Preprator generates PreparedData correctly.")
+    require(!data.items.take(1).isEmpty,
+      s"items in PreparedData cannot be empty." +
+      " Please check if DataSource generates TrainingData" +
+      " and Preprator generates PreparedData correctly.")
+    // create User and item's String ID to integer index BiMap
+    val userStringIntMap = BiMap.stringInt(data.users.keys)
+    val itemStringIntMap = BiMap.stringInt(data.items.keys)
+
+    val mllibRatings: RDD[MLlibRating] = genMLlibRating(
+      userStringIntMap = userStringIntMap,
+      itemStringIntMap = itemStringIntMap,
+      data = data
+    )
+
+    // MLLib ALS cannot handle empty training data.
+    require(!mllibRatings.take(1).isEmpty,
+      s"mllibRatings cannot be empty." +
+      " Please check if your events contain valid user and item ID.")
+
+    // seed for MLlib ALS
+    val seed = ap.seed.getOrElse(System.nanoTime)
+
+    // use ALS to train feature vectors
+    val m = ALS.train( // MODIFIED
+      ratings = mllibRatings,
+      rank = ap.rank,
+      iterations = ap.numIterations,
+      lambda = ap.lambda,
+      blocks = -1,
+      seed = seed)
+
+    val userFeatures = m.userFeatures.collectAsMap.toMap
+
+    // convert ID to Int index
+    val items = data.items.map { case (id, item) =>
+      (itemStringIntMap(id), item)
+    }
+
+    // join item with the trained productFeatures
+    val productFeatures: Map[Int, (Item, Option[Array[Double]])] =
+      items.leftOuterJoin(m.productFeatures).collectAsMap.toMap
+
+    val popularCount = trainDefault(
+      userStringIntMap = userStringIntMap,
+      itemStringIntMap = itemStringIntMap,
+      data = data
+    )
+
+    val productModels: Map[Int, ProductModel] = productFeatures
+      .map { case (index, (item, features)) =>
+        val pm = ProductModel(
+          item = item,
+          features = features,
+          // NOTE: use getOrElse because popularCount may not contain all items.
+          count = popularCount.getOrElse(index, 0)
+        )
+        (index, pm)
+      }
+
+    new ECommModel(
+      rank = m.rank,
+      userFeatures = userFeatures,
+      productModels = productModels,
+      userStringIntMap = userStringIntMap,
+      itemStringIntMap = itemStringIntMap
+    )
+  }
+
+  /** Generate MLlibRating from PreparedData.
+    * You may customize this function if use different events or different aggregation method
+    */
+  def genMLlibRating(
+    userStringIntMap: BiMap[String, Int],
+    itemStringIntMap: BiMap[String, Int],
+    data: PreparedData): RDD[MLlibRating] = {
+
+    val mllibRatings = data.rateEvents // MODIFIED
+      .map { r =>
+        // Convert user and item String IDs to Int index for MLlib
+        val uindex = userStringIntMap.getOrElse(r.user, -1)
+        val iindex = itemStringIntMap.getOrElse(r.item, -1)
+
+        if (uindex == -1)
+          logger.info(s"Couldn't convert nonexistent user ID ${r.user}"
+            + " to Int index.")
+
+        if (iindex == -1)
+          logger.info(s"Couldn't convert nonexistent item ID ${r.item}"
+            + " to Int index.")
+
+        ((uindex, iindex), (r.rating, r.t)) // MODIFIED
+      }
+      .filter { case ((u, i), v) =>
+        // keep events with valid user and item index
+        (u != -1) && (i != -1)
+      }
+      .reduceByKey { case (v1, v2) => // MODIFIED
+        // if a user may rate same item with different value at different times,
+        // use the latest value for this case.
+        // Can remove this reduceByKey() if no need to support this case.
+        val (rating1, t1) = v1
+        val (rating2, t2) = v2
+        // keep the latest value
+        if (t1 > t2) v1 else v2
+      }
+      .map { case ((u, i), (rating, t)) => // MODIFIED
+        // MLlibRating requires integer index for user and item
+        MLlibRating(u, i, rating) // MODIFIED
+      }
+      .cache()
+
+    mllibRatings
+  }
+
+  /** Train default model.
+    * You may customize this function if use different events or
+    * need different ways to count "popular" score or return default score for item.
+    */
+  def trainDefault(
+    userStringIntMap: BiMap[String, Int],
+    itemStringIntMap: BiMap[String, Int],
+    data: PreparedData): Map[Int, Int] = {
+    // count number of buys
+    // (item index, count)
+    val buyCountsRDD: RDD[(Int, Int)] = data.buyEvents
+      .map { r =>
+        // Convert user and item String IDs to Int index
+        val uindex = userStringIntMap.getOrElse(r.user, -1)
+        val iindex = itemStringIntMap.getOrElse(r.item, -1)
+
+        if (uindex == -1)
+          logger.info(s"Couldn't convert nonexistent user ID ${r.user}"
+            + " to Int index.")
+
+        if (iindex == -1)
+          logger.info(s"Couldn't convert nonexistent item ID ${r.item}"
+            + " to Int index.")
+
+        (uindex, iindex, 1)
+      }
+      .filter { case (u, i, v) =>
+        // keep events with valid user and item index
+        (u != -1) && (i != -1)
+      }
+      .map { case (u, i, v) => (i, 1) } // key is item
+      .reduceByKey{ case (a, b) => a + b } // count number of items occurrence
+
+    buyCountsRDD.collectAsMap.toMap
+  }
+
+  def predict(model: ECommModel, query: Query): PredictedResult = {
+
+    val userFeatures = model.userFeatures
+    val productModels = model.productModels
+
+    // convert whiteList's string ID to integer index
+    val whiteList: Option[Set[Int]] = query.whiteList.map( set =>
+      set.flatMap(model.itemStringIntMap.get(_))
+    )
+
+    val finalBlackList: Set[Int] = genBlackList(query = query)
+      // convert seen Items list from String ID to interger Index
+      .flatMap(x => model.itemStringIntMap.get(x))
+
+    val userFeature: Option[Array[Double]] =
+      model.userStringIntMap.get(query.user).flatMap { userIndex =>
+        userFeatures.get(userIndex)
+      }
+
+    val topScores: Array[(Int, Double)] = if (userFeature.isDefined) {
+      // the user has feature vector
+      predictKnownUser(
+        userFeature = userFeature.get,
+        productModels = productModels,
+        query = query,
+        whiteList = whiteList,
+        blackList = finalBlackList
+      )
+    } else {
+      // the user doesn't have feature vector.
+      // For example, new user is created after model is trained.
+      logger.info(s"No userFeature found for user ${query.user}.")
+
+      // check if the user has recent events on some items
+      val recentItems: Set[String] = getRecentItems(query)
+      val recentList: Set[Int] = recentItems.flatMap (x =>
+        model.itemStringIntMap.get(x))
+
+      val recentFeatures: Vector[Array[Double]] = recentList.toVector
+        // productModels may not contain the requested item
+        .map { i =>
+          productModels.get(i).flatMap { pm => pm.features }
+        }.flatten
+
+      if (recentFeatures.isEmpty) {
+        logger.info(s"No features vector for recent items ${recentItems}.")
+        predictDefault(
+          productModels = productModels,
+          query = query,
+          whiteList = whiteList,
+          blackList = finalBlackList
+        )
+      } else {
+        predictSimilar(
+          recentFeatures = recentFeatures,
+          productModels = productModels,
+          query = query,
+          whiteList = whiteList,
+          blackList = finalBlackList
+        )
+      }
+    }
+
+    val itemScores = topScores.map { case (i, s) =>
+      new ItemScore(
+        // convert item int index back to string ID
+        item = model.itemIntStringMap(i),
+        score = s
+      )
+    }
+
+    new PredictedResult(itemScores)
+  }
+
+  /** Generate final blackList based on other constraints */
+  def genBlackList(query: Query): Set[String] = {
+    // if unseenOnly is True, get all seen items
+    val seenItems: Set[String] = if (ap.unseenOnly) {
+
+      // get all user item events which are considered as "seen" events
+      val seenEvents: Iterator[Event] = try {
+        LEventStore.findByEntity(
+          appName = ap.appName,
+          entityType = "user",
+          entityId = query.user,
+          eventNames = Some(ap.seenEvents),
+          targetEntityType = Some(Some("item")),
+          // set time limit to avoid super long DB access
+          timeout = Duration(200, "millis")
+        )
+      } catch {
+        case e: scala.concurrent.TimeoutException =>
+          logger.error(s"Timeout when read seen events." +
+            s" Empty list is used. ${e}")
+          Iterator[Event]()
+        case e: Exception =>
+          logger.error(s"Error when read seen events: ${e}")
+          throw e
+      }
+
+      seenEvents.map { event =>
+        try {
+          event.targetEntityId.get
+        } catch {
+          case e: Exception => {
+            logger.error(s"Can't get targetEntityId of event ${event}.")
+            throw e
+          }
+        }
+      }.toSet
+    } else {
+      Set[String]()
+    }
+
+    // get the latest constraint unavailableItems $set event
+    val unavailableItems: Set[String] = try {
+      val constr = LEventStore.findByEntity(
+        appName = ap.appName,
+        entityType = "constraint",
+        entityId = "unavailableItems",
+        eventNames = Some(Seq("$set")),
+        limit = Some(1),
+        latest = true,
+        timeout = Duration(200, "millis")
+      )
+      if (constr.hasNext) {
+        constr.next.properties.get[Set[String]]("items")
+      } else {
+        Set[String]()
+      }
+    } catch {
+      case e: scala.concurrent.TimeoutException =>
+        logger.error(s"Timeout when read set unavailableItems event." +
+          s" Empty list is used. ${e}")
+        Set[String]()
+      case e: Exception =>
+        logger.error(s"Error when read set unavailableItems event: ${e}")
+        throw e
+    }
+
+    // combine query's blackList,seenItems and unavailableItems
+    // into final blackList.
+    query.blackList.getOrElse(Set[String]()) ++ seenItems ++ unavailableItems
+  }
+
+  /** Get recent events of the user on items for recommending similar items */
+  def getRecentItems(query: Query): Set[String] = {
+    // get latest 10 user view item events
+    val recentEvents = try {
+      LEventStore.findByEntity(
+        appName = ap.appName,
+        // entityType and entityId is specified for fast lookup
+        entityType = "user",
+        entityId = query.user,
+        eventNames = Some(ap.similarEvents),
+        targetEntityType = Some(Some("item")),
+        limit = Some(10),
+        latest = true,
+        // set time limit to avoid super long DB access
+        timeout = Duration(200, "millis")
+      )
+    } catch {
+      case e: scala.concurrent.TimeoutException =>
+        logger.error(s"Timeout when read recent events." +
+          s" Empty list is used. ${e}")
+        Iterator[Event]()
+      case e: Exception =>
+        logger.error(s"Error when read recent events: ${e}")
+        throw e
+    }
+
+    val recentItems: Set[String] = recentEvents.map { event =>
+      try {
+        event.targetEntityId.get
+      } catch {
+        case e: Exception => {
+          logger.error("Can't get targetEntityId of event ${event}.")
+          throw e
+        }
+      }
+    }.toSet
+
+    recentItems
+  }
+
+  /** Prediction for user with known feature vector */
+  def predictKnownUser(
+    userFeature: Array[Double],
+    productModels: Map[Int, ProductModel],
+    query: Query,
+    whiteList: Option[Set[Int]],
+    blackList: Set[Int]
+  ): Array[(Int, Double)] = {
+    val indexScores: Map[Int, Double] = productModels.par // convert to parallel collection
+      .filter { case (i, pm) =>
+        pm.features.isDefined &&
+        isCandidateItem(
+          i = i,
+          item = pm.item,
+          categories = query.categories,
+          whiteList = whiteList,
+          blackList = blackList
+        )
+      }
+      .map { case (i, pm) =>
+        // NOTE: features must be defined, so can call .get
+        val s = dotProduct(userFeature, pm.features.get)
+        // may customize here to further adjust score
+        (i, s)
+      }
+      .filter(_._2 > 0) // only keep items with score > 0
+      .seq // convert back to sequential collection
+
+    val ord = Ordering.by[(Int, Double), Double](_._2).reverse
+    val topScores = getTopN(indexScores, query.num)(ord).toArray
+
+    topScores
+  }
+
+  /** Default prediction when know nothing about the user */
+  def predictDefault(
+    productModels: Map[Int, ProductModel],
+    query: Query,
+    whiteList: Option[Set[Int]],
+    blackList: Set[Int]
+  ): Array[(Int, Double)] = {
+    val indexScores: Map[Int, Double] = productModels.par // convert back to sequential collection
+      .filter { case (i, pm) =>
+        isCandidateItem(
+          i = i,
+          item = pm.item,
+          categories = query.categories,
+          whiteList = whiteList,
+          blackList = blackList
+        )
+      }
+      .map { case (i, pm) =>
+        // may customize here to further adjust score
+        (i, pm.count.toDouble)
+      }
+      .seq
+
+    val ord = Ordering.by[(Int, Double), Double](_._2).reverse
+    val topScores = getTopN(indexScores, query.num)(ord).toArray
+
+    topScores
+  }
+
+  /** Return top similar items based on items user recently has action on */
+  def predictSimilar(
+    recentFeatures: Vector[Array[Double]],
+    productModels: Map[Int, ProductModel],
+    query: Query,
+    whiteList: Option[Set[Int]],
+    blackList: Set[Int]
+  ): Array[(Int, Double)] = {
+    val indexScores: Map[Int, Double] = productModels.par // convert to parallel collection
+      .filter { case (i, pm) =>
+        pm.features.isDefined &&
+        isCandidateItem(
+          i = i,
+          item = pm.item,
+          categories = query.categories,
+          whiteList = whiteList,
+          blackList = blackList
+        )
+      }
+      .map { case (i, pm) =>
+        val s = recentFeatures.map{ rf =>
+          // pm.features must be defined because of filter logic above
+          cosine(rf, pm.features.get)
+        }.reduce(_ + _)
+        // may customize here to further adjust score
+        (i, s)
+      }
+      .filter(_._2 > 0) // keep items with score > 0
+      .seq // convert back to sequential collection
+
+    val ord = Ordering.by[(Int, Double), Double](_._2).reverse
+    val topScores = getTopN(indexScores, query.num)(ord).toArray
+
+    topScores
+  }
+
+  private
+  def getTopN[T](s: Iterable[T], n: Int)(implicit ord: Ordering[T]): Seq[T] = {
+
+    val q = PriorityQueue()
+
+    for (x <- s) {
+      if (q.size < n)
+        q.enqueue(x)
+      else {
+        // q is full
+        if (ord.compare(x, q.head) < 0) {
+          q.dequeue()
+          q.enqueue(x)
+        }
+      }
+    }
+
+    q.dequeueAll.toSeq.reverse
+  }
+
+  private
+  def dotProduct(v1: Array[Double], v2: Array[Double]): Double = {
+    val size = v1.size
+    var i = 0
+    var d: Double = 0
+    while (i < size) {
+      d += v1(i) * v2(i)
+      i += 1
+    }
+    d
+  }
+
+  private
+  def cosine(v1: Array[Double], v2: Array[Double]): Double = {
+    val size = v1.size
+    var i = 0
+    var n1: Double = 0
+    var n2: Double = 0
+    var d: Double = 0
+    while (i < size) {
+      n1 += v1(i) * v1(i)
+      n2 += v2(i) * v2(i)
+      d += v1(i) * v2(i)
+      i += 1
+    }
+    val n1n2 = (math.sqrt(n1) * math.sqrt(n2))
+    if (n1n2 == 0) 0 else (d / n1n2)
+  }
+
+  private
+  def isCandidateItem(
+    i: Int,
+    item: Item,
+    categories: Option[Set[String]],
+    whiteList: Option[Set[Int]],
+    blackList: Set[Int]
+  ): Boolean = {
+    // can add other custom filtering here
+    whiteList.map(_.contains(i)).getOrElse(true) &&
+    !blackList.contains(i) &&
+    // filter categories
+    categories.map { cat =>
+      item.categories.map { itemCat =>
+        // keep this item if has ovelap categories with the query
+        !(itemCat.toSet.intersect(cat).isEmpty)
+      }.getOrElse(false) // discard this item if it has no categories
+    }.getOrElse(true)
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-ecommercerecommendation/train-with-rate-event/src/main/scala/Engine.scala
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-ecommercerecommendation/train-with-rate-event/src/main/scala/Engine.scala b/examples/scala-parallel-ecommercerecommendation/train-with-rate-event/src/main/scala/Engine.scala
index 7ec3dd9..1949a98 100644
--- a/examples/scala-parallel-ecommercerecommendation/train-with-rate-event/src/main/scala/Engine.scala
+++ b/examples/scala-parallel-ecommercerecommendation/train-with-rate-event/src/main/scala/Engine.scala
@@ -15,9 +15,9 @@
  * limitations under the License.
  */
 
-package org.template.ecommercerecommendation
+package org.apache.predictionio.examples.ecommercerecommendation
 
-import org.apache.predictionio.controller.IEngineFactory
+import org.apache.predictionio.controller.EngineFactory
 import org.apache.predictionio.controller.Engine
 
 case class Query(
@@ -26,23 +26,23 @@ case class Query(
   categories: Option[Set[String]],
   whiteList: Option[Set[String]],
   blackList: Option[Set[String]]
-)
+) extends Serializable
 
 case class PredictedResult(
   itemScores: Array[ItemScore]
-)
+) extends Serializable
 
 case class ItemScore(
   item: String,
   score: Double
-)
+) extends Serializable
 
-object ECommerceRecommendationEngine extends IEngineFactory {
+object ECommerceRecommendationEngine extends EngineFactory {
   def apply() = {
     new Engine(
       classOf[DataSource],
       classOf[Preparator],
-      Map("als" -> classOf[ALSAlgorithm]),
+      Map("ecomm" -> classOf[ECommAlgorithm]),
       classOf[Serving])
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-ecommercerecommendation/train-with-rate-event/src/main/scala/Preparator.scala
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-ecommercerecommendation/train-with-rate-event/src/main/scala/Preparator.scala b/examples/scala-parallel-ecommercerecommendation/train-with-rate-event/src/main/scala/Preparator.scala
index 48c7f8d..67f32bb 100644
--- a/examples/scala-parallel-ecommercerecommendation/train-with-rate-event/src/main/scala/Preparator.scala
+++ b/examples/scala-parallel-ecommercerecommendation/train-with-rate-event/src/main/scala/Preparator.scala
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.template.ecommercerecommendation
+package org.apache.predictionio.examples.ecommercerecommendation
 
 import org.apache.predictionio.controller.PPreparator
 
@@ -30,12 +30,14 @@ class Preparator
     new PreparedData(
       users = trainingData.users,
       items = trainingData.items,
-      rateEvents = trainingData.rateEvents) // MODIFIED
+      rateEvents = trainingData.rateEvents, // MODIFIED
+      buyEvents = trainingData.buyEvents)
   }
 }
 
 class PreparedData(
   val users: RDD[(String, User)],
   val items: RDD[(String, Item)],
-  val rateEvents: RDD[RateEvent] // MODIFIED
+  val rateEvents: RDD[RateEvent], // MODIFIED
+  val buyEvents: RDD[BuyEvent]
 ) extends Serializable

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-ecommercerecommendation/train-with-rate-event/src/main/scala/Serving.scala
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-ecommercerecommendation/train-with-rate-event/src/main/scala/Serving.scala b/examples/scala-parallel-ecommercerecommendation/train-with-rate-event/src/main/scala/Serving.scala
index 32fe959..711f4f3 100644
--- a/examples/scala-parallel-ecommercerecommendation/train-with-rate-event/src/main/scala/Serving.scala
+++ b/examples/scala-parallel-ecommercerecommendation/train-with-rate-event/src/main/scala/Serving.scala
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.template.ecommercerecommendation
+package org.apache.predictionio.examples.ecommercerecommendation
 
 import org.apache.predictionio.controller.LServing
 

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-ecommercerecommendation/train-with-rate-event/template.json
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-ecommercerecommendation/train-with-rate-event/template.json b/examples/scala-parallel-ecommercerecommendation/train-with-rate-event/template.json
index 932e603..d076ec5 100644
--- a/examples/scala-parallel-ecommercerecommendation/train-with-rate-event/template.json
+++ b/examples/scala-parallel-ecommercerecommendation/train-with-rate-event/template.json
@@ -1 +1 @@
-{"pio": {"version": { "min": "0.9.0" }}}
+{"pio": {"version": { "min": "0.10.0-incubating" }}}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-ecommercerecommendation/weighted-items/.gitignore
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-ecommercerecommendation/weighted-items/.gitignore b/examples/scala-parallel-ecommercerecommendation/weighted-items/.gitignore
deleted file mode 100644
index 57841c6..0000000
--- a/examples/scala-parallel-ecommercerecommendation/weighted-items/.gitignore
+++ /dev/null
@@ -1,4 +0,0 @@
-manifest.json
-target/
-pio.log
-/pio.sbt

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-ecommercerecommendation/weighted-items/README.md
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-ecommercerecommendation/weighted-items/README.md b/examples/scala-parallel-ecommercerecommendation/weighted-items/README.md
deleted file mode 100644
index 4a1f168..0000000
--- a/examples/scala-parallel-ecommercerecommendation/weighted-items/README.md
+++ /dev/null
@@ -1,229 +0,0 @@
-<!--
-Licensed to the Apache Software Foundation (ASF) under one or more
-contributor license agreements.  See the NOTICE file distributed with
-this work for additional information regarding copyright ownership.
-The ASF licenses this file to You under the Apache License, Version 2.0
-(the "License"); you may not use this file except in compliance with
-the License.  You may obtain a copy of the License at
-
-    http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
--->
-
-# E-Commerce Recommendation Template With Weighted Items
-
-This engine template is based on the E-Commerce Recommendation Template v0.1.1. It has been modified so that
-each item can be given a different weight.
-
-By default, items have a weight of 1.0. Giving an item a weight greater than
-1.0 will make them appear more often and can be useful for i.e. promoted products. An item can also be given
-a weight smaller than 1.0 (but bigger than 0), in which case it will be recommended less often than originally. Weight
-values smaller than 0.0 are invalid.
-
-## Documentation
-
-Please refer to http://predictionio.incubator.apache.org/templates/ecommercerecommendation/quickstart/
-
-## Development Notes
-
-### Weight constraint event
-
-Item weights are specified by means of a `weightedItems` constraint type event , which includes the weight for all items
-which don't have the default weight of 1.0. At any given time, only the last such `weightedItems` event is taken into
-account.
-
-The event design has been optimized for the use case where there may be many different items that are to be adjusted
-by the same percentage, an so it's based on a list of objects, each containing a list of item IDs and their weight:
-
-```
-{
-  "event" : "$set",
-  "entityType" : "constraint"
-  "entityId" : "weightedItems",
-  "properties" : {
-    weights: [
-      {  
-        "items": [ "i4", "i14"],
-        "weight" : 1.2,
-      },
-      {
-        "items": [ "i11"],
-        "weight" : 1.5,
-      }
-    ]
-  },
-  "eventTime" : "2015-02-17T02:11:21.934Z"
-}
-```
-
-### Changes to ALSAlgorithm.scala
-
-* Added a case class to represent each group items which are given the same weight:
-
-```scala
-// Item weights are defined according to this structure so that groups of items can be easily changed together
-case class WeightsGroup(
-  items: Set[String],
-  weight: Double
-)
-```
-
-* Extract the sequence of `WeightsGroup`s defined in the last `weightedItems` event:
-
-```scala
-    // Get the latest constraint weightedItems. This comes in the form of a sequence of WeightsGroup
-    val groupedWeights = lEventsDb.findSingleEntity(
-      appId = ap.appId,
-      entityType = "constraint",
-      entityId = "weightedItems",
-      eventNames = Some(Seq("$set")),
-      limit = Some(1),
-      latest = true,
-      timeout = 200.millis
-    ) match {
-      case Right(x) =>
-        if (x.hasNext)
-          x.next().properties.get[Seq[WeightsGroup]]("weights")
-        else
-          Seq.empty
-      case Left(e) =>
-        logger.error(s"Error when reading set weightedItems event: ${e}")
-        Seq.empty
-    }
-```
-
-* Transform the sequence of `WeightsGroup`s into a `Map[Int, Double]` that we can easily query to extract the weight
-given to an item, using its `Int` index. For undefined items, their weight is 1.0.
-
-```scala
-    // Transform groupedWeights into a map of index -> weight that we can easily query
-    val weights: Map[Int, Double] = (for {
-      group <- groupedWeights
-      item <- group.items
-      index <- model.itemStringIntMap.get(item)
-    } yield (index, group.weight))
-      .toMap
-      .withDefaultValue(1.0)
-```
-
-* Adjust scores according to item weights:
-
-```scala
-            val originalScore = dotProduct(uf, feature.get)
-            // Adjusting score according to given item weights
-            val adjustedScore = originalScore * weights(i)
-            (i, adjustedScore)
-```
-
-* Pass map of weights to `predictNewUser` function and adjust scores similarly
-
-```scala
-  private
-  def predictNewUser(
-    model: ALSModel,
-    query: Query,
-    whiteList: Option[Set[Int]],
-    blackList: Set[Int],
-    weights: Map[Int, Double]): Array[(Int, Double)] = {
-```
-
-```scala
-          val originalScore = recentFeatures.map { rf =>
-            cosine(rf, feature.get) // feature is defined
-          }.sum
-          // Adjusting score according to given item weights
-          val adjustedScore = originalScore * weights(i)
-          (i, adjustedScore)
-```
-
-```scala
-      predictNewUser(
-        model = model,
-        query = query,
-        whiteList = whiteList,
-        blackList = finalBlackList,
-        weights = weights
-      )
-```
-
-### Setting constraint "weightedItems"
-
-You can set the constraint *weightedItems* by simply sending an event to Event Server.
-
-For example, say, you wanna adjust the score of items "i4", "i14" with a weight of 1.2 and item "i11" with weight of 1.5:
-
-```
-$ curl -i -X POST http://localhost:7070/events.json?accessKey=zPkr6sBwQoBwBjVHK2hsF9u26L38ARSe19QzkdYentuomCtYSuH0vXP5fq7advo4 \
--H "Content-Type: application/json" \
--d '{
-  "event" : "$set",
-  "entityType" : "constraint"
-  "entityId" : "weightedItems",
-  "properties" : {
-    "weights": [
-      {
-        "items": ["i4", "i14"],
-        "weight": 1.2
-      },
-      {
-        "items": ["i11"],
-        "weight": 1.5
-      }
-    ]
-  }
-  "eventTime" : "2015-02-17T02:11:21.934Z"
-}'
-```
-
-Note that only latest set constraint is used (based on eventTime), which means that if you create another new constraint event, the previous constraint won't have any effect anymore. For example, after you send the following event, only scores of items "i2" and "i10" will be adjusted by the weights and previous constraint for items "i4", "i14"," i11" won't be used anymore.
-
-```
-$ curl -i -X POST http://localhost:7070/events.json?accessKey=<ACCESS KEY> \
--H "Content-Type: application/json" \
--d '{
-  "event" : "$set",
-  "entityType" : "constraint"
-  "entityId" : "weightedItems",
-  "properties" : {
-    "weights": [
-      {
-        "items": ["i2", "i10"],
-        "weight": 1.5
-      }
-    ]
-  }
-  "eventTime" : "2015-02-20T04:56:78.123Z"
-}'
-```
-
-To clear the constraint, simply set empty weights array. i.e:
-
-```
-curl -i -X POST http://localhost:7070/events.json?accessKey=<ACCESS KEY> \
--H "Content-Type: application/json" \
--d '{
-  "event" : "$set",
-  "entityType" : "constraint"
-  "entityId" : "weightedItems",
-  "properties" : {
-    "weights": []
-  }
-  "eventTime" : "2015-02-20T04:56:78.123Z"
-}'
-```
-
-
-You can also use SDK to send these events as shown in the sample set_weights.py script.
-
-### set_weights.py script
-
-A `set_weights.py` has been created to add weight to some of the elements. Usage:
-
-```
-$ python data/set_weights.py --access_key <your_access_key>
-```

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-ecommercerecommendation/weighted-items/build.sbt
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-ecommercerecommendation/weighted-items/build.sbt b/examples/scala-parallel-ecommercerecommendation/weighted-items/build.sbt
deleted file mode 100644
index 956f79b..0000000
--- a/examples/scala-parallel-ecommercerecommendation/weighted-items/build.sbt
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-import AssemblyKeys._
-
-assemblySettings
-
-name := "template-scala-parallel-ecommercerecommendation"
-
-organization := "org.apache.predictionio"
-
-libraryDependencies ++= Seq(
-  "org.apache.predictionio"    %% "core"          % pioVersion.value  % "provided",
-  "org.apache.spark" %% "spark-core"    % "1.2.0" % "provided",
-  "org.apache.spark" %% "spark-mllib"   % "1.2.0" % "provided")

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-ecommercerecommendation/weighted-items/data/import_eventserver.py
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-ecommercerecommendation/weighted-items/data/import_eventserver.py b/examples/scala-parallel-ecommercerecommendation/weighted-items/data/import_eventserver.py
deleted file mode 100644
index 3f97ace..0000000
--- a/examples/scala-parallel-ecommercerecommendation/weighted-items/data/import_eventserver.py
+++ /dev/null
@@ -1,101 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-"""
-Import sample data for E-Commerce Recommendation Engine Template
-"""
-
-import predictionio
-import argparse
-import random
-
-SEED = 3
-
-def import_events(client):
-  random.seed(SEED)
-  count = 0
-  print client.get_status()
-  print "Importing data..."
-
-  # generate 10 users, with user ids u1,u2,....,u10
-  user_ids = ["u%s" % i for i in range(1, 11)]
-  for user_id in user_ids:
-    print "Set user", user_id
-    client.create_event(
-      event="$set",
-      entity_type="user",
-      entity_id=user_id
-    )
-    count += 1
-
-  # generate 50 items, with item ids i1,i2,....,i50
-  # random assign 1 to 4 categories among c1-c6 to items
-  categories = ["c%s" % i for i in range(1, 7)]
-  item_ids = ["i%s" % i for i in range(1, 51)]
-  for item_id in item_ids:
-    print "Set item", item_id
-    client.create_event(
-      event="$set",
-      entity_type="item",
-      entity_id=item_id,
-      properties={
-        "categories" : random.sample(categories, random.randint(1, 4))
-      }
-    )
-    count += 1
-
-  # each user randomly viewed 10 items
-  for user_id in user_ids:
-    for viewed_item in random.sample(item_ids, 10):
-      print "User", user_id ,"views item", viewed_item
-      client.create_event(
-        event="view",
-        entity_type="user",
-        entity_id=user_id,
-        target_entity_type="item",
-        target_entity_id=viewed_item
-      )
-      count += 1
-      # randomly buy some of the viewed items
-      if random.choice([True, False]):
-        print "User", user_id ,"buys item", viewed_item
-        client.create_event(
-          event="buy",
-          entity_type="user",
-          entity_id=user_id,
-          target_entity_type="item",
-          target_entity_id=viewed_item
-        )
-        count += 1
-
-  print "%s events are imported." % count
-
-if __name__ == '__main__':
-  parser = argparse.ArgumentParser(
-    description="Import sample data for e-commerce recommendation engine")
-  parser.add_argument('--access_key', default='invalid_access_key')
-  parser.add_argument('--url', default="http://localhost:7070")
-
-  args = parser.parse_args()
-  print args
-
-  client = predictionio.EventClient(
-    access_key=args.access_key,
-    url=args.url,
-    threads=5,
-    qsize=500)
-  import_events(client)

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-ecommercerecommendation/weighted-items/data/send_query.py
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-ecommercerecommendation/weighted-items/data/send_query.py b/examples/scala-parallel-ecommercerecommendation/weighted-items/data/send_query.py
deleted file mode 100644
index 21e9146..0000000
--- a/examples/scala-parallel-ecommercerecommendation/weighted-items/data/send_query.py
+++ /dev/null
@@ -1,24 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-"""
-Send sample query to prediction engine
-"""
-
-import predictionio
-engine_client = predictionio.EngineClient(url="http://localhost:8000")
-print engine_client.send_query({"user": "u1", "num": 4})

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-ecommercerecommendation/weighted-items/data/set_weights.py
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-ecommercerecommendation/weighted-items/data/set_weights.py b/examples/scala-parallel-ecommercerecommendation/weighted-items/data/set_weights.py
deleted file mode 100644
index 47c7969..0000000
--- a/examples/scala-parallel-ecommercerecommendation/weighted-items/data/set_weights.py
+++ /dev/null
@@ -1,56 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-"""
-Set weights for E-Commerce Recommendation Engine Template items
-"""
-
-import argparse
-import predictionio
-
-def set_weights(client):
-    client.create_event(
-        event="$set",
-        entity_type="constraint",
-        entity_id="weightedItems",
-        properties={
-            "weights": [
-                {
-                    "items": ["i4", "i14"],
-                    "weight": 1.2
-                },
-                {
-                    "items": ["i11"],
-                    "weight": 1.5
-                }
-            ]
-        }
-    )
-
-if __name__ == '__main__':
-    parser = argparse.ArgumentParser(description="Set weights to items")
-    parser.add_argument('--access_key', default='invalid_access_key')
-    parser.add_argument('--url', default="http://localhost:7070")
-
-    args = parser.parse_args()
-
-    client = predictionio.EventClient(
-        access_key=args.access_key,
-        url=args.url,
-        threads=5,
-        qsize=500)
-    set_weights(client)

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-ecommercerecommendation/weighted-items/engine.json
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-ecommercerecommendation/weighted-items/engine.json b/examples/scala-parallel-ecommercerecommendation/weighted-items/engine.json
deleted file mode 100644
index 2a11542..0000000
--- a/examples/scala-parallel-ecommercerecommendation/weighted-items/engine.json
+++ /dev/null
@@ -1,24 +0,0 @@
-{
-  "id": "default",
-  "description": "Default settings",
-  "engineFactory": "org.template.ecommercerecommendation.ECommerceRecommendationEngine",
-  "datasource": {
-    "params" : {
-      "appId": 4
-    }
-  },
-  "algorithms": [
-    {
-      "name": "als",
-      "params": {
-        "appId": 4,
-        "unseenOnly": true,
-        "seenEvents": ["buy", "view"],
-        "rank": 10,
-        "numIterations" : 20,
-        "lambda": 0.01,
-        "seed": 3
-      }
-    }
-  ]
-}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-ecommercerecommendation/weighted-items/project/assembly.sbt
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-ecommercerecommendation/weighted-items/project/assembly.sbt b/examples/scala-parallel-ecommercerecommendation/weighted-items/project/assembly.sbt
deleted file mode 100644
index 54c3252..0000000
--- a/examples/scala-parallel-ecommercerecommendation/weighted-items/project/assembly.sbt
+++ /dev/null
@@ -1 +0,0 @@
-addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.11.2")

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-ecommercerecommendation/weighted-items/project/pio-build.sbt
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-ecommercerecommendation/weighted-items/project/pio-build.sbt b/examples/scala-parallel-ecommercerecommendation/weighted-items/project/pio-build.sbt
deleted file mode 100644
index 9aed0ee..0000000
--- a/examples/scala-parallel-ecommercerecommendation/weighted-items/project/pio-build.sbt
+++ /dev/null
@@ -1 +0,0 @@
-addSbtPlugin("org.apache.predictionio" % "pio-build" % "0.9.0")

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-ecommercerecommendation/weighted-items/src/main/scala/ALSAlgorithm.scala
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-ecommercerecommendation/weighted-items/src/main/scala/ALSAlgorithm.scala b/examples/scala-parallel-ecommercerecommendation/weighted-items/src/main/scala/ALSAlgorithm.scala
deleted file mode 100644
index 90d1482..0000000
--- a/examples/scala-parallel-ecommercerecommendation/weighted-items/src/main/scala/ALSAlgorithm.scala
+++ /dev/null
@@ -1,482 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.template.ecommercerecommendation
-
-import org.apache.predictionio.controller.P2LAlgorithm
-import org.apache.predictionio.controller.Params
-import org.apache.predictionio.data.storage.BiMap
-import org.apache.predictionio.data.storage.Event
-import org.apache.predictionio.data.storage.Storage
-
-import org.apache.spark.SparkContext
-import org.apache.spark.SparkContext._
-import org.apache.spark.mllib.recommendation.ALS
-import org.apache.spark.mllib.recommendation.{Rating => MLlibRating}
-
-import grizzled.slf4j.Logger
-
-import scala.collection.mutable.PriorityQueue
-import scala.concurrent.duration._
-import scala.concurrent.ExecutionContext.Implicits.global
-
-case class ALSAlgorithmParams(
-  appId: Int,
-  unseenOnly: Boolean,
-  seenEvents: List[String],
-  rank: Int,
-  numIterations: Int,
-  lambda: Double,
-  seed: Option[Long]
-) extends Params
-
-class ALSModel(
-  val rank: Int,
-  val userFeatures: Map[Int, Array[Double]],
-  val productFeatures: Map[Int, (Item, Option[Array[Double]])],
-  val userStringIntMap: BiMap[String, Int],
-  val itemStringIntMap: BiMap[String, Int]
-) extends Serializable {
-
-  @transient lazy val itemIntStringMap = itemStringIntMap.inverse
-
-  override def toString = {
-    s" rank: ${rank}" +
-    s" userFeatures: [${userFeatures.size}]" +
-    s"(${userFeatures.take(2).toList}...)" +
-    s" productFeatures: [${productFeatures.size}]" +
-    s"(${productFeatures.take(2).toList}...)" +
-    s" userStringIntMap: [${userStringIntMap.size}]" +
-    s"(${userStringIntMap.take(2).toString}...)]" +
-    s" itemStringIntMap: [${itemStringIntMap.size}]" +
-    s"(${itemStringIntMap.take(2).toString}...)]"
-  }
-}
-
-// Item weights are defined according to this structure so that groups of items can be easily changed together
-case class WeightsGroup(
-  items: Set[String],
-  weight: Double
-)
-
-/**
-  * Use ALS to build item x feature matrix
-  */
-class ALSAlgorithm(val ap: ALSAlgorithmParams)
-  extends P2LAlgorithm[PreparedData, ALSModel, Query, PredictedResult] {
-
-  @transient lazy val logger = Logger[this.type]
-  // NOTE: use getLEvents() for local access
-  @transient lazy val lEventsDb = Storage.getLEvents()
-
-  def train(sc: SparkContext, data: PreparedData): ALSModel = {
-    require(!data.viewEvents.take(1).isEmpty,
-      s"viewEvents in PreparedData cannot be empty." +
-      " Please check if DataSource generates TrainingData" +
-      " and Preprator generates PreparedData correctly.")
-    require(!data.users.take(1).isEmpty,
-      s"users in PreparedData cannot be empty." +
-      " Please check if DataSource generates TrainingData" +
-      " and Preprator generates PreparedData correctly.")
-    require(!data.items.take(1).isEmpty,
-      s"items in PreparedData cannot be empty." +
-      " Please check if DataSource generates TrainingData" +
-      " and Preprator generates PreparedData correctly.")
-    // create User and item's String ID to integer index BiMap
-    val userStringIntMap = BiMap.stringInt(data.users.keys)
-    val itemStringIntMap = BiMap.stringInt(data.items.keys)
-
-    val mllibRatings = data.viewEvents
-      .map { r =>
-        // Convert user and item String IDs to Int index for MLlib
-        val uindex = userStringIntMap.getOrElse(r.user, -1)
-        val iindex = itemStringIntMap.getOrElse(r.item, -1)
-
-        if (uindex == -1)
-          logger.info(s"Couldn't convert nonexistent user ID ${r.user}"
-            + " to Int index.")
-
-        if (iindex == -1)
-          logger.info(s"Couldn't convert nonexistent item ID ${r.item}"
-            + " to Int index.")
-
-        ((uindex, iindex), 1)
-      }.filter { case ((u, i), v) =>
-        // keep events with valid user and item index
-        (u != -1) && (i != -1)
-      }
-      .reduceByKey(_ + _) // aggregate all view events of same user-item pair
-      .map { case ((u, i), v) =>
-        // MLlibRating requires integer index for user and item
-        MLlibRating(u, i, v)
-      }.cache()
-
-    // MLLib ALS cannot handle empty training data.
-    require(!mllibRatings.take(1).isEmpty,
-      s"mllibRatings cannot be empty." +
-      " Please check if your events contain valid user and item ID.")
-
-    // seed for MLlib ALS
-    val seed = ap.seed.getOrElse(System.nanoTime)
-
-    val m = ALS.trainImplicit(
-      ratings = mllibRatings,
-      rank = ap.rank,
-      iterations = ap.numIterations,
-      lambda = ap.lambda,
-      blocks = -1,
-      alpha = 1.0,
-      seed = seed)
-
-    val userFeatures = m.userFeatures.collectAsMap.toMap
-
-    // convert ID to Int index
-    val items = data.items.map { case (id, item) =>
-      (itemStringIntMap(id), item)
-    }
-
-    // join item with the trained productFeatures
-    val productFeatures = items.leftOuterJoin(m.productFeatures)
-      .collectAsMap.toMap
-
-    new ALSModel(
-      rank = m.rank,
-      userFeatures = userFeatures,
-      productFeatures = productFeatures,
-      userStringIntMap = userStringIntMap,
-      itemStringIntMap = itemStringIntMap
-    )
-  }
-
-  def predict(model: ALSModel, query: Query): PredictedResult = {
-
-    val userFeatures = model.userFeatures
-    val productFeatures = model.productFeatures
-
-    // convert whiteList's string ID to integer index
-    val whiteList: Option[Set[Int]] = query.whiteList.map( set =>
-      set.map(model.itemStringIntMap.get).flatten
-    )
-
-    val blackList: Set[String] = query.blackList.getOrElse(Set[String]())
-
-    // if unseenOnly is True, get all seen items
-    val seenItems: Set[String] = if (ap.unseenOnly) {
-
-      // get all user item events which are considered as "seen" events
-      val seenEvents: Iterator[Event] = lEventsDb.findSingleEntity(
-        appId = ap.appId,
-        entityType = "user",
-        entityId = query.user,
-        eventNames = Some(ap.seenEvents),
-        targetEntityType = Some(Some("item")),
-        // set time limit to avoid super long DB access
-        timeout = 200.millis
-      ) match {
-        case Right(x) => x
-        case Left(e) => {
-          logger.error(s"Error when read seen events: ${e}")
-          Iterator[Event]()
-        }
-      }
-
-      seenEvents.map { event =>
-        try {
-          event.targetEntityId.get
-        } catch {
-          case e => {
-            logger.error(s"Can't get targetEntityId of event ${event}.")
-            throw e
-          }
-        }
-      }.toSet
-    } else {
-      Set[String]()
-    }
-
-    // get the latest constraint unavailableItems $set event
-    val unavailableItems: Set[String] = lEventsDb.findSingleEntity(
-      appId = ap.appId,
-      entityType = "constraint",
-      entityId = "unavailableItems",
-      eventNames = Some(Seq("$set")),
-      limit = Some(1),
-      latest = true,
-      timeout = 200.millis
-    ) match {
-      case Right(x) => {
-        if (x.hasNext) {
-          x.next.properties.get[Set[String]]("items")
-        } else {
-          Set[String]()
-        }
-      }
-      case Left(e) => {
-        logger.error(s"Error when read set unavailableItems event: ${e}")
-        Set[String]()
-      }
-    }
-
-    // Get the latest constraint weightedItems. This comes in the form of a sequence of WeightsGroup
-    val groupedWeights = lEventsDb.findSingleEntity(
-      appId = ap.appId,
-      entityType = "constraint",
-      entityId = "weightedItems",
-      eventNames = Some(Seq("$set")),
-      limit = Some(1),
-      latest = true,
-      timeout = 200.millis
-    ) match {
-      case Right(x) =>
-        if (x.hasNext)
-          x.next().properties.get[Seq[WeightsGroup]]("weights")
-        else
-          Seq.empty
-      case Left(e) =>
-        logger.error(s"Error when reading set weightedItems event: ${e}")
-        Seq.empty
-    }
-
-    // Transform groupedWeights into a map of index -> weight that we can easily query
-    val weights: Map[Int, Double] = (for {
-      group <- groupedWeights
-      item <- group.items
-      index <- model.itemStringIntMap.get(item)
-    } yield (index, group.weight))
-      .toMap
-      .withDefaultValue(1.0)
-
-    // combine query's blackList,seenItems and unavailableItems
-    // into final blackList.
-    // convert seen Items list from String ID to integer Index
-    val finalBlackList: Set[Int] = (blackList ++ seenItems ++ unavailableItems)
-      .map( x => model.itemStringIntMap.get(x)).flatten
-
-    val userFeature =
-      model.userStringIntMap.get(query.user).map { userIndex =>
-        userFeatures.get(userIndex)
-      }
-      // flatten Option[Option[Array[Double]]] to Option[Array[Double]]
-      .flatten
-
-    val topScores = if (userFeature.isDefined) {
-      // the user has feature vector
-      val uf = userFeature.get
-      val indexScores: Map[Int, Double] =
-        productFeatures.par // convert to parallel collection
-          .filter { case (i, (item, feature)) =>
-            feature.isDefined &&
-            isCandidateItem(
-              i = i,
-              item = item,
-              categories = query.categories,
-              whiteList = whiteList,
-              blackList = finalBlackList
-            )
-          }
-          .map { case (i, (item, feature)) =>
-            // NOTE: feature must be defined, so can call .get
-            val originalScore = dotProduct(uf, feature.get)
-            // Adjusting score according to given item weights
-            val adjustedScore = originalScore * weights(i)
-            (i, adjustedScore)
-          }
-          .filter(_._2 > 0) // only keep items with score > 0
-          .seq // convert back to sequential collection
-
-      val ord = Ordering.by[(Int, Double), Double](_._2).reverse
-      val topScores = getTopN(indexScores, query.num)(ord).toArray
-
-      topScores
-
-    } else {
-      // the user doesn't have feature vector.
-      // For example, new user is created after model is trained.
-      logger.info(s"No userFeature found for user ${query.user}.")
-      predictNewUser(
-        model = model,
-        query = query,
-        whiteList = whiteList,
-        blackList = finalBlackList,
-        weights = weights
-      )
-    }
-
-    val itemScores = topScores.map { case (i, s) =>
-      new ItemScore(
-        // convert item int index back to string ID
-        item = model.itemIntStringMap(i),
-        score = s
-      )
-    }
-
-    new PredictedResult(itemScores)
-  }
-
-  /** Get recently viewed item of the user and return top similar items */
-  private
-  def predictNewUser(
-    model: ALSModel,
-    query: Query,
-    whiteList: Option[Set[Int]],
-    blackList: Set[Int],
-    weights: Map[Int, Double]): Array[(Int, Double)] = {
-
-    val productFeatures = model.productFeatures
-
-    // get latest 10 user view item events
-    val recentEvents = lEventsDb.findSingleEntity(
-      appId = ap.appId,
-      // entityType and entityId is specified for fast lookup
-      entityType = "user",
-      entityId = query.user,
-      eventNames = Some(Seq("view")),
-      targetEntityType = Some(Some("item")),
-      limit = Some(10),
-      latest = true,
-      // set time limit to avoid super long DB access
-      timeout = Duration(200, "millis")
-    ) match {
-      case Right(x) => x
-      case Left(e) => {
-        logger.error(s"Error when read recent events: ${e}")
-        Iterator[Event]()
-      }
-    }
-
-    val recentItems: Set[String] = recentEvents.map { event =>
-      try {
-        event.targetEntityId.get
-      } catch {
-        case e => {
-          logger.error("Can't get targetEntityId of event ${event}.")
-          throw e
-        }
-      }
-    }.toSet
-
-    val recentList: Set[Int] = recentItems.map (x =>
-      model.itemStringIntMap.get(x)).flatten
-
-    val recentFeatures: Vector[Array[Double]] = recentList.toVector
-      // productFeatures may not contain the requested item
-      .map { i =>
-        productFeatures.get(i).map { case (item, f) => f }.flatten
-      }.flatten
-
-    val indexScores: Map[Int, Double] = if (recentFeatures.isEmpty) {
-      logger.info(s"No productFeatures vector for recent items ${recentItems}.")
-      Map[Int, Double]()
-    } else {
-      productFeatures.par // convert to parallel collection
-        .filter { case (i, (item, feature)) =>
-          feature.isDefined &&
-          isCandidateItem(
-            i = i,
-            item = item,
-            categories = query.categories,
-            whiteList = whiteList,
-            blackList = blackList
-          )
-        }
-        .map { case (i, (item, feature)) =>
-          val originalScore = recentFeatures.map { rf =>
-            cosine(rf, feature.get) // feature is defined
-          }.sum
-          // Adjusting score according to given item weights
-          val adjustedScore = originalScore * weights(i)
-          (i, adjustedScore)
-        }
-        .filter(_._2 > 0) // keep items with score > 0
-        .seq // convert back to sequential collection
-    }
-
-    val ord = Ordering.by[(Int, Double), Double](_._2).reverse
-    val topScores = getTopN(indexScores, query.num)(ord).toArray
-
-    topScores
-  }
-
-  private
-  def getTopN[T](s: Iterable[T], n: Int)(implicit ord: Ordering[T]): Seq[T] = {
-
-    val q = PriorityQueue()
-
-    for (x <- s) {
-      if (q.size < n)
-        q.enqueue(x)
-      else {
-        // q is full
-        if (ord.compare(x, q.head) < 0) {
-          q.dequeue()
-          q.enqueue(x)
-        }
-      }
-    }
-
-    q.dequeueAll.toSeq.reverse
-  }
-
-  private
-  def dotProduct(v1: Array[Double], v2: Array[Double]): Double = {
-    val size = v1.length
-    var i = 0
-    var d: Double = 0
-    while (i < size) {
-      d += v1(i) * v2(i)
-      i += 1
-    }
-    d
-  }
-
-  private
-  def cosine(v1: Array[Double], v2: Array[Double]): Double = {
-    val size = v1.length
-    var i = 0
-    var n1: Double = 0
-    var n2: Double = 0
-    var d: Double = 0
-    while (i < size) {
-      n1 += v1(i) * v1(i)
-      n2 += v2(i) * v2(i)
-      d += v1(i) * v2(i)
-      i += 1
-    }
-    val n1n2 = math.sqrt(n1) * math.sqrt(n2)
-    if (n1n2 == 0) 0 else d / n1n2
-  }
-
-  private
-  def isCandidateItem(
-    i: Int,
-    item: Item,
-    categories: Option[Set[String]],
-    whiteList: Option[Set[Int]],
-    blackList: Set[Int]
-  ): Boolean = {
-    // can add other custom filtering here
-    whiteList.map(_.contains(i)).getOrElse(true) &&
-    !blackList.contains(i) &&
-    // filter categories
-    categories.map { cat =>
-      item.categories.exists { itemCat =>
-        // keep this item if its categories overlap with the query categories
-        itemCat.toSet.intersect(cat).nonEmpty
-      } // discard this item if it has no categories
-    }.getOrElse(true)
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-ecommercerecommendation/weighted-items/src/main/scala/DataSource.scala
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-ecommercerecommendation/weighted-items/src/main/scala/DataSource.scala b/examples/scala-parallel-ecommercerecommendation/weighted-items/src/main/scala/DataSource.scala
deleted file mode 100644
index 8c24813..0000000
--- a/examples/scala-parallel-ecommercerecommendation/weighted-items/src/main/scala/DataSource.scala
+++ /dev/null
@@ -1,131 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.template.ecommercerecommendation
-
-import org.apache.predictionio.controller.PDataSource
-import org.apache.predictionio.controller.EmptyEvaluationInfo
-import org.apache.predictionio.controller.EmptyActualResult
-import org.apache.predictionio.controller.Params
-import org.apache.predictionio.data.storage.Event
-import org.apache.predictionio.data.storage.Storage
-
-import org.apache.spark.SparkContext
-import org.apache.spark.SparkContext._
-import org.apache.spark.rdd.RDD
-
-import grizzled.slf4j.Logger
-
-case class DataSourceParams(appId: Int) extends Params
-
-class DataSource(val dsp: DataSourceParams)
-  extends PDataSource[TrainingData,
-      EmptyEvaluationInfo, Query, EmptyActualResult] {
-
-  @transient lazy val logger = Logger[this.type]
-
-  override
-  def readTraining(sc: SparkContext): TrainingData = {
-    val eventsDb = Storage.getPEvents()
-
-    // create a RDD of (entityID, User)
-    val usersRDD: RDD[(String, User)] = eventsDb.aggregateProperties(
-      appId = dsp.appId,
-      entityType = "user"
-    )(sc).map { case (entityId, properties) =>
-      val user = try {
-        User()
-      } catch {
-        case e: Exception => {
-          logger.error(s"Failed to get properties ${properties} of" +
-            s" user ${entityId}. Exception: ${e}.")
-          throw e
-        }
-      }
-      (entityId, user)
-    }.cache()
-
-    // create a RDD of (entityID, Item)
-    val itemsRDD: RDD[(String, Item)] = eventsDb.aggregateProperties(
-      appId = dsp.appId,
-      entityType = "item"
-    )(sc).map { case (entityId, properties) =>
-      val item = try {
-        // Assume categories is optional property of item.
-        Item(categories = properties.getOpt[List[String]]("categories"))
-      } catch {
-        case e: Exception => {
-          logger.error(s"Failed to get properties ${properties} of" +
-            s" item ${entityId}. Exception: ${e}.")
-          throw e
-        }
-      }
-      (entityId, item)
-    }.cache()
-
-    // get all "user" "view" "item" events
-    val viewEventsRDD: RDD[ViewEvent] = eventsDb.find(
-      appId = dsp.appId,
-      entityType = Some("user"),
-      eventNames = Some(List("view")),
-      // targetEntityType is optional field of an event.
-      targetEntityType = Some(Some("item")))(sc)
-      // eventsDb.find() returns RDD[Event]
-      .map { event =>
-        val viewEvent = try {
-          event.event match {
-            case "view" => ViewEvent(
-              user = event.entityId,
-              item = event.targetEntityId.get,
-              t = event.eventTime.getMillis)
-            case _ => throw new Exception(s"Unexpected event ${event} is read.")
-          }
-        } catch {
-          case e: Exception => {
-            logger.error(s"Cannot convert ${event} to ViewEvent." +
-              s" Exception: ${e}.")
-            throw e
-          }
-        }
-        viewEvent
-      }.cache()
-
-    new TrainingData(
-      users = usersRDD,
-      items = itemsRDD,
-      viewEvents = viewEventsRDD
-    )
-  }
-}
-
-case class User()
-
-case class Item(categories: Option[List[String]])
-
-case class ViewEvent(user: String, item: String, t: Long)
-
-class TrainingData(
-  val users: RDD[(String, User)],
-  val items: RDD[(String, Item)],
-  val viewEvents: RDD[ViewEvent]
-) extends Serializable {
-  override def toString = {
-    s"users: [${users.count()} (${users.take(2).toList}...)]" +
-    s"items: [${items.count()} (${items.take(2).toList}...)]" +
-    s"viewEvents: [${viewEvents.count()}] (${viewEvents.take(2).toList}...)"
-  }
-}