You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@predictionio.apache.org by ch...@apache.org on 2017/09/28 15:54:48 UTC
[07/57] [abbrv] incubator-predictionio git commit: [PIO-97] Fixes
examples of the official templates for v0.11.0-incubating.
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-recommendation/custom-query/src/main/scala/ALSAlgorithm.scala
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-recommendation/custom-query/src/main/scala/ALSAlgorithm.scala b/examples/scala-parallel-recommendation/custom-query/src/main/scala/ALSAlgorithm.scala
deleted file mode 100644
index 917153b..0000000
--- a/examples/scala-parallel-recommendation/custom-query/src/main/scala/ALSAlgorithm.scala
+++ /dev/null
@@ -1,167 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.template.recommendation
-
-import org.apache.predictionio.controller.PAlgorithm
-import org.apache.predictionio.controller.Params
-import org.apache.predictionio.data.storage.BiMap
-
-import org.apache.spark.SparkContext._
-import org.apache.spark.mllib.recommendation.ALS
-import org.apache.spark.mllib.recommendation.{Rating => MLlibRating}
-
-import grizzled.slf4j.Logger
-
-case class ALSAlgorithmParams(rank: Int, numIterations: Int, lambda: Double,
- seed: Option[Long]) extends Params
-
-/**
- * Use ALS to build item x feature matrix
- */
-class ALSAlgorithm(val ap: ALSAlgorithmParams)
- extends PAlgorithm[PreparedData, ALSModel, Query, PredictedResult] {
-
- @transient lazy val logger = Logger[this.type]
-
- def train(data: PreparedData): ALSModel = {
- require(!data.ratings.take(1).isEmpty,
- s"viewEvents in PreparedData cannot be empty." +
- " Please check if DataSource generates TrainingData" +
- " and Preprator generates PreparedData correctly.")
- require(!data.items.take(1).isEmpty,
- s"items in PreparedData cannot be empty." +
- " Please check if DataSource generates TrainingData" +
- " and Preprator generates PreparedData correctly.")
- // create String ID to integer index BiMaps for items and users
- val itemStringIntMap = BiMap.stringInt(data.items.keys)
- val userStringIntMap = BiMap.stringInt(data.ratings.map(_.user))
-
- // HOWTO: collect Item as Map and convert ID to Int index
- val items: Map[Int, Item] = data.items.map { case (id, item) ⇒
- (itemStringIntMap(id), item)
- }.collectAsMap.toMap
-
- val mllibRatings = data.ratings.map { r =>
- // Convert user and item String IDs to Int index for MLlib
- val iindex = itemStringIntMap.getOrElse(r.item, -1)
- val uindex = userStringIntMap.getOrElse(r.user, -1)
-
- if (iindex == -1)
- logger.info(s"Couldn't convert nonexistent item ID ${r.item}"
- + " to Int index.")
-
- (uindex -> iindex) -> 1
- }.filter { case ((u, i), v) => (i != -1) && (u != -1) }
- .reduceByKey(_ + _) // aggregate all view events of same item
- .map { case ((u, i), v) => MLlibRating(u, i, v) }
-
- // MLlib ALS cannot handle empty training data.
- require(!mllibRatings.take(1).isEmpty,
- s"mllibRatings cannot be empty." +
- " Please check if your events contain valid user and item ID.")
-
- // seed for MLlib ALS
- val seed = ap.seed.getOrElse(System.nanoTime)
-
- val m = ALS.trainImplicit(
- ratings = mllibRatings,
- rank = ap.rank,
- iterations = ap.numIterations,
- lambda = ap.lambda,
- blocks = -1,
- alpha = 1.0,
- seed = seed)
-
- new ALSModel(productFeatures = m.productFeatures,
- itemStringIntMap = itemStringIntMap, items = items)
- }
-
- def predict(model: ALSModel, query: Query): PredictedResult = {
- val queryFeatures =
- model.items.keys.flatMap(model.productFeatures.lookup(_).headOption)
-
- val indexScores = if (queryFeatures.isEmpty) {
- logger.info(s"No productFeatures found for query ${query}.")
- Array[(Int, Double)]()
- } else {
- model.productFeatures.mapValues { f ⇒
- queryFeatures.map(cosine(_, f)).reduce(_ + _)
- }.filter(_._2 > 0) // keep items with score > 0
- .collect()
- }
-
- // HOWTO: filter predicted results by query.
- val filteredScores = filterItems(indexScores, model.items, query)
-
- implicit val ord = Ordering.by[(Int, Double), Double](_._2)
- val topScores = getTopN(filteredScores, query.num).toArray
-
- val itemScores = topScores.map { case (i, s) ⇒
- new ItemScore(item = model.itemIntStringMap(i), score = s,
- creationYear = model.items(i).creationYear)
- }
-
- new PredictedResult(itemScores)
- }
-
- private def getTopN[T](s: Seq[T], n: Int)
- (implicit ord: Ordering[T]): Iterable[T] = {
-
- var result = List.empty[T]
-
- for (x <- s) {
- if (result.size < n)
- result = x :: result
- else {
- val min = result.min
- if (ord.compare(x, min) < 0) {
- result = x :: result.filter(_ != min)
- }
- }
- }
-
- result.sorted.reverse
- }
-
- private def cosine(v1: Array[Double], v2: Array[Double]): Double = {
- val size = v1.size
- var i = 0
- var n1: Double = 0
- var n2: Double = 0
- var d: Double = 0
- while (i < size) {
- n1 += v1(i) * v1(i)
- n2 += v2(i) * v2(i)
- d += v1(i) * v2(i)
- i += 1
- }
- val n1n2 = (math.sqrt(n1) * math.sqrt(n2))
- if (n1n2 == 0) 0 else (d / n1n2)
- }
-
- // HOWTO: actual filter of predicted movie results.
- // The filter keeps movies made in or after the year specified in the query;
- // movies with no creation year (or a query without one) are always kept.
- private def filterItems(selectedScores: Array[(Int, Double)],
- items: Map[Int, Item],
- query: Query) =
- selectedScores.view.filter { case (iId, _) ⇒
- items(iId).creationYear.map(icr ⇒ query.creationYear.forall(icr >= _))
- .getOrElse(true)
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-recommendation/custom-query/src/main/scala/ALSModel.scala
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-recommendation/custom-query/src/main/scala/ALSModel.scala b/examples/scala-parallel-recommendation/custom-query/src/main/scala/ALSModel.scala
deleted file mode 100644
index d463204..0000000
--- a/examples/scala-parallel-recommendation/custom-query/src/main/scala/ALSModel.scala
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.template.recommendation
-
-import org.apache.predictionio.controller.IPersistentModel
-import org.apache.predictionio.controller.IPersistentModelLoader
-import org.apache.predictionio.data.storage.BiMap
-
-import org.apache.spark.SparkContext
-import org.apache.spark.rdd.RDD
-
-class ALSModel(
- val productFeatures: RDD[(Int, Array[Double])],
- val itemStringIntMap: BiMap[String, Int],
- // HOWTO: added a map of `generatedItemIntId -> Item` to the algo data model.
- val items: Map[Int, Item])
- extends IPersistentModel[ALSAlgorithmParams] with Serializable {
-
- @transient lazy val itemIntStringMap = itemStringIntMap.inverse
-
- def save(id: String, params: ALSAlgorithmParams,
- sc: SparkContext): Boolean = {
-
- productFeatures.saveAsObjectFile(s"/tmp/${id}/productFeatures")
- sc.parallelize(Seq(itemStringIntMap))
- .saveAsObjectFile(s"/tmp/${id}/itemStringIntMap")
- // HOWTO: save items too as part of algo model
- sc.parallelize(Seq(items))
- .saveAsObjectFile(s"/tmp/${id}/items")
- true
- }
-
- override def toString = {
- s" productFeatures: [${productFeatures.count()}]" +
- s"(${productFeatures.take(2).toList}...)" +
- s" itemStringIntMap: [${itemStringIntMap.size}]" +
- s"(${itemStringIntMap.take(2).toString}...)]" +
- s" items: [${items.size}]" +
- s"(${items.take(2).toString}...)]"
- }
-}
-
-object ALSModel extends IPersistentModelLoader[ALSAlgorithmParams, ALSModel] {
- def apply(id: String, params: ALSAlgorithmParams, sc: Option[SparkContext]) =
- new ALSModel(
- productFeatures = sc.get.objectFile(s"/tmp/${id}/productFeatures"),
- itemStringIntMap = sc.get
- .objectFile[BiMap[String, Int]](s"/tmp/${id}/itemStringIntMap").first,
- // HOWTO: read items too as part of algo model
- items = sc.get
- .objectFile[Map[Int, Item]](s"/tmp/${id}/items").first)
-}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-recommendation/custom-query/src/main/scala/DataSource.scala
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-recommendation/custom-query/src/main/scala/DataSource.scala b/examples/scala-parallel-recommendation/custom-query/src/main/scala/DataSource.scala
deleted file mode 100644
index 942afb1..0000000
--- a/examples/scala-parallel-recommendation/custom-query/src/main/scala/DataSource.scala
+++ /dev/null
@@ -1,99 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.template.recommendation
-
-import org.apache.predictionio.controller.PDataSource
-import org.apache.predictionio.controller.EmptyEvaluationInfo
-import org.apache.predictionio.controller.EmptyActualResult
-import org.apache.predictionio.controller.Params
-import org.apache.predictionio.data.storage.{DataMap, Event, Storage}
-
-import org.apache.spark.SparkContext
-import org.apache.spark.rdd.RDD
-
-import grizzled.slf4j.Logger
-
-case class DataSourceParams(appId: Int) extends Params
-
-case class Item(creationYear: Option[Int])
-object Item {
- object Fields {
- val CreationYear = "creationYear"
- }
-}
-
-class DataSource(val dsp: DataSourceParams)
- extends PDataSource[TrainingData,
- EmptyEvaluationInfo, Query, EmptyActualResult] {
-
- @transient lazy val logger = Logger[this.type]
- private lazy val EntityType = "movie"
-
- override
- def readTraining(sc: SparkContext): TrainingData = {
- val eventsDb = Storage.getPEvents()
-
- // create a RDD of (entityID, Item)
- // HOWTO: collecting items(movies)
- val itemsRDD = eventsDb.aggregateProperties(
- appId = dsp.appId,
- entityType = "item"
- )(sc).flatMap { case (entityId, properties) ⇒
- ItemMarshaller.unmarshall(properties).map(entityId → _)
- }
-
- // get all user rate events
- val rateEventsRDD: RDD[Event] = eventsDb.find(
- appId = dsp.appId,
- entityType = Some("user"),
- eventNames = Some(List("rate")), // read "rate"
- // targetEntityType is optional field of an event.
- targetEntityType = Some(Some(EntityType)))(sc)
-
- // collect ratings
- val ratingsRDD = rateEventsRDD.flatMap { event ⇒
- try {
- (event.event match {
- case "rate" => event.properties.getOpt[Double]("rating")
- case _ ⇒ None
- }).map(Rating(event.entityId, event.targetEntityId.get, _))
- } catch { case e: Exception ⇒
- logger.error(s"Cannot convert ${event} to Rating. Exception: ${e}.")
- throw e
- }
- }.cache()
-
- new TrainingData(ratingsRDD, itemsRDD)
- }
-}
-
-object ItemMarshaller {
- // HOWTO: implemented unmarshaller to collect properties for filtering.
- def unmarshall(properties: DataMap): Option[Item] =
- Some(Item(properties.getOpt[Int](Item.Fields.CreationYear)))
-}
-
-case class Rating(user: String, item: String, rating: Double)
-
-class TrainingData(val ratings: RDD[Rating], val items: RDD[(String, Item)])
- extends Serializable {
-
- override def toString =
- s"ratings: [${ratings.count()}] (${ratings.take(2).toList}...)" +
- s"items: [${items.count()} (${items.take(2).toList}...)]"
-}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-recommendation/custom-query/src/main/scala/Engine.scala
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-recommendation/custom-query/src/main/scala/Engine.scala b/examples/scala-parallel-recommendation/custom-query/src/main/scala/Engine.scala
deleted file mode 100644
index 369128b..0000000
--- a/examples/scala-parallel-recommendation/custom-query/src/main/scala/Engine.scala
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.template.recommendation
-
-import org.apache.predictionio.controller.IEngineFactory
-import org.apache.predictionio.controller.Engine
-
-case class Query(user: String, num: Int, creationYear: Option[Int] = None)
-
-case class PredictedResult(itemScores: Array[ItemScore])
-
-// HOWTO: added movie creation year to predicted result.
-case class ItemScore(item: String, score: Double, creationYear: Option[Int])
-
-object RecommendationEngine extends IEngineFactory {
- def apply() =
- new Engine(classOf[DataSource],
- classOf[Preparator],
- Map("als" → classOf[ALSAlgorithm]),
- classOf[Serving])
-}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-recommendation/custom-query/src/main/scala/Preparator.scala
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-recommendation/custom-query/src/main/scala/Preparator.scala b/examples/scala-parallel-recommendation/custom-query/src/main/scala/Preparator.scala
deleted file mode 100644
index ef40ac6..0000000
--- a/examples/scala-parallel-recommendation/custom-query/src/main/scala/Preparator.scala
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.template.recommendation
-
-import org.apache.predictionio.controller.PPreparator
-
-import org.apache.spark.SparkContext
-import org.apache.spark.rdd.RDD
-
-class Preparator extends PPreparator[TrainingData, PreparedData] {
- def prepare(sc: SparkContext, trainingData: TrainingData): PreparedData =
- new PreparedData(ratings = trainingData.ratings, items = trainingData.items)
-}
-
-// HOWTO: added items(movies) list to prepared data to make it possible to sort
-// them in the predict stage.
-class PreparedData(val ratings: RDD[Rating], val items: RDD[(String, Item)])
- extends Serializable
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-recommendation/custom-query/src/main/scala/Serving.scala
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-recommendation/custom-query/src/main/scala/Serving.scala b/examples/scala-parallel-recommendation/custom-query/src/main/scala/Serving.scala
deleted file mode 100644
index c14137d..0000000
--- a/examples/scala-parallel-recommendation/custom-query/src/main/scala/Serving.scala
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.template.recommendation
-
-import org.apache.predictionio.controller.LServing
-
-class Serving extends LServing[Query, PredictedResult] {
-
- override def serve(query: Query,
- predictedResults: Seq[PredictedResult]): PredictedResult =
- predictedResults.headOption.map { result ⇒
- val preparedItems = result.itemScores
- .sortBy { case ItemScore(item, score, year) ⇒ year }(
- Ordering.Option[Int].reverse)
- new PredictedResult(preparedItems)
- }.getOrElse(new PredictedResult(Array.empty[ItemScore]))
-}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-recommendation/custom-serving/.gitignore
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-recommendation/custom-serving/.gitignore b/examples/scala-parallel-recommendation/custom-serving/.gitignore
deleted file mode 100644
index df6668d..0000000
--- a/examples/scala-parallel-recommendation/custom-serving/.gitignore
+++ /dev/null
@@ -1,5 +0,0 @@
-data/sample_movielens_data.txt
-manifest.json
-target/
-/pio.sbt
-pio.log
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-recommendation/custom-serving/build.sbt
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-recommendation/custom-serving/build.sbt b/examples/scala-parallel-recommendation/custom-serving/build.sbt
deleted file mode 100644
index 81cd3ec..0000000
--- a/examples/scala-parallel-recommendation/custom-serving/build.sbt
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-import AssemblyKeys._
-
-assemblySettings
-
-name := "template-scala-parallel-recommendation"
-
-organization := "org.apache.predictionio"
-
-libraryDependencies ++= Seq(
- "org.apache.predictionio" %% "core" % pioVersion.value % "provided",
- "org.apache.spark" %% "spark-core" % "1.2.0" % "provided",
- "org.apache.spark" %% "spark-mllib" % "1.2.0" % "provided")
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-recommendation/custom-serving/data/sample_disabled_items.txt
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-recommendation/custom-serving/data/sample_disabled_items.txt b/examples/scala-parallel-recommendation/custom-serving/data/sample_disabled_items.txt
deleted file mode 100644
index e69de29..0000000
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-recommendation/custom-serving/engine.json
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-recommendation/custom-serving/engine.json b/examples/scala-parallel-recommendation/custom-serving/engine.json
deleted file mode 100644
index 54ea3e8..0000000
--- a/examples/scala-parallel-recommendation/custom-serving/engine.json
+++ /dev/null
@@ -1,26 +0,0 @@
-{
- "id": "default",
- "description": "Default settings",
- "engineFactory": "org.template.recommendation.RecommendationEngine",
- "datasource": {
- "params": {
- "appId": 1
- }
- },
- "algorithms": [
- {
- "name": "als",
- "params": {
- "rank": 10,
- "numIterations": 20,
- "lambda": 0.01,
- "seed": 3
- }
- }
- ],
- "serving": {
- "params": {
- "filepath": "./data/sample_disabled_items.txt"
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-recommendation/custom-serving/project/assembly.sbt
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-recommendation/custom-serving/project/assembly.sbt b/examples/scala-parallel-recommendation/custom-serving/project/assembly.sbt
deleted file mode 100644
index 54c3252..0000000
--- a/examples/scala-parallel-recommendation/custom-serving/project/assembly.sbt
+++ /dev/null
@@ -1 +0,0 @@
-addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.11.2")
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-recommendation/custom-serving/project/pio-build.sbt
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-recommendation/custom-serving/project/pio-build.sbt b/examples/scala-parallel-recommendation/custom-serving/project/pio-build.sbt
deleted file mode 100644
index 9aed0ee..0000000
--- a/examples/scala-parallel-recommendation/custom-serving/project/pio-build.sbt
+++ /dev/null
@@ -1 +0,0 @@
-addSbtPlugin("org.apache.predictionio" % "pio-build" % "0.9.0")
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-recommendation/custom-serving/src/main/scala/ALSAlgorithm.scala
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-recommendation/custom-serving/src/main/scala/ALSAlgorithm.scala b/examples/scala-parallel-recommendation/custom-serving/src/main/scala/ALSAlgorithm.scala
deleted file mode 100644
index 22904af..0000000
--- a/examples/scala-parallel-recommendation/custom-serving/src/main/scala/ALSAlgorithm.scala
+++ /dev/null
@@ -1,104 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.template.recommendation
-
-import org.apache.predictionio.controller.PAlgorithm
-import org.apache.predictionio.controller.Params
-import org.apache.predictionio.data.storage.BiMap
-
-import org.apache.spark.SparkContext
-import org.apache.spark.SparkContext._
-import org.apache.spark.rdd.RDD
-import org.apache.spark.mllib.recommendation.ALS
-import org.apache.spark.mllib.recommendation.{Rating => MLlibRating}
-import org.apache.spark.mllib.recommendation.ALSModel
-
-import grizzled.slf4j.Logger
-
-case class ALSAlgorithmParams(
- rank: Int,
- numIterations: Int,
- lambda: Double,
- seed: Option[Long]) extends Params
-
-class ALSAlgorithm(val ap: ALSAlgorithmParams)
- extends PAlgorithm[PreparedData, ALSModel, Query, PredictedResult] {
-
- @transient lazy val logger = Logger[this.type]
-
- def train(sc: SparkContext, data: PreparedData): ALSModel = {
- // MLlib ALS cannot handle empty training data.
- require(!data.ratings.take(1).isEmpty,
- s"RDD[Rating] in PreparedData cannot be empty." +
- " Please check if DataSource generates TrainingData" +
- " and Preprator generates PreparedData correctly.")
- // Convert user and item String IDs to Int index for MLlib
- val userStringIntMap = BiMap.stringInt(data.ratings.map(_.user))
- val itemStringIntMap = BiMap.stringInt(data.ratings.map(_.item))
- val mllibRatings = data.ratings.map( r =>
- // MLlibRating requires integer index for user and item
- MLlibRating(userStringIntMap(r.user), itemStringIntMap(r.item), r.rating)
- )
-
- // seed for MLlib ALS
- val seed = ap.seed.getOrElse(System.nanoTime)
-
- // If you only have one type of implicit event (e.g. "view" events only),
- // replace the ALS.train(...) call below with:
- //val m = ALS.trainImplicit(
- //ratings = mllibRatings,
- //rank = ap.rank,
- //iterations = ap.numIterations,
- //lambda = ap.lambda,
- //blocks = -1,
- //alpha = 1.0,
- //seed = seed)
-
- val m = ALS.train(
- ratings = mllibRatings,
- rank = ap.rank,
- iterations = ap.numIterations,
- lambda = ap.lambda,
- blocks = -1,
- seed = seed)
-
- new ALSModel(
- rank = m.rank,
- userFeatures = m.userFeatures,
- productFeatures = m.productFeatures,
- userStringIntMap = userStringIntMap,
- itemStringIntMap = itemStringIntMap)
- }
-
- def predict(model: ALSModel, query: Query): PredictedResult = {
- // Convert String ID to Int index for MLlib
- model.userStringIntMap.get(query.user).map { userInt =>
- // create inverse view of itemStringIntMap
- val itemIntStringMap = model.itemStringIntMap.inverse
- // recommendProducts() returns Array[MLlibRating], which uses item Int
- // index. Convert it to String ID for returning PredictedResult
- val itemScores = model.recommendProducts(userInt, query.num)
- .map (r => ItemScore(itemIntStringMap(r.product), r.rating))
- new PredictedResult(itemScores)
- }.getOrElse{
- logger.info(s"No prediction for unknown user ${query.user}.")
- new PredictedResult(Array.empty)
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-recommendation/custom-serving/src/main/scala/ALSModel.scala
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-recommendation/custom-serving/src/main/scala/ALSModel.scala b/examples/scala-parallel-recommendation/custom-serving/src/main/scala/ALSModel.scala
deleted file mode 100644
index 4697732..0000000
--- a/examples/scala-parallel-recommendation/custom-serving/src/main/scala/ALSModel.scala
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.mllib.recommendation
-// This must be the same package as Spark's MatrixFactorizationModel because
-// MatrixFactorizationModel's constructor is private and we are using
-// its constructor in order to save and load the model
-
-import org.template.recommendation.ALSAlgorithmParams
-
-import org.apache.predictionio.controller.IPersistentModel
-import org.apache.predictionio.controller.IPersistentModelLoader
-import org.apache.predictionio.data.storage.BiMap
-
-import org.apache.spark.SparkContext
-import org.apache.spark.SparkContext._
-import org.apache.spark.rdd.RDD
-
-class ALSModel(
- override val rank: Int,
- override val userFeatures: RDD[(Int, Array[Double])],
- override val productFeatures: RDD[(Int, Array[Double])],
- val userStringIntMap: BiMap[String, Int],
- val itemStringIntMap: BiMap[String, Int])
- extends MatrixFactorizationModel(rank, userFeatures, productFeatures)
- with IPersistentModel[ALSAlgorithmParams] {
-
- def save(id: String, params: ALSAlgorithmParams,
- sc: SparkContext): Boolean = {
-
- sc.parallelize(Seq(rank)).saveAsObjectFile(s"/tmp/${id}/rank")
- userFeatures.saveAsObjectFile(s"/tmp/${id}/userFeatures")
- productFeatures.saveAsObjectFile(s"/tmp/${id}/productFeatures")
- sc.parallelize(Seq(userStringIntMap))
- .saveAsObjectFile(s"/tmp/${id}/userStringIntMap")
- sc.parallelize(Seq(itemStringIntMap))
- .saveAsObjectFile(s"/tmp/${id}/itemStringIntMap")
- true
- }
-
- override def toString = {
- s"userFeatures: [${userFeatures.count()}]" +
- s"(${userFeatures.take(2).toList}...)" +
- s" productFeatures: [${productFeatures.count()}]" +
- s"(${productFeatures.take(2).toList}...)" +
- s" userStringIntMap: [${userStringIntMap.size}]" +
- s"(${userStringIntMap.take(2)}...)" +
- s" itemStringIntMap: [${itemStringIntMap.size}]" +
- s"(${itemStringIntMap.take(2)}...)"
- }
-}
-
-object ALSModel
- extends IPersistentModelLoader[ALSAlgorithmParams, ALSModel] {
- def apply(id: String, params: ALSAlgorithmParams,
- sc: Option[SparkContext]) = {
- new ALSModel(
- rank = sc.get.objectFile[Int](s"/tmp/${id}/rank").first,
- userFeatures = sc.get.objectFile(s"/tmp/${id}/userFeatures"),
- productFeatures = sc.get.objectFile(s"/tmp/${id}/productFeatures"),
- userStringIntMap = sc.get
- .objectFile[BiMap[String, Int]](s"/tmp/${id}/userStringIntMap").first,
- itemStringIntMap = sc.get
- .objectFile[BiMap[String, Int]](s"/tmp/${id}/itemStringIntMap").first)
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-recommendation/custom-serving/src/main/scala/DataSource.scala
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-recommendation/custom-serving/src/main/scala/DataSource.scala b/examples/scala-parallel-recommendation/custom-serving/src/main/scala/DataSource.scala
deleted file mode 100644
index 1d3f0df..0000000
--- a/examples/scala-parallel-recommendation/custom-serving/src/main/scala/DataSource.scala
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.template.recommendation
-
-import org.apache.predictionio.controller.PDataSource
-import org.apache.predictionio.controller.EmptyEvaluationInfo
-import org.apache.predictionio.controller.EmptyActualResult
-import org.apache.predictionio.controller.Params
-import org.apache.predictionio.data.storage.Event
-import org.apache.predictionio.data.storage.Storage
-
-import org.apache.spark.SparkContext
-import org.apache.spark.SparkContext._
-import org.apache.spark.rdd.RDD
-
-import grizzled.slf4j.Logger
-
-case class DataSourceParams(appId: Int) extends Params
-
-class DataSource(val dsp: DataSourceParams)
- extends PDataSource[TrainingData,
- EmptyEvaluationInfo, Query, EmptyActualResult] {
-
- @transient lazy val logger = Logger[this.type]
-
- override
- def readTraining(sc: SparkContext): TrainingData = {
- val eventsDb = Storage.getPEvents()
- val eventsRDD: RDD[Event] = eventsDb.find(
- appId = dsp.appId,
- entityType = Some("user"),
- eventNames = Some(List("rate", "buy")), // read "rate" and "buy" event
- // targetEntityType is optional field of an event.
- targetEntityType = Some(Some("item")))(sc)
-
- val ratingsRDD: RDD[Rating] = eventsRDD.map { event =>
- val rating = try {
- val ratingValue: Double = event.event match {
- case "rate" => event.properties.get[Double]("rating")
- case "buy" => 4.0 // map buy event to rating value of 4
- case _ => throw new Exception(s"Unexpected event ${event} is read.")
- }
- // entityId and targetEntityId is String
- Rating(event.entityId,
- event.targetEntityId.get,
- ratingValue)
- } catch {
- case e: Exception => {
- logger.error(s"Cannot convert ${event} to Rating. Exception: ${e}.")
- throw e
- }
- }
- rating
- }
- new TrainingData(ratingsRDD)
- }
-}
-
-case class Rating(
- user: String,
- item: String,
- rating: Double
-)
-
-class TrainingData(
- val ratings: RDD[Rating]
-) extends Serializable {
- override def toString = {
- s"ratings: [${ratings.count()}] (${ratings.take(2).toList}...)"
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-recommendation/custom-serving/src/main/scala/Engine.scala
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-recommendation/custom-serving/src/main/scala/Engine.scala b/examples/scala-parallel-recommendation/custom-serving/src/main/scala/Engine.scala
deleted file mode 100644
index 1446ca4..0000000
--- a/examples/scala-parallel-recommendation/custom-serving/src/main/scala/Engine.scala
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.template.recommendation
-
-import org.apache.predictionio.controller.IEngineFactory
-import org.apache.predictionio.controller.Engine
-
-case class Query(
- user: String,
- num: Int
-)
-
-case class PredictedResult(
- itemScores: Array[ItemScore]
-)
-
-case class ItemScore(
- item: String,
- score: Double
-)
-
-object RecommendationEngine extends IEngineFactory {
- def apply() = {
- new Engine(
- classOf[DataSource],
- classOf[Preparator],
- Map("als" -> classOf[ALSAlgorithm]),
- classOf[Serving])
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-recommendation/custom-serving/src/main/scala/Preparator.scala
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-recommendation/custom-serving/src/main/scala/Preparator.scala b/examples/scala-parallel-recommendation/custom-serving/src/main/scala/Preparator.scala
deleted file mode 100644
index 4cd812c..0000000
--- a/examples/scala-parallel-recommendation/custom-serving/src/main/scala/Preparator.scala
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.template.recommendation
-
-import org.apache.predictionio.controller.PPreparator
-
-import org.apache.spark.SparkContext
-import org.apache.spark.SparkContext._
-import org.apache.spark.rdd.RDD
-
-class Preparator
- extends PPreparator[TrainingData, PreparedData] {
-
- def prepare(sc: SparkContext, trainingData: TrainingData): PreparedData = {
- new PreparedData(ratings = trainingData.ratings)
- }
-}
-
-case class PreparedData(
- ratings: RDD[Rating]
-)
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-recommendation/custom-serving/src/main/scala/Serving.scala
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-recommendation/custom-serving/src/main/scala/Serving.scala b/examples/scala-parallel-recommendation/custom-serving/src/main/scala/Serving.scala
deleted file mode 100644
index 52d40e1..0000000
--- a/examples/scala-parallel-recommendation/custom-serving/src/main/scala/Serving.scala
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.template.recommendation
-
-import org.apache.predictionio.controller.LServing
-
-import scala.io.Source
-
-import org.apache.predictionio.controller.Params // ADDED
-
-// ADDED ServingParams to specify the blacklisting file location.
-case class ServingParams(filepath: String) extends Params
-
-class Serving(val params: ServingParams)
- extends LServing[Query, PredictedResult] {
-
- override
- def serve(query: Query, predictedResults: Seq[PredictedResult])
- : PredictedResult = {
- val disabledProducts: Set[String] = Source
- .fromFile(params.filepath)
- .getLines
- .toSet
-
- val itemScores = predictedResults.head.itemScores
- PredictedResult(itemScores.filter(ps => !disabledProducts(ps.item)))
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-recommendation/customize-data-prep/.gitignore
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-recommendation/customize-data-prep/.gitignore b/examples/scala-parallel-recommendation/customize-data-prep/.gitignore
new file mode 100644
index 0000000..3f3403a
--- /dev/null
+++ b/examples/scala-parallel-recommendation/customize-data-prep/.gitignore
@@ -0,0 +1,5 @@
+data/sample_movielens_data.txt
+manifest.json
+target/
+pio.log
+/pio.sbt
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-recommendation/customize-data-prep/build.sbt
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-recommendation/customize-data-prep/build.sbt b/examples/scala-parallel-recommendation/customize-data-prep/build.sbt
new file mode 100644
index 0000000..3d25df1
--- /dev/null
+++ b/examples/scala-parallel-recommendation/customize-data-prep/build.sbt
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+name := "template-scala-parallel-recommendation"
+
+organization := "org.apache.predictionio"
+scalaVersion := "2.11.8"
+libraryDependencies ++= Seq(
+ "org.apache.predictionio" %% "apache-predictionio-core" % "0.11.0-incubating" % "provided",
+ "org.apache.spark" %% "spark-mllib" % "2.1.1" % "provided")
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-recommendation/customize-data-prep/data/import_eventserver.py
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-recommendation/customize-data-prep/data/import_eventserver.py b/examples/scala-parallel-recommendation/customize-data-prep/data/import_eventserver.py
new file mode 100644
index 0000000..63694cf
--- /dev/null
+++ b/examples/scala-parallel-recommendation/customize-data-prep/data/import_eventserver.py
@@ -0,0 +1,73 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+Import sample data for recommendation engine
+"""
+
+import predictionio
+import argparse
+import random
+
+RATE_ACTIONS_DELIMITER = "::"
+SEED = 3
+
+def import_events(client, file):
+ f = open(file, 'r')
+ random.seed(SEED)
+ count = 0
+ print("Importing data...")
+ for line in f:
+ data = line.rstrip('\r\n').split(RATE_ACTIONS_DELIMITER)
+ # For demonstration purposes, randomly mix in some buy events
+ if (random.randint(0, 1) == 1):
+ client.create_event(
+ event="rate",
+ entity_type="user",
+ entity_id=data[0],
+ target_entity_type="item",
+ target_entity_id=data[1],
+ properties= { "rating" : float(data[2]) }
+ )
+ else:
+ client.create_event(
+ event="buy",
+ entity_type="user",
+ entity_id=data[0],
+ target_entity_type="item",
+ target_entity_id=data[1]
+ )
+ count += 1
+ f.close()
+ print("%s events are imported." % count)
+
+if __name__ == '__main__':
+ parser = argparse.ArgumentParser(
+ description="Import sample data for recommendation engine")
+ parser.add_argument('--access_key', default='invald_access_key')
+ parser.add_argument('--url', default="http://localhost:7070")
+ parser.add_argument('--file', default="./data/sample_movielens_data.txt")
+
+ args = parser.parse_args()
+ print(args)
+
+ client = predictionio.EventClient(
+ access_key=args.access_key,
+ url=args.url,
+ threads=5,
+ qsize=500)
+ import_events(client, args.file)
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-recommendation/customize-data-prep/data/sample_not_train_data.txt
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-recommendation/customize-data-prep/data/sample_not_train_data.txt b/examples/scala-parallel-recommendation/customize-data-prep/data/sample_not_train_data.txt
new file mode 100644
index 0000000..077e96b
--- /dev/null
+++ b/examples/scala-parallel-recommendation/customize-data-prep/data/sample_not_train_data.txt
@@ -0,0 +1,8 @@
+3
+4
+10
+22
+34
+54
+65
+89
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-recommendation/customize-data-prep/data/send_query.py
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-recommendation/customize-data-prep/data/send_query.py b/examples/scala-parallel-recommendation/customize-data-prep/data/send_query.py
new file mode 100644
index 0000000..f6ec9ab
--- /dev/null
+++ b/examples/scala-parallel-recommendation/customize-data-prep/data/send_query.py
@@ -0,0 +1,24 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+Send sample query to prediction engine
+"""
+
+import predictionio
+engine_client = predictionio.EngineClient(url="http://localhost:8000")
+print(engine_client.send_query({"user": "1", "num": 4}))
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-recommendation/customize-data-prep/engine.json
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-recommendation/customize-data-prep/engine.json b/examples/scala-parallel-recommendation/customize-data-prep/engine.json
new file mode 100644
index 0000000..23fa1c9
--- /dev/null
+++ b/examples/scala-parallel-recommendation/customize-data-prep/engine.json
@@ -0,0 +1,26 @@
+{
+ "id": "default",
+ "description": "Default settings",
+ "engineFactory": "org.apache.predictionio.examples.recommendation.RecommendationEngine",
+ "datasource": {
+ "params" : {
+ "appName": "MyApp1"
+ }
+ },
+ "preparator": {
+ "params": {
+ "filepath": "./data/sample_not_train_data.txt"
+ }
+ },
+ "algorithms": [
+ {
+ "name": "als",
+ "params": {
+ "rank": 10,
+ "numIterations": 20,
+ "lambda": 0.01,
+ "seed": 3
+ }
+ }
+ ]
+}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-recommendation/customize-data-prep/project/assembly.sbt
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-recommendation/customize-data-prep/project/assembly.sbt b/examples/scala-parallel-recommendation/customize-data-prep/project/assembly.sbt
new file mode 100644
index 0000000..92636bc
--- /dev/null
+++ b/examples/scala-parallel-recommendation/customize-data-prep/project/assembly.sbt
@@ -0,0 +1 @@
+addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.4")
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-recommendation/customize-data-prep/project/build.properties
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-recommendation/customize-data-prep/project/build.properties b/examples/scala-parallel-recommendation/customize-data-prep/project/build.properties
new file mode 100644
index 0000000..64317fd
--- /dev/null
+++ b/examples/scala-parallel-recommendation/customize-data-prep/project/build.properties
@@ -0,0 +1 @@
+sbt.version=0.13.15
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-recommendation/customize-data-prep/src/main/scala/ALSAlgorithm.scala
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-recommendation/customize-data-prep/src/main/scala/ALSAlgorithm.scala b/examples/scala-parallel-recommendation/customize-data-prep/src/main/scala/ALSAlgorithm.scala
new file mode 100644
index 0000000..b0f874d
--- /dev/null
+++ b/examples/scala-parallel-recommendation/customize-data-prep/src/main/scala/ALSAlgorithm.scala
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.predictionio.examples.recommendation
+
+import org.apache.predictionio.controller.PAlgorithm
+import org.apache.predictionio.controller.Params
+import org.apache.predictionio.data.storage.BiMap
+
+import org.apache.spark.SparkContext
+import org.apache.spark.SparkContext._
+import org.apache.spark.rdd.RDD
+import org.apache.spark.mllib.recommendation.ALS
+import org.apache.spark.mllib.recommendation.{Rating => MLlibRating}
+import org.apache.spark.mllib.recommendation.ALSModel
+
+import grizzled.slf4j.Logger
+
+case class ALSAlgorithmParams(
+ rank: Int,
+ numIterations: Int,
+ lambda: Double,
+ seed: Option[Long]) extends Params
+
+class ALSAlgorithm(val ap: ALSAlgorithmParams)
+ extends PAlgorithm[PreparedData, ALSModel, Query, PredictedResult] {
+
+ @transient lazy val logger = Logger[this.type]
+
+ if (ap.numIterations > 30) {
+ logger.warn(
+ s"ALSAlgorithmParams.numIterations > 30, current: ${ap.numIterations}. " +
+ s"There is a chance of running to StackOverflowException." +
+ s"To remedy it, set lower numIterations or checkpoint parameters.")
+ }
+
+ def train(sc: SparkContext, data: PreparedData): ALSModel = {
+ // MLlib ALS cannot handle empty training data.
+ require(!data.ratings.take(1).isEmpty,
+ s"RDD[Rating] in PreparedData cannot be empty." +
+ " Please check if DataSource generates TrainingData" +
+ " and Preparator generates PreparedData correctly.")
+ // Convert user and item String IDs to Int index for MLlib
+
+ val userStringIntMap = BiMap.stringInt(data.ratings.map(_.user))
+ val itemStringIntMap = BiMap.stringInt(data.ratings.map(_.item))
+ val mllibRatings = data.ratings.map( r =>
+ // MLlibRating requires integer index for user and item
+ MLlibRating(userStringIntMap(r.user), itemStringIntMap(r.item), r.rating)
+ )
+
+ // seed for MLlib ALS
+ val seed = ap.seed.getOrElse(System.nanoTime)
+
+ // Set checkpoint directory
+ // sc.setCheckpointDir("checkpoint")
+
+ // If you only have one type of implicit event (e.g. "view" events only),
+ // set implicitPrefs to true.
+ val implicitPrefs = false
+ val als = new ALS()
+ als.setUserBlocks(-1)
+ als.setProductBlocks(-1)
+ als.setRank(ap.rank)
+ als.setIterations(ap.numIterations)
+ als.setLambda(ap.lambda)
+ als.setImplicitPrefs(implicitPrefs)
+ als.setAlpha(1.0)
+ als.setSeed(seed)
+ als.setCheckpointInterval(10)
+ val m = als.run(mllibRatings)
+
+ new ALSModel(
+ rank = m.rank,
+ userFeatures = m.userFeatures,
+ productFeatures = m.productFeatures,
+ userStringIntMap = userStringIntMap,
+ itemStringIntMap = itemStringIntMap)
+ }
+
+ def predict(model: ALSModel, query: Query): PredictedResult = {
+ // Convert String ID to Int index for MLlib
+ model.userStringIntMap.get(query.user).map { userInt =>
+ // create inverse view of itemStringIntMap
+ val itemIntStringMap = model.itemStringIntMap.inverse
+ // recommendProducts() returns Array[MLlibRating], which uses item Int
+ // index. Convert it to String ID for returning PredictedResult
+ val itemScores = model.recommendProducts(userInt, query.num)
+ .map (r => ItemScore(itemIntStringMap(r.product), r.rating))
+ new PredictedResult(itemScores)
+ }.getOrElse{
+ logger.info(s"No prediction for unknown user ${query.user}.")
+ new PredictedResult(Array.empty)
+ }
+ }
+
+ // This function is used by the evaluation module, where a batch of queries is sent to this engine
+ // for evaluation purposes.
+ override def batchPredict(model: ALSModel, queries: RDD[(Long, Query)]): RDD[(Long, PredictedResult)] = {
+ val userIxQueries: RDD[(Int, (Long, Query))] = queries
+ .map { case (ix, query) => {
+ // If user not found, then the index is -1
+ val userIx = model.userStringIntMap.get(query.user).getOrElse(-1)
+ (userIx, (ix, query))
+ }}
+
+ // Cross product of all valid users from the queries and products in the model.
+ val usersProducts: RDD[(Int, Int)] = userIxQueries
+ .keys
+ .filter(_ != -1)
+ .cartesian(model.productFeatures.map(_._1))
+
+ // Call mllib ALS's predict function.
+ val ratings: RDD[MLlibRating] = model.predict(usersProducts)
+
+ // The following code constructs predicted results from MLlib's ratings.
+ // This is not an optimal implementation: instead of groupBy, it should use combineByKey with a PriorityQueue.
+ val userRatings: RDD[(Int, Iterable[MLlibRating])] = ratings.groupBy(_.user)
+
+ userIxQueries.leftOuterJoin(userRatings)
+ .map {
+ // When there are ratings
+ case (userIx, ((ix, query), Some(ratings))) => {
+ val topItemScores: Array[ItemScore] = ratings
+ .toArray
+ .sortBy(_.rating)(Ordering.Double.reverse) // note: from large to small ordering
+ .take(query.num)
+ .map { rating => ItemScore(
+ model.itemStringIntMap.inverse(rating.product),
+ rating.rating) }
+
+ (ix, PredictedResult(itemScores = topItemScores))
+ }
+ // When user doesn't exist in training data
+ case (userIx, ((ix, query), None)) => {
+ require(userIx == -1)
+ (ix, PredictedResult(itemScores = Array.empty))
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-recommendation/customize-data-prep/src/main/scala/ALSModel.scala
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-recommendation/customize-data-prep/src/main/scala/ALSModel.scala b/examples/scala-parallel-recommendation/customize-data-prep/src/main/scala/ALSModel.scala
new file mode 100644
index 0000000..898858d
--- /dev/null
+++ b/examples/scala-parallel-recommendation/customize-data-prep/src/main/scala/ALSModel.scala
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.recommendation
+// This must be the same package as Spark's MatrixFactorizationModel because
+// MatrixFactorizationModel's constructor is private and we are using
+// its constructor in order to save and load the model
+
+import org.apache.predictionio.examples.recommendation.ALSAlgorithmParams
+
+import org.apache.predictionio.controller.PersistentModel
+import org.apache.predictionio.controller.PersistentModelLoader
+import org.apache.predictionio.data.storage.BiMap
+
+import org.apache.spark.SparkContext
+import org.apache.spark.SparkContext._
+import org.apache.spark.rdd.RDD
+
+class ALSModel(
+ override val rank: Int,
+ override val userFeatures: RDD[(Int, Array[Double])],
+ override val productFeatures: RDD[(Int, Array[Double])],
+ val userStringIntMap: BiMap[String, Int],
+ val itemStringIntMap: BiMap[String, Int])
+ extends MatrixFactorizationModel(rank, userFeatures, productFeatures)
+ with PersistentModel[ALSAlgorithmParams] {
+
+ def save(id: String, params: ALSAlgorithmParams,
+ sc: SparkContext): Boolean = {
+
+ sc.parallelize(Seq(rank)).saveAsObjectFile(s"/tmp/${id}/rank")
+ userFeatures.saveAsObjectFile(s"/tmp/${id}/userFeatures")
+ productFeatures.saveAsObjectFile(s"/tmp/${id}/productFeatures")
+ sc.parallelize(Seq(userStringIntMap))
+ .saveAsObjectFile(s"/tmp/${id}/userStringIntMap")
+ sc.parallelize(Seq(itemStringIntMap))
+ .saveAsObjectFile(s"/tmp/${id}/itemStringIntMap")
+ true
+ }
+
+ override def toString = {
+ s"userFeatures: [${userFeatures.count()}]" +
+ s"(${userFeatures.take(2).toList}...)" +
+ s" productFeatures: [${productFeatures.count()}]" +
+ s"(${productFeatures.take(2).toList}...)" +
+ s" userStringIntMap: [${userStringIntMap.size}]" +
+ s"(${userStringIntMap.take(2)}...)" +
+ s" itemStringIntMap: [${itemStringIntMap.size}]" +
+ s"(${itemStringIntMap.take(2)}...)"
+ }
+}
+
+object ALSModel
+ extends PersistentModelLoader[ALSAlgorithmParams, ALSModel] {
+ def apply(id: String, params: ALSAlgorithmParams,
+ sc: Option[SparkContext]) = {
+ new ALSModel(
+ rank = sc.get.objectFile[Int](s"/tmp/${id}/rank").first,
+ userFeatures = sc.get.objectFile(s"/tmp/${id}/userFeatures"),
+ productFeatures = sc.get.objectFile(s"/tmp/${id}/productFeatures"),
+ userStringIntMap = sc.get
+ .objectFile[BiMap[String, Int]](s"/tmp/${id}/userStringIntMap").first,
+ itemStringIntMap = sc.get
+ .objectFile[BiMap[String, Int]](s"/tmp/${id}/itemStringIntMap").first)
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-recommendation/customize-data-prep/src/main/scala/DataSource.scala
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-recommendation/customize-data-prep/src/main/scala/DataSource.scala b/examples/scala-parallel-recommendation/customize-data-prep/src/main/scala/DataSource.scala
new file mode 100644
index 0000000..d606ad3
--- /dev/null
+++ b/examples/scala-parallel-recommendation/customize-data-prep/src/main/scala/DataSource.scala
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.predictionio.examples.recommendation
+
+import org.apache.predictionio.controller.PDataSource
+import org.apache.predictionio.controller.EmptyEvaluationInfo
+import org.apache.predictionio.controller.EmptyActualResult
+import org.apache.predictionio.controller.Params
+import org.apache.predictionio.data.storage.Event
+import org.apache.predictionio.data.store.PEventStore
+
+import org.apache.spark.SparkContext
+import org.apache.spark.SparkContext._
+import org.apache.spark.rdd.RDD
+
+import grizzled.slf4j.Logger
+
+case class DataSourceEvalParams(kFold: Int, queryNum: Int)
+
+case class DataSourceParams(
+ appName: String,
+ evalParams: Option[DataSourceEvalParams]) extends Params
+
+class DataSource(val dsp: DataSourceParams)
+ extends PDataSource[TrainingData,
+ EmptyEvaluationInfo, Query, ActualResult] {
+
+ @transient lazy val logger = Logger[this.type]
+
+ def getRatings(sc: SparkContext): RDD[Rating] = {
+
+ val eventsRDD: RDD[Event] = PEventStore.find(
+ appName = dsp.appName,
+ entityType = Some("user"),
+ eventNames = Some(List("rate", "buy")), // read "rate" and "buy" events
+ // targetEntityType is an optional field of an event.
+ targetEntityType = Some(Some("item")))(sc)
+
+ val ratingsRDD: RDD[Rating] = eventsRDD.map { event =>
+ val rating = try {
+ val ratingValue: Double = event.event match {
+ case "rate" => event.properties.get[Double]("rating")
+ case "buy" => 4.0 // map buy event to rating value of 4
+ case _ => throw new Exception(s"Unexpected event ${event} is read.")
+ }
+ // entityId and targetEntityId are Strings
+ Rating(event.entityId,
+ event.targetEntityId.get,
+ ratingValue)
+ } catch {
+ case e: Exception => {
+ logger.error(s"Cannot convert ${event} to Rating. Exception: ${e}.")
+ throw e
+ }
+ }
+ rating
+ }.cache()
+
+ ratingsRDD
+ }
+
+ override
+ def readTraining(sc: SparkContext): TrainingData = {
+ new TrainingData(getRatings(sc))
+ }
+
+ override
+ def readEval(sc: SparkContext)
+ : Seq[(TrainingData, EmptyEvaluationInfo, RDD[(Query, ActualResult)])] = {
+ require(!dsp.evalParams.isEmpty, "Must specify evalParams")
+ val evalParams = dsp.evalParams.get
+
+ val kFold = evalParams.kFold
+ val ratings: RDD[(Rating, Long)] = getRatings(sc).zipWithUniqueId
+ ratings.cache
+
+ (0 until kFold).map { idx => {
+ val trainingRatings = ratings.filter(_._2 % kFold != idx).map(_._1)
+ val testingRatings = ratings.filter(_._2 % kFold == idx).map(_._1)
+
+ val testingUsers: RDD[(String, Iterable[Rating])] = testingRatings.groupBy(_.user)
+
+ (new TrainingData(trainingRatings),
+ new EmptyEvaluationInfo(),
+ testingUsers.map {
+ case (user, ratings) => (Query(user, evalParams.queryNum), ActualResult(ratings.toArray))
+ }
+ )
+ }}
+ }
+}
+
+case class Rating(
+ user: String,
+ item: String,
+ rating: Double
+)
+
+class TrainingData(
+ val ratings: RDD[Rating]
+) extends Serializable {
+ override def toString = {
+ s"ratings: [${ratings.count()}] (${ratings.take(2).toList}...)"
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-recommendation/customize-data-prep/src/main/scala/Engine.scala
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-recommendation/customize-data-prep/src/main/scala/Engine.scala b/examples/scala-parallel-recommendation/customize-data-prep/src/main/scala/Engine.scala
new file mode 100644
index 0000000..b2a668b
--- /dev/null
+++ b/examples/scala-parallel-recommendation/customize-data-prep/src/main/scala/Engine.scala
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.predictionio.examples.recommendation
+
+import org.apache.predictionio.controller.EngineFactory
+import org.apache.predictionio.controller.Engine
+
+/**
+ * Engine input: a `user` id and an item count `num` (presumably how many results to return).
+ */
+case class Query(user: String, num: Int) extends Serializable
+
+/** Engine output: the scored items produced for a Query. */
+case class PredictedResult(itemScores: Array[ItemScore])
+  extends Serializable
+
+/** Ratings that evaluation metrics compare a PredictedResult against. */
+case class ActualResult(ratings: Array[Rating])
+  extends Serializable
+
+/**
+ * An `item` id together with its `score`.
+ */
+case class ItemScore(item: String, score: Double) extends Serializable
+
+/** Engine factory wiring DataSource, Preparator, ALS (named "als") and Serving together. */
+object RecommendationEngine extends EngineFactory {
+  def apply() = {
+    // The map key is the name under which the ALS algorithm is registered with the engine.
+    new Engine(
+      classOf[DataSource], classOf[Preparator],
+      Map("als" -> classOf[ALSAlgorithm]), classOf[Serving])
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-recommendation/customize-data-prep/src/main/scala/Evaluation.scala
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-recommendation/customize-data-prep/src/main/scala/Evaluation.scala b/examples/scala-parallel-recommendation/customize-data-prep/src/main/scala/Evaluation.scala
new file mode 100644
index 0000000..a665496
--- /dev/null
+++ b/examples/scala-parallel-recommendation/customize-data-prep/src/main/scala/Evaluation.scala
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.predictionio.examples.recommendation
+
+import org.apache.predictionio.controller.Evaluation
+import org.apache.predictionio.controller.OptionAverageMetric
+import org.apache.predictionio.controller.AverageMetric
+import org.apache.predictionio.controller.EmptyEvaluationInfo
+import org.apache.predictionio.controller.EngineParamsGenerator
+import org.apache.predictionio.controller.EngineParams
+import org.apache.predictionio.controller.MetricEvaluator
+
+// Usage:
+// $ pio eval org.apache.predictionio.examples.recommendation.RecommendationEvaluation \
+// org.apache.predictionio.examples.recommendation.EngineParamsList
+
+/** Precision@K over "positive" items (actual rating >= ratingThreshold); requires k > 0. */
+case class PrecisionAtK(k: Int, ratingThreshold: Double = 2.0)
+  extends OptionAverageMetric[EmptyEvaluationInfo, Query, PredictedResult, ActualResult] {
+  require(k > 0, "k must be greater than 0")
+
+  override def header = s"Precision@K (k=$k, threshold=$ratingThreshold)"
+
+  /** Hits among the first k predictions / min(k, #positives); None if there are no positives. */
+  def calculate(q: Query, p: PredictedResult, a: ActualResult): Option[Double] = {
+    val relevant = a.ratings.collect { case r if r.rating >= ratingThreshold => r.item }.toSet
+    // Precision is undefined without positives, so such queries are excluded via None.
+    if (relevant.isEmpty) {
+      None
+    } else {
+      val hits = p.itemScores.take(k).count(s => relevant(s.item))
+      Some(hits.toDouble / math.min(k, relevant.size))
+    }
+  }
+}
+
+/** AverageMetric over the number of actual ratings that reach `ratingThreshold`. */
+case class PositiveCount(ratingThreshold: Double = 2.0)
+  extends AverageMetric[EmptyEvaluationInfo, Query, PredictedResult, ActualResult] {
+  override def header = s"PositiveCount (threshold=$ratingThreshold)"
+  // Only the actual result matters here; the query and the prediction are ignored.
+  def calculate(q: Query, p: PredictedResult, a: ActualResult): Double =
+    a.ratings.count(_.rating >= ratingThreshold)
+}
+
+object RecommendationEvaluation extends Evaluation { // evaluation object passed to `pio eval`
+ engineEvaluator = (
+ RecommendationEngine(),
+ MetricEvaluator(
+ metric = PrecisionAtK(k = 10, ratingThreshold = 4.0), // main metric: ratings >= 4.0 are positive
+ otherMetrics = Seq( // the same measurements at further thresholds, passed as extra metrics
+ PositiveCount(ratingThreshold = 4.0),
+ PrecisionAtK(k = 10, ratingThreshold = 2.0),
+ PositiveCount(ratingThreshold = 2.0),
+ PrecisionAtK(k = 10, ratingThreshold = 1.0),
+ PositiveCount(ratingThreshold = 1.0)
+ )))
+}
+
+
+/** Evaluation sweeping PositiveCount over 3 thresholds and PrecisionAtK over thresholds x ks. */
+object ComprehensiveRecommendationEvaluation extends Evaluation {
+  val ratingThresholds = Seq(0.0, 2.0, 4.0)
+  val ks = Seq(1, 3, 10)
+  engineEvaluator = (
+    RecommendationEngine(),
+    MetricEvaluator(
+      metric = PrecisionAtK(k = 3, ratingThreshold = 2.0),
+      // One PositiveCount per threshold, then one PrecisionAtK per (threshold, k) pair.
+      otherMetrics =
+        ratingThresholds.map(t => PositiveCount(ratingThreshold = t)) ++
+        ratingThresholds.flatMap(t => ks.map(n => PrecisionAtK(k = n, ratingThreshold = t)))))
+}
+
+
+/** Base EngineParams shared by the parameter grids: app "MyApp1", kFold = 5, queryNum = 10. */
+trait BaseEngineParamsList extends EngineParamsGenerator {
+  protected val baseEP = EngineParams(dataSourceParams = DataSourceParams(
+    appName = "MyApp1",
+    evalParams = Some(DataSourceEvalParams(kFold = 5, queryNum = 10))))
+}
+
+/** 3 x 3 grid of EngineParams: rank in {5, 10, 20} by numIterations in {1, 5, 10}. */
+object EngineParamsList extends BaseEngineParamsList {
+  engineParamsList = Seq(5, 10, 20).flatMap { rank =>
+    // Inner loop over numIterations; the remaining ALS arguments stay fixed at 0.01 and Some(3).
+    Seq(1, 5, 10).map(numIterations => baseEP.copy(
+      algorithmParamsList = Seq(("als", ALSAlgorithmParams(rank, numIterations, 0.01, Some(3))))))
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-recommendation/customize-data-prep/src/main/scala/Preparator.scala
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-recommendation/customize-data-prep/src/main/scala/Preparator.scala b/examples/scala-parallel-recommendation/customize-data-prep/src/main/scala/Preparator.scala
new file mode 100644
index 0000000..cf792af
--- /dev/null
+++ b/examples/scala-parallel-recommendation/customize-data-prep/src/main/scala/Preparator.scala
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.predictionio.examples.recommendation
+
+import org.apache.predictionio.controller.PPreparator
+
+import org.apache.spark.SparkContext
+import org.apache.spark.SparkContext._
+import org.apache.spark.rdd.RDD
+
+import scala.io.Source // ADDED
+import org.apache.predictionio.controller.Params // ADDED
+
+// ADDED CustomPreparatorParams case class
+/** Preparator parameters: `filepath` is the path of the file that Preparator.prepare reads. */
+case class CustomPreparatorParams(filepath: String)
+  extends Params
+
+/** Removes from training every rating whose item id is listed, one per line, in pp.filepath. */
+class Preparator(pp: CustomPreparatorParams) // ADDED CustomPreparatorParams
+  extends PPreparator[TrainingData, PreparedData] {
+
+  def prepare(sc: SparkContext, trainingData: TrainingData): PreparedData = {
+    val source = Source.fromFile(pp.filepath) // CHANGED
+    // toSet drains the iterator eagerly, so the file can be closed before the RDD is filtered.
+    val noTrainItems = try source.getLines.toSet finally source.close()
+    new PreparedData(trainingData.ratings.filter(r => !noTrainItems.contains(r.item)))
+  }
+}
+
+/** Output of the Preparator: the ratings RDD that is left after filtering. */
+class PreparedData(val ratings: RDD[Rating])
+  extends Serializable
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-recommendation/customize-data-prep/src/main/scala/Serving.scala
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-recommendation/customize-data-prep/src/main/scala/Serving.scala b/examples/scala-parallel-recommendation/customize-data-prep/src/main/scala/Serving.scala
new file mode 100644
index 0000000..c478455
--- /dev/null
+++ b/examples/scala-parallel-recommendation/customize-data-prep/src/main/scala/Serving.scala
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.predictionio.examples.recommendation
+
+import org.apache.predictionio.controller.LServing
+
+/** Serving component that hands back the first prediction unchanged. */
+class Serving extends LServing[Query, PredictedResult] {
+  /**
+   * Returns the head of `predictedResults`; `query` is not consulted.
+   * An empty `predictedResults` makes `head` throw NoSuchElementException.
+   */
+  override def serve(query: Query, predictedResults: Seq[PredictedResult]): PredictedResult =
+    predictedResults.head
+}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-recommendation/customize-data-prep/template.json
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-recommendation/customize-data-prep/template.json b/examples/scala-parallel-recommendation/customize-data-prep/template.json
new file mode 100644
index 0000000..d076ec5
--- /dev/null
+++ b/examples/scala-parallel-recommendation/customize-data-prep/template.json
@@ -0,0 +1 @@
+{"pio": {"version": { "min": "0.10.0-incubating" }}}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-recommendation/customize-serving/.gitignore
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-recommendation/customize-serving/.gitignore b/examples/scala-parallel-recommendation/customize-serving/.gitignore
new file mode 100644
index 0000000..3f3403a
--- /dev/null
+++ b/examples/scala-parallel-recommendation/customize-serving/.gitignore
@@ -0,0 +1,5 @@
+data/sample_movielens_data.txt
+manifest.json
+target/
+pio.log
+/pio.sbt
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-recommendation/customize-serving/build.sbt
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-recommendation/customize-serving/build.sbt b/examples/scala-parallel-recommendation/customize-serving/build.sbt
new file mode 100644
index 0000000..3d25df1
--- /dev/null
+++ b/examples/scala-parallel-recommendation/customize-serving/build.sbt
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+name := "template-scala-parallel-recommendation"
+// sbt build definition for this example engine template.
+organization := "org.apache.predictionio"
+scalaVersion := "2.11.8" // %% below appends the Scala binary version to each artifact name
+libraryDependencies ++= Seq( // "provided" scope: compiled against, not bundled when packaging
+ "org.apache.predictionio" %% "apache-predictionio-core" % "0.11.0-incubating" % "provided",
+ "org.apache.spark" %% "spark-mllib" % "2.1.1" % "provided")
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-recommendation/customize-serving/data/import_eventserver.py
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-recommendation/customize-serving/data/import_eventserver.py b/examples/scala-parallel-recommendation/customize-serving/data/import_eventserver.py
new file mode 100644
index 0000000..63694cf
--- /dev/null
+++ b/examples/scala-parallel-recommendation/customize-serving/data/import_eventserver.py
@@ -0,0 +1,73 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+Import sample data for recommendation engine
+"""
+
+import predictionio
+import argparse
+import random
+
+RATE_ACTIONS_DELIMITER = "::"
+SEED = 3
+
+def import_events(client, file):
+ f = open(file, 'r')
+ random.seed(SEED)
+ count = 0
+ print("Importing data...")
+ for line in f:
+ data = line.rstrip('\r\n').split(RATE_ACTIONS_DELIMITER)
+ # For demonstration purpose, randomly mix in some buy events
+ if (random.randint(0, 1) == 1):
+ client.create_event(
+ event="rate",
+ entity_type="user",
+ entity_id=data[0],
+ target_entity_type="item",
+ target_entity_id=data[1],
+ properties= { "rating" : float(data[2]) }
+ )
+ else:
+ client.create_event(
+ event="buy",
+ entity_type="user",
+ entity_id=data[0],
+ target_entity_type="item",
+ target_entity_id=data[1]
+ )
+ count += 1
+ f.close()
+ print("%s events are imported." % count)
+
+if __name__ == '__main__':
+ parser = argparse.ArgumentParser(
+ description="Import sample data for recommendation engine")
+ parser.add_argument('--access_key', default='invald_access_key')
+ parser.add_argument('--url', default="http://localhost:7070")
+ parser.add_argument('--file', default="./data/sample_movielens_data.txt")
+
+ args = parser.parse_args()
+ print(args)
+
+ client = predictionio.EventClient(
+ access_key=args.access_key,
+ url=args.url,
+ threads=5,
+ qsize=500)
+ import_events(client, args.file)
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-recommendation/customize-serving/data/sample_disabled_items.txt
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-recommendation/customize-serving/data/sample_disabled_items.txt b/examples/scala-parallel-recommendation/customize-serving/data/sample_disabled_items.txt
new file mode 100644
index 0000000..e69de29
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-recommendation/customize-serving/data/send_query.py
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-recommendation/customize-serving/data/send_query.py b/examples/scala-parallel-recommendation/customize-serving/data/send_query.py
new file mode 100644
index 0000000..f6ec9ab
--- /dev/null
+++ b/examples/scala-parallel-recommendation/customize-serving/data/send_query.py
@@ -0,0 +1,24 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+Send sample query to prediction engine
+"""
+
+import predictionio
+engine_client = predictionio.EngineClient(url="http://localhost:8000")  # address the query is sent to
+print(engine_client.send_query({"user": "1", "num": 4}))  # prints whatever send_query returns
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-recommendation/customize-serving/engine.json
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-recommendation/customize-serving/engine.json b/examples/scala-parallel-recommendation/customize-serving/engine.json
new file mode 100644
index 0000000..75400bc
--- /dev/null
+++ b/examples/scala-parallel-recommendation/customize-serving/engine.json
@@ -0,0 +1,26 @@
+{
+ "id": "default",
+ "description": "Default settings",
+ "engineFactory": "org.apache.predictionio.examples.recommendation.RecommendationEngine",
+ "datasource": {
+ "params" : {
+ "appName": "MyApp1"
+ }
+ },
+ "algorithms": [
+ {
+ "name": "als",
+ "params": {
+ "rank": 10,
+ "numIterations": 20,
+ "lambda": 0.01,
+ "seed": 3
+ }
+ }
+ ],
+ "serving": {
+ "params": {
+ "filepath": "./data/sample_disabled_items.txt"
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-recommendation/customize-serving/project/assembly.sbt
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-recommendation/customize-serving/project/assembly.sbt b/examples/scala-parallel-recommendation/customize-serving/project/assembly.sbt
new file mode 100644
index 0000000..92636bc
--- /dev/null
+++ b/examples/scala-parallel-recommendation/customize-serving/project/assembly.sbt
@@ -0,0 +1 @@
+addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.4")
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-recommendation/customize-serving/project/build.properties
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-recommendation/customize-serving/project/build.properties b/examples/scala-parallel-recommendation/customize-serving/project/build.properties
new file mode 100644
index 0000000..64317fd
--- /dev/null
+++ b/examples/scala-parallel-recommendation/customize-serving/project/build.properties
@@ -0,0 +1 @@
+sbt.version=0.13.15