You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@predictionio.apache.org by sh...@apache.org on 2017/07/10 04:10:09 UTC
[06/11] incubator-predictionio git commit: [PIO-97] Fixes examples of
the official templates for v0.11.0-incubating.
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-recommendation/customize-serving/src/main/scala/ALSAlgorithm.scala
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-recommendation/customize-serving/src/main/scala/ALSAlgorithm.scala b/examples/scala-parallel-recommendation/customize-serving/src/main/scala/ALSAlgorithm.scala
new file mode 100644
index 0000000..b0f874d
--- /dev/null
+++ b/examples/scala-parallel-recommendation/customize-serving/src/main/scala/ALSAlgorithm.scala
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.predictionio.examples.recommendation
+
+import org.apache.predictionio.controller.PAlgorithm
+import org.apache.predictionio.controller.Params
+import org.apache.predictionio.data.storage.BiMap
+
+import org.apache.spark.SparkContext
+import org.apache.spark.SparkContext._
+import org.apache.spark.rdd.RDD
+import org.apache.spark.mllib.recommendation.ALS
+import org.apache.spark.mllib.recommendation.{Rating => MLlibRating}
+import org.apache.spark.mllib.recommendation.ALSModel
+
+import grizzled.slf4j.Logger
+
+/**
+ * Hyper-parameters for MLlib ALS. When `seed` is None, ALSAlgorithm.train
+ * falls back to `System.nanoTime` as the seed.
+ */
+case class ALSAlgorithmParams(rank: Int, numIterations: Int, lambda: Double, seed: Option[Long])
+  extends Params
+
+/**
+ * Trains an MLlib ALS matrix-factorization model on explicit ratings and
+ * serves top-N item recommendations for a user.
+ *
+ * @param ap ALS hyper-parameters (rank, iterations, regularization, seed)
+ */
+class ALSAlgorithm(val ap: ALSAlgorithmParams)
+  extends PAlgorithm[PreparedData, ALSModel, Query, PredictedResult] {
+
+  @transient lazy val logger = Logger[this.type]
+
+  // Warn at construction time: per the message below, high iteration counts
+  // may overflow the stack unless checkpointing is configured.
+  if (ap.numIterations > 30) {
+    logger.warn(
+      s"ALSAlgorithmParams.numIterations > 30, current: ${ap.numIterations}. " +
+      "There is a chance of running into a StackOverflowError. " +
+      "To remedy it, set lower numIterations or checkpoint parameters.")
+  }
+
+  /**
+   * Trains ALS on `data.ratings`.
+   *
+   * String user/item IDs are mapped to the Int indices MLlib requires via
+   * BiMaps, which are kept in the returned model for use at query time.
+   *
+   * @throws IllegalArgumentException if `data.ratings` is empty
+   */
+  def train(sc: SparkContext, data: PreparedData): ALSModel = {
+    // MLlib ALS cannot handle empty training data.
+    require(data.ratings.take(1).nonEmpty,
+      "RDD[Rating] in PreparedData cannot be empty." +
+      " Please check if DataSource generates TrainingData" +
+      " and Preparator generates PreparedData correctly.")
+
+    // Convert user and item String IDs to Int index for MLlib
+    val userStringIntMap = BiMap.stringInt(data.ratings.map(_.user))
+    val itemStringIntMap = BiMap.stringInt(data.ratings.map(_.item))
+    val mllibRatings = data.ratings.map { r =>
+      // MLlibRating requires integer index for user and item
+      MLlibRating(userStringIntMap(r.user), itemStringIntMap(r.item), r.rating)
+    }
+
+    // Seed for MLlib ALS; falls back to a time-based seed when none is given.
+    val seed = ap.seed.getOrElse(System.nanoTime)
+
+    // Set checkpoint directory
+    // sc.setCheckpointDir("checkpoint")
+
+    // If you only have one type of implicit event (e.g. "view" event only),
+    // set implicitPrefs to true
+    val implicitPrefs = false
+    val als = new ALS()
+    als.setUserBlocks(-1)
+    als.setProductBlocks(-1)
+    als.setRank(ap.rank)
+    als.setIterations(ap.numIterations)
+    als.setLambda(ap.lambda)
+    als.setImplicitPrefs(implicitPrefs)
+    als.setAlpha(1.0)
+    als.setSeed(seed)
+    als.setCheckpointInterval(10)
+    val m = als.run(mllibRatings)
+
+    new ALSModel(
+      rank = m.rank,
+      userFeatures = m.userFeatures,
+      productFeatures = m.productFeatures,
+      userStringIntMap = userStringIntMap,
+      itemStringIntMap = itemStringIntMap)
+  }
+
+  /**
+   * Returns the top `query.num` items for `query.user`, or an empty result
+   * (logged at info level) when the user was not seen during training.
+   */
+  def predict(model: ALSModel, query: Query): PredictedResult = {
+    // Convert String ID to Int index for MLlib
+    model.userStringIntMap.get(query.user).map { userInt =>
+      // create inverse view of itemStringIntMap
+      val itemIntStringMap = model.itemStringIntMap.inverse
+      // recommendProducts() returns Array[MLlibRating], which uses item Int
+      // index. Convert it to String ID for returning PredictedResult
+      val itemScores = model.recommendProducts(userInt, query.num)
+        .map(r => ItemScore(itemIntStringMap(r.product), r.rating))
+      PredictedResult(itemScores)
+    }.getOrElse {
+      logger.info(s"No prediction for unknown user ${query.user}.")
+      PredictedResult(Array.empty)
+    }
+  }
+
+  /**
+   * Batch counterpart of `predict`, used by the evaluation module where a
+   * batch of queries is sent to this engine.
+   *
+   * @param queries (query index, query) pairs
+   * @return (query index, result) pairs; users unknown to the model get an
+   *         empty result
+   */
+  override def batchPredict(model: ALSModel, queries: RDD[(Long, Query)]): RDD[(Long, PredictedResult)] = {
+    // Local copies so the RDD closures below capture only these maps rather
+    // than the whole model object.
+    val userStringIntMap = model.userStringIntMap
+    val itemIntStringMap = model.itemStringIntMap.inverse
+
+    val userIxQueries: RDD[(Int, (Long, Query))] = queries.map { case (ix, query) =>
+      // If user not found, then the index is -1
+      val userIx = userStringIntMap.get(query.user).getOrElse(-1)
+      (userIx, (ix, query))
+    }
+
+    // Cross product of all valid users from the queries and products in the model.
+    // distinct(): a user appearing in several queries must be paired with each
+    // product only once; otherwise duplicate predictions would produce repeated
+    // items in that user's top-N list.
+    val usersProducts: RDD[(Int, Int)] = userIxQueries
+      .keys
+      .filter(_ != -1)
+      .distinct()
+      .cartesian(model.productFeatures.map(_._1))
+
+    // Call MLlib ALS's predict function.
+    val ratings: RDD[MLlibRating] = model.predict(usersProducts)
+
+    // The following code constructs predicted results from MLlib's ratings.
+    // Not optimal implementation. Instead of groupBy, should use combineByKey with a PriorityQueue
+    val userRatings: RDD[(Int, Iterable[MLlibRating])] = ratings.groupBy(_.user)
+
+    userIxQueries.leftOuterJoin(userRatings)
+      .map {
+        // When there are ratings
+        case (userIx, ((ix, query), Some(userItemRatings))) =>
+          val topItemScores: Array[ItemScore] = userItemRatings
+            .toArray
+            .sortBy(_.rating)(Ordering.Double.reverse) // note: from large to small ordering
+            .take(query.num)
+            .map(rating => ItemScore(itemIntStringMap(rating.product), rating.rating))
+          (ix, PredictedResult(itemScores = topItemScores))
+        // When user doesn't exist in training data
+        case (userIx, ((ix, query), None)) =>
+          require(userIx == -1)
+          (ix, PredictedResult(itemScores = Array.empty))
+      }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-recommendation/customize-serving/src/main/scala/ALSModel.scala
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-recommendation/customize-serving/src/main/scala/ALSModel.scala b/examples/scala-parallel-recommendation/customize-serving/src/main/scala/ALSModel.scala
new file mode 100644
index 0000000..898858d
--- /dev/null
+++ b/examples/scala-parallel-recommendation/customize-serving/src/main/scala/ALSModel.scala
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.recommendation
+// This must be the same package as Spark's MatrixFactorizationModel because
+// MatrixFactorizationModel's constructor is private and we are using
+// its constructor in order to save and load the model
+
+import org.apache.predictionio.examples.recommendation.ALSAlgorithmParams
+
+import org.apache.predictionio.controller.PersistentModel
+import org.apache.predictionio.controller.PersistentModelLoader
+import org.apache.predictionio.data.storage.BiMap
+
+import org.apache.spark.SparkContext
+import org.apache.spark.SparkContext._
+import org.apache.spark.rdd.RDD
+
+/**
+ * MLlib MatrixFactorizationModel extended with the String<->Int ID maps used
+ * to translate between PredictionIO entity IDs and MLlib integer indices.
+ * Each component is persisted as a Spark object file under `/tmp/<id>/`.
+ */
+class ALSModel(
+    override val rank: Int,
+    override val userFeatures: RDD[(Int, Array[Double])],
+    override val productFeatures: RDD[(Int, Array[Double])],
+    val userStringIntMap: BiMap[String, Int],
+    val itemStringIntMap: BiMap[String, Int])
+  extends MatrixFactorizationModel(rank, userFeatures, productFeatures)
+  with PersistentModel[ALSAlgorithmParams] {
+
+  /** Writes rank, both feature RDDs and both ID maps under `/tmp/<id>/`; always returns true. */
+  def save(id: String, params: ALSAlgorithmParams,
+    sc: SparkContext): Boolean = {
+    val dir = s"/tmp/${id}"
+    sc.parallelize(Seq(rank)).saveAsObjectFile(s"${dir}/rank")
+    userFeatures.saveAsObjectFile(s"${dir}/userFeatures")
+    productFeatures.saveAsObjectFile(s"${dir}/productFeatures")
+    sc.parallelize(Seq(userStringIntMap)).saveAsObjectFile(s"${dir}/userStringIntMap")
+    sc.parallelize(Seq(itemStringIntMap)).saveAsObjectFile(s"${dir}/itemStringIntMap")
+    true
+  }
+
+  override def toString = {
+    // Each section renders as "<label>: [<size>](<sample>...)".
+    def section(label: String, size: Long, sample: Any): String =
+      s"${label}: [${size}](${sample}...)"
+
+    Seq(
+      section("userFeatures", userFeatures.count(), userFeatures.take(2).toList),
+      section("productFeatures", productFeatures.count(), productFeatures.take(2).toList),
+      section("userStringIntMap", userStringIntMap.size, userStringIntMap.take(2)),
+      section("itemStringIntMap", itemStringIntMap.size, itemStringIntMap.take(2))
+    ).mkString(" ")
+  }
+}
+
+object ALSModel
+  extends PersistentModelLoader[ALSAlgorithmParams, ALSModel] {
+
+  /**
+   * Reloads a model written by `ALSModel#save` from `/tmp/<id>/`.
+   * Requires `sc` to be defined (`sc.get` throws otherwise).
+   */
+  def apply(id: String, params: ALSAlgorithmParams,
+    sc: Option[SparkContext]) = {
+    val spark = sc.get
+    val dir = s"/tmp/${id}"
+    val rank = spark.objectFile[Int](s"${dir}/rank").first
+    val userFeatures = spark.objectFile[(Int, Array[Double])](s"${dir}/userFeatures")
+    val productFeatures = spark.objectFile[(Int, Array[Double])](s"${dir}/productFeatures")
+    val userMap = spark.objectFile[BiMap[String, Int]](s"${dir}/userStringIntMap").first
+    val itemMap = spark.objectFile[BiMap[String, Int]](s"${dir}/itemStringIntMap").first
+    new ALSModel(rank, userFeatures, productFeatures, userMap, itemMap)
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-recommendation/customize-serving/src/main/scala/DataSource.scala
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-recommendation/customize-serving/src/main/scala/DataSource.scala b/examples/scala-parallel-recommendation/customize-serving/src/main/scala/DataSource.scala
new file mode 100644
index 0000000..d606ad3
--- /dev/null
+++ b/examples/scala-parallel-recommendation/customize-serving/src/main/scala/DataSource.scala
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.predictionio.examples.recommendation
+
+import org.apache.predictionio.controller.PDataSource
+import org.apache.predictionio.controller.EmptyEvaluationInfo
+import org.apache.predictionio.controller.EmptyActualResult
+import org.apache.predictionio.controller.Params
+import org.apache.predictionio.data.storage.Event
+import org.apache.predictionio.data.store.PEventStore
+
+import org.apache.spark.SparkContext
+import org.apache.spark.SparkContext._
+import org.apache.spark.rdd.RDD
+
+import grizzled.slf4j.Logger
+
+/** Evaluation settings: number of cross-validation folds and results requested per query. */
+case class DataSourceEvalParams(kFold: Int, queryNum: Int)
+
+/** Event-store app to read from; `evalParams` must be set for `readEval`. */
+case class DataSourceParams(appName: String, evalParams: Option[DataSourceEvalParams])
+  extends Params
+
+/**
+ * Reads user->item "rate" and "buy" events of app `dsp.appName` from the
+ * event store as ratings, for training and for k-fold evaluation.
+ */
+class DataSource(val dsp: DataSourceParams)
+  extends PDataSource[TrainingData,
+      EmptyEvaluationInfo, Query, ActualResult] {
+
+  @transient lazy val logger = Logger[this.type]
+
+  /**
+   * Loads "rate"/"buy" events as a cached RDD of ratings. "rate" uses the
+   * event's `rating` property and "buy" maps to 4.0. An event that cannot be
+   * converted is logged and its exception rethrown.
+   */
+  def getRatings(sc: SparkContext): RDD[Rating] = {
+    val events: RDD[Event] = PEventStore.find(
+      appName = dsp.appName,
+      entityType = Some("user"),
+      eventNames = Some(List("rate", "buy")), // read "rate" and "buy" event
+      // targetEntityType is optional field of an event.
+      targetEntityType = Some(Some("item")))(sc)
+
+    events.map { event =>
+      try {
+        val score: Double = event.event match {
+          case "rate" => event.properties.get[Double]("rating")
+          case "buy" => 4.0 // map buy event to rating value of 4
+          case _ => throw new Exception(s"Unexpected event ${event} is read.")
+        }
+        // entityId and targetEntityId are Strings
+        Rating(event.entityId, event.targetEntityId.get, score)
+      } catch {
+        case e: Exception =>
+          logger.error(s"Cannot convert ${event} to Rating. Exception: ${e}.")
+          throw e
+      }
+    }.cache()
+  }
+
+  override
+  def readTraining(sc: SparkContext): TrainingData =
+    new TrainingData(getRatings(sc))
+
+  /**
+   * Splits the ratings into `kFold` train/test folds by unique row id modulo
+   * `kFold`. Each test fold becomes one query per user, paired with that
+   * user's held-out ratings.
+   */
+  override
+  def readEval(sc: SparkContext)
+  : Seq[(TrainingData, EmptyEvaluationInfo, RDD[(Query, ActualResult)])] = {
+    require(dsp.evalParams.nonEmpty, "Must specify evalParams")
+    val evalParams = dsp.evalParams.get
+    val kFold = evalParams.kFold
+
+    val indexed: RDD[(Rating, Long)] = getRatings(sc).zipWithUniqueId
+    indexed.cache()
+
+    (0 until kFold).map { fold =>
+      val training = indexed.collect { case (rating, id) if id % kFold != fold => rating }
+      val testing = indexed.collect { case (rating, id) if id % kFold == fold => rating }
+
+      val queriesWithActuals: RDD[(Query, ActualResult)] =
+        testing.groupBy(_.user).map { case (user, userRatings) =>
+          (Query(user, evalParams.queryNum), ActualResult(userRatings.toArray))
+        }
+
+      (new TrainingData(training), new EmptyEvaluationInfo(), queriesWithActuals)
+    }
+  }
+}
+
+/** One user-item preference with a numeric score. */
+case class Rating(user: String, item: String, rating: Double)
+
+/** Training input produced by DataSource: the ratings RDD. */
+class TrainingData(val ratings: RDD[Rating]) extends Serializable {
+  override def toString =
+    s"ratings: [${ratings.count()}] (${ratings.take(2).toList}...)"
+}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-recommendation/customize-serving/src/main/scala/Engine.scala
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-recommendation/customize-serving/src/main/scala/Engine.scala b/examples/scala-parallel-recommendation/customize-serving/src/main/scala/Engine.scala
new file mode 100644
index 0000000..b2a668b
--- /dev/null
+++ b/examples/scala-parallel-recommendation/customize-serving/src/main/scala/Engine.scala
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.predictionio.examples.recommendation
+
+import org.apache.predictionio.controller.EngineFactory
+import org.apache.predictionio.controller.Engine
+
+/** A request for the top `num` recommended items for `user`. */
+case class Query(user: String, num: Int) extends Serializable
+
+/** Recommended items with their scores. */
+case class PredictedResult(itemScores: Array[ItemScore]) extends Serializable
+
+/** A user's held-out ratings, compared against predictions during evaluation. */
+case class ActualResult(ratings: Array[Rating]) extends Serializable
+
+/** An item ID with its predicted score. */
+case class ItemScore(item: String, score: Double) extends Serializable
+
+object RecommendationEngine extends EngineFactory {
+  /** Wires DataSource, Preparator, the "als" ALSAlgorithm and the blacklist-filtering Serving. */
+  def apply() = new Engine(
+    classOf[DataSource],
+    classOf[Preparator],
+    Map("als" -> classOf[ALSAlgorithm]),
+    classOf[Serving])
+}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-recommendation/customize-serving/src/main/scala/Evaluation.scala
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-recommendation/customize-serving/src/main/scala/Evaluation.scala b/examples/scala-parallel-recommendation/customize-serving/src/main/scala/Evaluation.scala
new file mode 100644
index 0000000..a665496
--- /dev/null
+++ b/examples/scala-parallel-recommendation/customize-serving/src/main/scala/Evaluation.scala
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.predictionio.examples.recommendation
+
+import org.apache.predictionio.controller.Evaluation
+import org.apache.predictionio.controller.OptionAverageMetric
+import org.apache.predictionio.controller.AverageMetric
+import org.apache.predictionio.controller.EmptyEvaluationInfo
+import org.apache.predictionio.controller.EngineParamsGenerator
+import org.apache.predictionio.controller.EngineParams
+import org.apache.predictionio.controller.MetricEvaluator
+
+// Usage:
+// $ pio eval org.apache.predictionio.examples.recommendation.RecommendationEvaluation \
+//   org.apache.predictionio.examples.recommendation.EngineParamsList
+
+/**
+ * Fraction of the top-`k` predicted items that the user actually rated at
+ * least `ratingThreshold`. The denominator is min(k, number of positives).
+ * Returns None (excluded from the average) when the user has no positives.
+ */
+case class PrecisionAtK(k: Int, ratingThreshold: Double = 2.0)
+  extends OptionAverageMetric[EmptyEvaluationInfo, Query, PredictedResult, ActualResult] {
+  require(k > 0, "k must be greater than 0")
+
+  override def header = s"Precision@K (k=$k, threshold=$ratingThreshold)"
+
+  def calculate(q: Query, p: PredictedResult, a: ActualResult): Option[Double] = {
+    val positives: Set[String] =
+      a.ratings.collect { case r if r.rating >= ratingThreshold => r.item }.toSet
+
+    if (positives.isEmpty) {
+      None
+    } else {
+      val hits = p.itemScores.take(k).count(score => positives(score.item))
+      Some(hits.toDouble / math.min(k, positives.size))
+    }
+  }
+}
+
+/** Average number of actual ratings at or above `ratingThreshold` per query. */
+case class PositiveCount(ratingThreshold: Double = 2.0)
+  extends AverageMetric[EmptyEvaluationInfo, Query, PredictedResult, ActualResult] {
+  override def header = s"PositiveCount (threshold=$ratingThreshold)"
+
+  def calculate(q: Query, p: PredictedResult, a: ActualResult): Double =
+    a.ratings.count(_.rating >= ratingThreshold)
+}
+
+/**
+ * Evaluation whose primary metric is Precision@10 at rating threshold 4.0.
+ * Positive counts and Precision@10 at thresholds 2.0 and 1.0 are reported
+ * as additional metrics.
+ */
+object RecommendationEvaluation extends Evaluation {
+ engineEvaluator = (
+ RecommendationEngine(),
+ MetricEvaluator(
+ metric = PrecisionAtK(k = 10, ratingThreshold = 4.0),
+ // Reported alongside the primary metric; listed in display order.
+ otherMetrics = Seq(
+ PositiveCount(ratingThreshold = 4.0),
+ PrecisionAtK(k = 10, ratingThreshold = 2.0),
+ PositiveCount(ratingThreshold = 2.0),
+ PrecisionAtK(k = 10, ratingThreshold = 1.0),
+ PositiveCount(ratingThreshold = 1.0)
+ )))
+}
+
+
+/**
+ * Evaluation with Precision@3 (threshold 2.0) as the primary metric, plus a
+ * PositiveCount per threshold and a Precision@k for every (threshold, k) pair.
+ */
+object ComprehensiveRecommendationEvaluation extends Evaluation {
+  val ratingThresholds = Seq(0.0, 2.0, 4.0)
+  val ks = Seq(1, 3, 10)
+
+  engineEvaluator = (
+    RecommendationEngine(),
+    MetricEvaluator(
+      metric = PrecisionAtK(k = 3, ratingThreshold = 2.0),
+      otherMetrics = (
+        ratingThresholds.map(r => PositiveCount(ratingThreshold = r)) ++
+          ratingThresholds.flatMap(r => ks.map(k => PrecisionAtK(k = k, ratingThreshold = r)))
+      )))
+}
+
+
+/** Shared base params: read app "MyApp1", 5-fold evaluation, 10 results per query. */
+trait BaseEngineParamsList extends EngineParamsGenerator {
+  protected val baseEP = {
+    val evalParams = DataSourceEvalParams(kFold = 5, queryNum = 10)
+    EngineParams(
+      dataSourceParams = DataSourceParams(appName = "MyApp1", evalParams = Some(evalParams)))
+  }
+}
+
+/** Grid search over ALS rank x numIterations, with lambda 0.01 and a fixed seed of 3. */
+object EngineParamsList extends BaseEngineParamsList {
+  engineParamsList = Seq(5, 10, 20).flatMap { rank =>
+    Seq(1, 5, 10).map { numIterations =>
+      baseEP.copy(
+        algorithmParamsList = Seq(
+          ("als", ALSAlgorithmParams(rank, numIterations, 0.01, Some(3)))))
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-recommendation/customize-serving/src/main/scala/Preparator.scala
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-recommendation/customize-serving/src/main/scala/Preparator.scala b/examples/scala-parallel-recommendation/customize-serving/src/main/scala/Preparator.scala
new file mode 100644
index 0000000..6a41c47
--- /dev/null
+++ b/examples/scala-parallel-recommendation/customize-serving/src/main/scala/Preparator.scala
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.predictionio.examples.recommendation
+
+import org.apache.predictionio.controller.PPreparator
+
+import org.apache.spark.SparkContext
+import org.apache.spark.SparkContext._
+import org.apache.spark.rdd.RDD
+
+/** Passes the training ratings through unchanged as PreparedData. */
+class Preparator extends PPreparator[TrainingData, PreparedData] {
+  def prepare(sc: SparkContext, trainingData: TrainingData): PreparedData =
+    new PreparedData(trainingData.ratings)
+}
+
+/** Input to ALSAlgorithm.train. */
+class PreparedData(val ratings: RDD[Rating]) extends Serializable
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-recommendation/customize-serving/src/main/scala/Serving.scala
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-recommendation/customize-serving/src/main/scala/Serving.scala b/examples/scala-parallel-recommendation/customize-serving/src/main/scala/Serving.scala
new file mode 100644
index 0000000..1042bce
--- /dev/null
+++ b/examples/scala-parallel-recommendation/customize-serving/src/main/scala/Serving.scala
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.predictionio.examples.recommendation
+
+import org.apache.predictionio.controller.LServing
+
+import scala.io.Source
+
+import org.apache.predictionio.controller.Params // ADDED
+
+// ADDED ServingParams to specify the blacklisting file location.
+/** Serving parameters: path of a blacklist file with one item ID per line. */
+case class ServingParams(filepath: String) extends Params
+
+/**
+ * Removes blacklisted items from the (single) algorithm's result.
+ *
+ * The blacklist file is re-read on every query, so edits to it take effect
+ * without redeploying the engine.
+ */
+class Serving(val params: ServingParams)
+  extends LServing[Query, PredictedResult] {
+
+  override
+  def serve(query: Query, predictedResults: Seq[PredictedResult])
+    : PredictedResult = {
+    // Close the file after reading: it is opened once per query, so leaving
+    // it open would leak one file handle per request.
+    val source = Source.fromFile(params.filepath)
+    val disabledProducts: Set[String] =
+      try {
+        // Trim so stray whitespace in the file cannot defeat a match.
+        source.getLines().map(_.trim).filter(_.nonEmpty).toSet
+      } finally {
+        source.close()
+      }
+
+    val itemScores = predictedResults.head.itemScores
+    PredictedResult(itemScores.filter(ps => !disabledProducts(ps.item)))
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-recommendation/customize-serving/template.json
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-recommendation/customize-serving/template.json b/examples/scala-parallel-recommendation/customize-serving/template.json
new file mode 100644
index 0000000..d076ec5
--- /dev/null
+++ b/examples/scala-parallel-recommendation/customize-serving/template.json
@@ -0,0 +1 @@
+{"pio": {"version": { "min": "0.10.0-incubating" }}}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-recommendation/filter-by-category/README.md
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-recommendation/filter-by-category/README.md b/examples/scala-parallel-recommendation/filter-by-category/README.md
deleted file mode 100644
index e4a7350..0000000
--- a/examples/scala-parallel-recommendation/filter-by-category/README.md
+++ /dev/null
@@ -1,202 +0,0 @@
-<!--
-Licensed to the Apache Software Foundation (ASF) under one or more
-contributor license agreements. See the NOTICE file distributed with
-this work for additional information regarding copyright ownership.
-The ASF licenses this file to You under the Apache License, Version 2.0
-(the "License"); you may not use this file except in compliance with
-the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
--->
-
-# Recommendation Template With Filtering by Category
-
-This engine template is based on the Recommendation Template version v0.1.2. It's modified so that each item has a set
-of categories, queries include a `categories` field and results only include items in any of the specified categories.
-
-## Documentation
-
-Please refer to http://predictionio.incubator.apache.org/templates/recommendation/quickstart/
-
-## Development Notes
-
-### Sample data
-
-This example uses two different files from the MovieLens 100k data. [Download](http://files.grouplens.org/datasets/movielens/ml-100k.zip)
-
-The first file is the ratings data file, which is a tab-separated file. The first 3 fields are used: user id, item id
-and rating.
-
-The second file is the items data file, in which fields are separated by the `|` character. The fields of
-interest are the first one (item id) and 19 category fields, starting at the 6th one and going till the end. Each
-of these fields has the value `1` if the item is the given category, and `0` otherwise.
-
-### Importing sample data
-
-The `data/import_eventserver.py` script allows importing the sample data. Valid arguments are:
-
-* `--url`: Event server URL
-* `--access_key` App access key
-* `--ratings_file`: Ratings file path
-* `--items_file`: Items file path
-
-### Changes to Engine.scala
-
-Added a `categories` field to the `Query` class:
-
-```scala
-case class Query(
- user: String,
- num: Int,
- categories: Array[String]
-) extends Serializable
-```
-
-### Changes to DataSource.scala
-
-* Introduced class `Item`:
-
-```scala
-case class Item(
- id: String,
- categories: List[String]
-)
-```
-
-* Modified class `TrainingData` to include items:
-
-```scala
-class TrainingData(
- val items: RDD[Item],
- val ratings: RDD[Rating]
-) extends Serializable {
- override def toString = {
- s"items: [${items.count()}] (${items.take(2).toList}...)" +
- s" ratings: [${ratings.count()}] (${ratings.take(2).toList}...)"
- }
-}
-```
-
-* Modified `readTraining` function so that it loads items:
-
-```scala
- ...
- val itemsRDD: RDD[Item] = eventsDb.aggregateProperties(
- appId = dsp.appId,
- entityType = "item"
- )(sc).map {
- case (entityId, properties) =>
- try {
- Item(id = entityId, categories = properties.get[List[String]]("categories"))
- } catch {
- case e: Exception =>
- logger.error(s"Failed to get properties ${properties} of" +
- s" item ${entityId}. Exception: ${e}.")
- throw e
- }
- }
- ...
-```
-
-### Changes to Preparator.scala
-
-* Added `items` field to class `PreparedData`. Pass `items` value from training data.
-
-```scala
-class PreparedData(
- val items: RDD[Item],
- val ratings: RDD[Rating]
-) extends Serializable
-```
-
-```scala
- new PreparedData(items = trainingData.items, ratings = trainingData.ratings)
-```
-
-### Changes to ALSModel.scala
-
-* Added `categoryItemsMap` field to ALSModel class
-
-```scala
-class ALSModel(
- override val rank: Int,
- override val userFeatures: RDD[(Int, Array[Double])],
- override val productFeatures: RDD[(Int, Array[Double])],
- val userStringIntMap: BiMap[String, Int],
- val itemStringIntMap: BiMap[String, Int],
- val categoryItemsMap: Map[String, Set[Int]])
-```
-
-* Added a function that recommends products whose ID is in a list of sets
-
-```scala
- def recommendProductsFromCategory(user: Int, num: Int, categoryItems: Array[Set[Int]]) = {
- val filteredProductFeatures = productFeatures
- .filter { case (id, _) => categoryItems.exists(_.contains(id)) }
- recommend(userFeatures.lookup(user).head, filteredProductFeatures, num)
- .map(t => Rating(user, t._1, t._2))
- }
-
- private def recommend(
- recommendToFeatures: Array[Double],
- recommendableFeatures: RDD[(Int, Array[Double])],
- num: Int): Array[(Int, Double)] = {
- val recommendToVector = new DoubleMatrix(recommendToFeatures)
- val scored = recommendableFeatures.map { case (id,features) =>
- (id, recommendToVector.dot(new DoubleMatrix(features)))
- }
- scored.top(num)(Ordering.by(_._2))
- }
-```
-
-### Changes to ALSAlgorithm.scala
-
-* Find set of categories
-
-```scala
- ...
- val categories = data.items.flatMap(_.categories).distinct().collect().toSet
- ...
-```
-
-* Find set of items of each category
-
-```scala
- ...
- val categoriesMap = categories.map { category =>
- category -> data.items
- .filter(_.categories.contains(category))
- .map(item => itemStringIntMap(item.id))
- .collect()
- .toSet
- }.toMap
- ...
-```
-
-* Find the list of item sets we are interested in, and use the new `recommendProductsFromCategory` function:
-
-```scala
- ...
- val categoriesItems = query.categories.map { category =>
- model.categoryItemsMap.getOrElse(category, Set.empty)
- }
- // recommendProductsFromCategory() returns Array[MLlibRating], which uses item Int
- // index. Convert it to String ID for returning PredictedResult
- val itemScores = model.recommendProductsFromCategory(userInt, query.num, categoriesItems)
- .map (r => ItemScore(itemIntStringMap(r.product), r.rating))
- ...
-```
-
-### Example Request
-
-The script `data/send_query.py` has been modified to represent the updated query structure:
-
-```python
-print engine_client.send_query({"user": "1", "num": 4, "categories": ["action", "western"]})
-```
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-recommendation/filter-by-category/build.sbt
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-recommendation/filter-by-category/build.sbt b/examples/scala-parallel-recommendation/filter-by-category/build.sbt
deleted file mode 100644
index 81cd3ec..0000000
--- a/examples/scala-parallel-recommendation/filter-by-category/build.sbt
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-import AssemblyKeys._
-
-assemblySettings
-
-name := "template-scala-parallel-recommendation"
-
-organization := "org.apache.predictionio"
-
-libraryDependencies ++= Seq(
- "org.apache.predictionio" %% "core" % pioVersion.value % "provided",
- "org.apache.spark" %% "spark-core" % "1.2.0" % "provided",
- "org.apache.spark" %% "spark-mllib" % "1.2.0" % "provided")
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-recommendation/filter-by-category/data/import_eventserver.py
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-recommendation/filter-by-category/data/import_eventserver.py b/examples/scala-parallel-recommendation/filter-by-category/data/import_eventserver.py
deleted file mode 100644
index 643f4db..0000000
--- a/examples/scala-parallel-recommendation/filter-by-category/data/import_eventserver.py
+++ /dev/null
@@ -1,98 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-"""
-Import sample data for recommendation engine
-"""
-
-import predictionio
-import argparse
-import random
-
-ITEM_ACTIONS_DELIMITER = "|"
-RATE_ACTIONS_DELIMITER = "\t"
-SEED = 3
-
-GENRES = ["unknown", "action", "adventure", "animation", "children's", "comedy","crime", "documentary", "drama",
- "fantasy", "film-noir", "horror", "musical", "mystery", "romance", "sci-fi", "thriller", "war", "western"]
-
-def import_events(client, items_file, ratings_file):
- random.seed(SEED)
-
- f = open(items_file, 'r')
- print "Importing items..."
- items = 0
- for line in f:
- data = line.rstrip('\r\n').split(ITEM_ACTIONS_DELIMITER)
- id = data[0]
- genres_str = data[5:]
- genres_bool = [bool(int(g)) for g in genres_str]
- genres = [g for b, g in zip(genres_bool, GENRES) if b]
- client.create_event(
- event="$set",
- entity_type="item",
- entity_id=id,
- properties= { "categories" : genres }
- )
- items += 1
- print "%s items are imported." % items
- f.close()
-
- f = open(ratings_file, 'r')
- print "Importing ratings..."
- ratings = 0
- for line in f:
- data = line.rstrip('\r\n').split(RATE_ACTIONS_DELIMITER)
- # For demonstration purpose, randomly mix in some buy events
- if random.randint(0, 1) == 1:
- client.create_event(
- event="rate",
- entity_type="user",
- entity_id=data[0],
- target_entity_type="item",
- target_entity_id=data[1],
- properties= { "rating" : float(data[2]) }
- )
- else:
- client.create_event(
- event="buy",
- entity_type="user",
- entity_id=data[0],
- target_entity_type="item",
- target_entity_id=data[1]
- )
- ratings += 1
- f.close()
- print "%s ratings are imported." % ratings
-
-if __name__ == '__main__':
- parser = argparse.ArgumentParser(
- description="Import sample data for recommendation engine")
- parser.add_argument('--access_key', default='invald_access_key')
- parser.add_argument('--url', default="http://localhost:7070")
- parser.add_argument('--ratings_file')
- parser.add_argument('--items_file')
-
- args = parser.parse_args()
- print args
-
- client = predictionio.EventClient(
- access_key=args.access_key,
- url=args.url,
- threads=5,
- qsize=500)
- import_events(client, args.items_file, args.ratings_file)
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-recommendation/filter-by-category/data/send_query.py
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-recommendation/filter-by-category/data/send_query.py b/examples/scala-parallel-recommendation/filter-by-category/data/send_query.py
deleted file mode 100644
index 9e061de..0000000
--- a/examples/scala-parallel-recommendation/filter-by-category/data/send_query.py
+++ /dev/null
@@ -1,24 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-"""
-Send sample query to prediction engine
-"""
-
-import predictionio
-engine_client = predictionio.EngineClient(url="http://localhost:8000")
-print engine_client.send_query({"user": "1", "num": 4, "categories": ["action", "western"]})
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-recommendation/filter-by-category/engine.json
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-recommendation/filter-by-category/engine.json b/examples/scala-parallel-recommendation/filter-by-category/engine.json
deleted file mode 100644
index f97113e..0000000
--- a/examples/scala-parallel-recommendation/filter-by-category/engine.json
+++ /dev/null
@@ -1,21 +0,0 @@
-{
- "id": "default",
- "description": "Default settings",
- "engineFactory": "org.template.recommendation.RecommendationEngine",
- "datasource": {
- "params" : {
- "appId": 2
- }
- },
- "algorithms": [
- {
- "name": "als",
- "params": {
- "rank": 10,
- "numIterations": 20,
- "lambda": 0.01,
- "seed": 3
- }
- }
- ]
-}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-recommendation/filter-by-category/project/assembly.sbt
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-recommendation/filter-by-category/project/assembly.sbt b/examples/scala-parallel-recommendation/filter-by-category/project/assembly.sbt
deleted file mode 100644
index 54c3252..0000000
--- a/examples/scala-parallel-recommendation/filter-by-category/project/assembly.sbt
+++ /dev/null
@@ -1 +0,0 @@
-addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.11.2")
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-recommendation/filter-by-category/project/pio-build.sbt
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-recommendation/filter-by-category/project/pio-build.sbt b/examples/scala-parallel-recommendation/filter-by-category/project/pio-build.sbt
deleted file mode 100644
index 9aed0ee..0000000
--- a/examples/scala-parallel-recommendation/filter-by-category/project/pio-build.sbt
+++ /dev/null
@@ -1 +0,0 @@
-addSbtPlugin("org.apache.predictionio" % "pio-build" % "0.9.0")
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-recommendation/filter-by-category/src/main/scala/ALSAlgorithm.scala
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-recommendation/filter-by-category/src/main/scala/ALSAlgorithm.scala b/examples/scala-parallel-recommendation/filter-by-category/src/main/scala/ALSAlgorithm.scala
deleted file mode 100644
index 0046e69..0000000
--- a/examples/scala-parallel-recommendation/filter-by-category/src/main/scala/ALSAlgorithm.scala
+++ /dev/null
@@ -1,118 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.template.recommendation
-
-import org.apache.predictionio.controller.PAlgorithm
-import org.apache.predictionio.controller.Params
-import org.apache.predictionio.data.storage.BiMap
-
-import org.apache.spark.SparkContext
-import org.apache.spark.SparkContext._
-import org.apache.spark.rdd.RDD
-import org.apache.spark.mllib.recommendation.ALS
-import org.apache.spark.mllib.recommendation.{Rating => MLlibRating}
-import org.apache.spark.mllib.recommendation.ALSModel
-
-import grizzled.slf4j.Logger
-
-case class ALSAlgorithmParams(
- rank: Int,
- numIterations: Int,
- lambda: Double,
- seed: Option[Long]) extends Params
-
-class ALSAlgorithm(val ap: ALSAlgorithmParams)
- extends PAlgorithm[PreparedData, ALSModel, Query, PredictedResult] {
-
- @transient lazy val logger = Logger[this.type]
-
- def train(sc: SparkContext, data: PreparedData): ALSModel = {
- // MLLib ALS cannot handle empty training data.
- require(data.ratings.take(1).nonEmpty,
- s"RDD[Rating] in PreparedData cannot be empty." +
- " Please check if DataSource generates TrainingData" +
- " and Preprator generates PreparedData correctly.")
- // Convert user and item String IDs to Int index for MLlib
- val userStringIntMap = BiMap.stringInt(data.ratings.map(_.user))
- val itemStringIntMap = BiMap.stringInt(data.ratings.map(_.item))
- val mllibRatings = data.ratings.map( r =>
- // MLlibRating requires integer index for user and item
- MLlibRating(userStringIntMap(r.user), itemStringIntMap(r.item), r.rating)
- )
-
- // seed for MLlib ALS
- val seed = ap.seed.getOrElse(System.nanoTime)
-
- // If you only have one type of implicit event (Eg. "view" event only),
- // replace ALS.train(...) with
- //val m = ALS.trainImplicit(
- //ratings = mllibRatings,
- //rank = ap.rank,
- //iterations = ap.numIterations,
- //lambda = ap.lambda,
- //blocks = -1,
- //alpha = 1.0,
- //seed = seed)
-
- val m = ALS.train(
- ratings = mllibRatings,
- rank = ap.rank,
- iterations = ap.numIterations,
- lambda = ap.lambda,
- blocks = -1,
- seed = seed)
-
- val categories = data.items.flatMap(_.categories).distinct().collect().toSet
-
- val categoriesMap = categories.map { category =>
- category -> data.items
- .filter(_.categories.contains(category))
- .map(item => itemStringIntMap(item.id))
- .collect()
- .toSet
- }.toMap
-
- new ALSModel(
- rank = m.rank,
- userFeatures = m.userFeatures,
- productFeatures = m.productFeatures,
- userStringIntMap = userStringIntMap,
- itemStringIntMap = itemStringIntMap,
- categoryItemsMap = categoriesMap)
- }
-
- def predict(model: ALSModel, query: Query): PredictedResult = {
- // Convert String ID to Int index for Mllib
- model.userStringIntMap.get(query.user).map { userInt =>
- // create inverse view of itemStringIntMap
- val itemIntStringMap = model.itemStringIntMap.inverse
- // Find items on query category
- val categoriesItems = query.categories.map { category =>
- model.categoryItemsMap.getOrElse(category, Set.empty)
- }
- // recommendProductsFromCategory() returns Array[MLlibRating], which uses item Int
- // index. Convert it to String ID for returning PredictedResult
- val itemScores = model.recommendProductsFromCategory(userInt, query.num, categoriesItems)
- .map (r => ItemScore(itemIntStringMap(r.product), r.rating))
- new PredictedResult(itemScores)
- }.getOrElse{
- logger.info(s"No prediction for unknown user ${query.user}.")
- new PredictedResult(Array.empty)
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-recommendation/filter-by-category/src/main/scala/ALSModel.scala
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-recommendation/filter-by-category/src/main/scala/ALSModel.scala b/examples/scala-parallel-recommendation/filter-by-category/src/main/scala/ALSModel.scala
deleted file mode 100644
index 1816dc9..0000000
--- a/examples/scala-parallel-recommendation/filter-by-category/src/main/scala/ALSModel.scala
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.mllib.recommendation
-// This must be the same package as Spark's MatrixFactorizationModel because
-// MatrixFactorizationModel's constructor is private and we are using
-// its constructor in order to save and load the model
-
-import org.jblas.DoubleMatrix
-import org.template.recommendation.ALSAlgorithmParams
-
-import org.apache.predictionio.controller.IPersistentModel
-import org.apache.predictionio.controller.IPersistentModelLoader
-import org.apache.predictionio.data.storage.BiMap
-
-import org.apache.spark.SparkContext
-import org.apache.spark.SparkContext._
-import org.apache.spark.rdd.RDD
-
-class ALSModel(
- override val rank: Int,
- override val userFeatures: RDD[(Int, Array[Double])],
- override val productFeatures: RDD[(Int, Array[Double])],
- val userStringIntMap: BiMap[String, Int],
- val itemStringIntMap: BiMap[String, Int],
- val categoryItemsMap: Map[String, Set[Int]])
- extends MatrixFactorizationModel(rank, userFeatures, productFeatures)
- with IPersistentModel[ALSAlgorithmParams] {
-
- def recommendProductsFromCategory(user: Int, num: Int, categoryItems: Array[Set[Int]]) = {
- val filteredProductFeatures = productFeatures
- .filter { case (id, _) => categoryItems.exists(_.contains(id)) }
- recommend(userFeatures.lookup(user).head, filteredProductFeatures, num)
- .map(t => Rating(user, t._1, t._2))
- }
-
- private def recommend(
- recommendToFeatures: Array[Double],
- recommendableFeatures: RDD[(Int, Array[Double])],
- num: Int): Array[(Int, Double)] = {
- val recommendToVector = new DoubleMatrix(recommendToFeatures)
- val scored = recommendableFeatures.map { case (id,features) =>
- (id, recommendToVector.dot(new DoubleMatrix(features)))
- }
- scored.top(num)(Ordering.by(_._2))
- }
-
- def save(id: String, params: ALSAlgorithmParams,
- sc: SparkContext): Boolean = {
-
- sc.parallelize(Seq(rank)).saveAsObjectFile(s"/tmp/${id}/rank")
- userFeatures.saveAsObjectFile(s"/tmp/${id}/userFeatures")
- productFeatures.saveAsObjectFile(s"/tmp/${id}/productFeatures")
- sc.parallelize(Seq(userStringIntMap)).saveAsObjectFile(s"/tmp/${id}/userStringIntMap")
- sc.parallelize(Seq(itemStringIntMap)).saveAsObjectFile(s"/tmp/${id}/itemStringIntMap")
- sc.parallelize(Seq(categoryItemsMap)).saveAsObjectFile(s"/tmp/${id}/categoryItemsMap")
- true
- }
-
- override def toString = {
- s"userFeatures: [${userFeatures.count()}]" +
- s"(${userFeatures.take(2).toList}...)" +
- s" productFeatures: [${productFeatures.count()}]" +
- s"(${productFeatures.take(2).toList}...)" +
- s" userStringIntMap: [${userStringIntMap.size}]" +
- s"(${userStringIntMap.take(2)}...)" +
- s" itemStringIntMap: [${itemStringIntMap.size}]" +
- s"(${itemStringIntMap.take(2)}...)"
- }
-}
-
-object ALSModel
- extends IPersistentModelLoader[ALSAlgorithmParams, ALSModel] {
- def apply(id: String, params: ALSAlgorithmParams,
- sc: Option[SparkContext]) = {
- new ALSModel(
- rank = sc.get.objectFile[Int](s"/tmp/${id}/rank").first,
- userFeatures = sc.get.objectFile(s"/tmp/${id}/userFeatures"),
- productFeatures = sc.get.objectFile(s"/tmp/${id}/productFeatures"),
- userStringIntMap = sc.get.objectFile[BiMap[String, Int]](s"/tmp/${id}/userStringIntMap").first,
- itemStringIntMap = sc.get.objectFile[BiMap[String, Int]](s"/tmp/${id}/itemStringIntMap").first,
- categoryItemsMap = sc.get.objectFile[Map[String, Set[Int]]](s"/tmp/${id}/categoryItemsMap").first)
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-recommendation/filter-by-category/src/main/scala/DataSource.scala
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-recommendation/filter-by-category/src/main/scala/DataSource.scala b/examples/scala-parallel-recommendation/filter-by-category/src/main/scala/DataSource.scala
deleted file mode 100644
index 769524b..0000000
--- a/examples/scala-parallel-recommendation/filter-by-category/src/main/scala/DataSource.scala
+++ /dev/null
@@ -1,110 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.template.recommendation
-
-import org.apache.predictionio.controller.PDataSource
-import org.apache.predictionio.controller.EmptyEvaluationInfo
-import org.apache.predictionio.controller.EmptyActualResult
-import org.apache.predictionio.controller.Params
-import org.apache.predictionio.data.storage.Event
-import org.apache.predictionio.data.storage.Storage
-
-import org.apache.spark.SparkContext
-import org.apache.spark.SparkContext._
-import org.apache.spark.rdd.RDD
-
-import grizzled.slf4j.Logger
-
-case class DataSourceParams(appId: Int) extends Params
-
-class DataSource(val dsp: DataSourceParams)
- extends PDataSource[TrainingData,
- EmptyEvaluationInfo, Query, EmptyActualResult] {
-
- @transient lazy val logger = Logger[this.type]
-
- override
- def readTraining(sc: SparkContext): TrainingData = {
- val eventsDb = Storage.getPEvents()
-
- val itemsRDD: RDD[Item] = eventsDb.aggregateProperties(
- appId = dsp.appId,
- entityType = "item"
- )(sc).map {
- case (entityId, properties) =>
- try {
- Item(id = entityId, categories = properties.get[List[String]]("categories"))
- } catch {
- case e: Exception =>
- logger.error(s"Failed to get properties ${properties} of" +
- s" item ${entityId}. Exception: ${e}.")
- throw e
- }
- }
-
- val rateEventsRDD: RDD[Event] = eventsDb.find(
- appId = dsp.appId,
- entityType = Some("user"),
- eventNames = Some(List("rate", "buy")), // read "rate" and "buy" event
- // targetEntityType is optional field of an event.
- targetEntityType = Some(Some("item")))(sc)
-
- val ratingsRDD: RDD[Rating] = rateEventsRDD.map { event =>
- val rating = try {
- val ratingValue: Double = event.event match {
- case "rate" => event.properties.get[Double]("rating")
- case "buy" => 4.0 // map buy event to rating value of 4
- case _ => throw new Exception(s"Unexpected event ${event} is read.")
- }
- // entityId and targetEntityId is String
- Rating(event.entityId,
- event.targetEntityId.get,
- ratingValue)
- } catch {
- case e: Exception => {
- logger.error(s"Cannot convert ${event} to Rating. Exception: ${e}.")
- throw e
- }
- }
- rating
- }.cache()
-
- new TrainingData(itemsRDD, ratingsRDD)
- }
-}
-
-case class Item(
- id: String,
- categories: List[String]
-)
-
-case class Rating(
- user: String,
- item: String,
- rating: Double
-)
-
-case class TrainingData(
- items: RDD[Item],
- ratings: RDD[Rating]
-) {
- override def toString = {
- s"items: [${items.count()}] (${items.take(2).toList}...)" +
- s" ratings: [${ratings.count()}] (${ratings.take(2).toList}...)"
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-recommendation/filter-by-category/src/main/scala/Engine.scala
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-recommendation/filter-by-category/src/main/scala/Engine.scala b/examples/scala-parallel-recommendation/filter-by-category/src/main/scala/Engine.scala
deleted file mode 100644
index daec4e1..0000000
--- a/examples/scala-parallel-recommendation/filter-by-category/src/main/scala/Engine.scala
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.template.recommendation
-
-import org.apache.predictionio.controller.IEngineFactory
-import org.apache.predictionio.controller.Engine
-
-case class Query(
- user: String,
- num: Int,
- categories: Array[String]
-)
-
-case class PredictedResult(
- itemScores: Array[ItemScore]
-)
-
-case class ItemScore(
- item: String,
- score: Double
-)
-
-object RecommendationEngine extends IEngineFactory {
- def apply() = {
- new Engine(
- classOf[DataSource],
- classOf[Preparator],
- Map("als" -> classOf[ALSAlgorithm]),
- classOf[Serving])
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-recommendation/filter-by-category/src/main/scala/Preparator.scala
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-recommendation/filter-by-category/src/main/scala/Preparator.scala b/examples/scala-parallel-recommendation/filter-by-category/src/main/scala/Preparator.scala
deleted file mode 100644
index 991f8ab..0000000
--- a/examples/scala-parallel-recommendation/filter-by-category/src/main/scala/Preparator.scala
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.template.recommendation
-
-import org.apache.predictionio.controller.PPreparator
-
-import org.apache.spark.SparkContext
-import org.apache.spark.SparkContext._
-import org.apache.spark.rdd.RDD
-
-class Preparator
- extends PPreparator[TrainingData, PreparedData] {
-
- def prepare(sc: SparkContext, trainingData: TrainingData): PreparedData = {
- new PreparedData(items = trainingData.items, ratings = trainingData.ratings)
- }
-}
-
-class PreparedData(
- val items: RDD[Item],
- val ratings: RDD[Rating]
-) extends Serializable
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-recommendation/filter-by-category/src/main/scala/Serving.scala
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-recommendation/filter-by-category/src/main/scala/Serving.scala b/examples/scala-parallel-recommendation/filter-by-category/src/main/scala/Serving.scala
deleted file mode 100644
index 01d4a6d..0000000
--- a/examples/scala-parallel-recommendation/filter-by-category/src/main/scala/Serving.scala
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.template.recommendation
-
-import org.apache.predictionio.controller.LServing
-
-class Serving
- extends LServing[Query, PredictedResult] {
-
- override
- def serve(query: Query,
- predictedResults: Seq[PredictedResult]): PredictedResult = {
- predictedResults.head
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-recommendation/reading-custom-events/.gitignore
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-recommendation/reading-custom-events/.gitignore b/examples/scala-parallel-recommendation/reading-custom-events/.gitignore
new file mode 100644
index 0000000..3f3403a
--- /dev/null
+++ b/examples/scala-parallel-recommendation/reading-custom-events/.gitignore
@@ -0,0 +1,5 @@
+data/sample_movielens_data.txt
+manifest.json
+target/
+pio.log
+/pio.sbt
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-recommendation/reading-custom-events/build.sbt
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-recommendation/reading-custom-events/build.sbt b/examples/scala-parallel-recommendation/reading-custom-events/build.sbt
new file mode 100644
index 0000000..3d25df1
--- /dev/null
+++ b/examples/scala-parallel-recommendation/reading-custom-events/build.sbt
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+name := "template-scala-parallel-recommendation"
+
+organization := "org.apache.predictionio"
+scalaVersion := "2.11.8"
+libraryDependencies ++= Seq(
+ "org.apache.predictionio" %% "apache-predictionio-core" % "0.11.0-incubating" % "provided",
+ "org.apache.spark" %% "spark-mllib" % "2.1.1" % "provided")
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-recommendation/reading-custom-events/data/import_eventserver.py
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-recommendation/reading-custom-events/data/import_eventserver.py b/examples/scala-parallel-recommendation/reading-custom-events/data/import_eventserver.py
new file mode 100644
index 0000000..7d88568
--- /dev/null
+++ b/examples/scala-parallel-recommendation/reading-custom-events/data/import_eventserver.py
@@ -0,0 +1,72 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+Import sample data for recommendation engine
+"""
+
+import predictionio
+import argparse
+import random
+
+RATE_ACTIONS_DELIMITER = "::"
+SEED = 3
+
+def import_events(client, file):
+ f = open(file, 'r')
+ random.seed(SEED)
+ count = 0
+ print("Importing data...")
+ for line in f:
+ data = line.rstrip('\r\n').split(RATE_ACTIONS_DELIMITER)
+ # For demonstration purpose, randomly mix in some dislike events
+ if (random.randint(0, 1) == 1):
+ client.create_event(
+ event="like",
+ entity_type="customer",
+ entity_id=data[0],
+ target_entity_type="product",
+ target_entity_id=data[1]
+ )
+ else:
+ client.create_event(
+ event="dislike",
+ entity_type="customer",
+ entity_id=data[0],
+ target_entity_type="product",
+ target_entity_id=data[1]
+ )
+ count += 1
+ f.close()
+ print("%s events are imported." % count)
+
+if __name__ == '__main__':
+ parser = argparse.ArgumentParser(
+ description="Import sample data for recommendation engine")
+ parser.add_argument('--access_key', default='invald_access_key')
+ parser.add_argument('--url', default="http://localhost:7070")
+ parser.add_argument('--file', default="./data/sample_movielens_data.txt")
+
+ args = parser.parse_args()
+ print(args)
+
+ client = predictionio.EventClient(
+ access_key=args.access_key,
+ url=args.url,
+ threads=5,
+ qsize=500)
+ import_events(client, args.file)
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-recommendation/reading-custom-events/data/send_query.py
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-recommendation/reading-custom-events/data/send_query.py b/examples/scala-parallel-recommendation/reading-custom-events/data/send_query.py
new file mode 100644
index 0000000..f6ec9ab
--- /dev/null
+++ b/examples/scala-parallel-recommendation/reading-custom-events/data/send_query.py
@@ -0,0 +1,24 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+Send sample query to prediction engine
+"""
+
+import predictionio
+engine_client = predictionio.EngineClient(url="http://localhost:8000")
+print(engine_client.send_query({"user": "1", "num": 4}))
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-recommendation/reading-custom-events/engine.json
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-recommendation/reading-custom-events/engine.json b/examples/scala-parallel-recommendation/reading-custom-events/engine.json
new file mode 100644
index 0000000..718b0e1
--- /dev/null
+++ b/examples/scala-parallel-recommendation/reading-custom-events/engine.json
@@ -0,0 +1,21 @@
+{
+ "id": "default",
+ "description": "Default settings",
+ "engineFactory": "org.apache.predictionio.examples.recommendation.RecommendationEngine",
+ "datasource": {
+ "params" : {
+ "appName": "MyApp1"
+ }
+ },
+ "algorithms": [
+ {
+ "name": "als",
+ "params": {
+ "rank": 10,
+ "numIterations": 20,
+ "lambda": 0.01,
+ "seed": 3
+ }
+ }
+ ]
+}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-recommendation/reading-custom-events/project/assembly.sbt
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-recommendation/reading-custom-events/project/assembly.sbt b/examples/scala-parallel-recommendation/reading-custom-events/project/assembly.sbt
new file mode 100644
index 0000000..92636bc
--- /dev/null
+++ b/examples/scala-parallel-recommendation/reading-custom-events/project/assembly.sbt
@@ -0,0 +1 @@
+// sbt-assembly adds the `assembly` task, which packages the project and its
+// non-"provided" dependencies into a single JAR.
+addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.4")
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-recommendation/reading-custom-events/project/build.properties
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-recommendation/reading-custom-events/project/build.properties b/examples/scala-parallel-recommendation/reading-custom-events/project/build.properties
new file mode 100644
index 0000000..64317fd
--- /dev/null
+++ b/examples/scala-parallel-recommendation/reading-custom-events/project/build.properties
@@ -0,0 +1 @@
+sbt.version=0.13.15
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-recommendation/reading-custom-events/src/main/scala/ALSAlgorithm.scala
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-recommendation/reading-custom-events/src/main/scala/ALSAlgorithm.scala b/examples/scala-parallel-recommendation/reading-custom-events/src/main/scala/ALSAlgorithm.scala
new file mode 100644
index 0000000..b0f874d
--- /dev/null
+++ b/examples/scala-parallel-recommendation/reading-custom-events/src/main/scala/ALSAlgorithm.scala
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.predictionio.examples.recommendation
+
+import org.apache.predictionio.controller.PAlgorithm
+import org.apache.predictionio.controller.Params
+import org.apache.predictionio.data.storage.BiMap
+
+import org.apache.spark.SparkContext
+import org.apache.spark.SparkContext._
+import org.apache.spark.rdd.RDD
+import org.apache.spark.mllib.recommendation.ALS
+import org.apache.spark.mllib.recommendation.{Rating => MLlibRating}
+import org.apache.spark.mllib.recommendation.ALSModel
+
+import grizzled.slf4j.Logger
+
+/**
+ * Hyper-parameters of [[ALSAlgorithm]], read from the algorithm's `params`
+ * object in engine.json.
+ *
+ * @param rank          number of latent factors given to MLlib ALS
+ * @param numIterations number of ALS iterations; values above 30 trigger a warning
+ * @param lambda        regularization parameter given to ALS.setLambda
+ * @param seed          random seed; when absent, System.nanoTime is used
+ */
+case class ALSAlgorithmParams(
+  rank: Int,
+  numIterations: Int,
+  lambda: Double,
+  seed: Option[Long]
+) extends Params
+
+/**
+ * Recommendation algorithm backed by Spark MLlib's ALS matrix factorization.
+ *
+ * PredictionIO user/item IDs are Strings while MLlib needs Int indices, so
+ * training builds String-to-Int BiMaps that are stored in the [[ALSModel]] and
+ * used to map results back to String IDs at prediction time.
+ *
+ * @param ap algorithm hyper-parameters
+ */
+class ALSAlgorithm(val ap: ALSAlgorithmParams)
+  extends PAlgorithm[PreparedData, ALSModel, Query, PredictedResult] {
+
+  @transient lazy val logger = Logger[this.type]
+
+  // Warn early: a high iteration count risks a StackOverflowError during
+  // training unless numIterations is lowered or checkpointing is used.
+  if (ap.numIterations > 30) {
+    logger.warn(
+      s"ALSAlgorithmParams.numIterations > 30, current: ${ap.numIterations}. " +
+      "There is a chance of running into StackOverflowError. " +
+      "To remedy it, set lower numIterations or checkpoint parameters.")
+  }
+
+  /**
+   * Trains an MLlib ALS model on the prepared ratings.
+   *
+   * @param sc   Spark context (not used unless the checkpoint line below is enabled)
+   * @param data prepared ratings; must contain at least one rating
+   * @return the trained [[ALSModel]], including the user/item ID maps
+   * @throws IllegalArgumentException if `data.ratings` is empty
+   */
+  def train(sc: SparkContext, data: PreparedData): ALSModel = {
+    // MLlib ALS cannot handle empty training data.
+    require(data.ratings.take(1).nonEmpty,
+      "RDD[Rating] in PreparedData cannot be empty." +
+      " Please check if DataSource generates TrainingData" +
+      " and Preparator generates PreparedData correctly.")
+
+    // Convert user and item String IDs to Int indices for MLlib.
+    val userStringIntMap = BiMap.stringInt(data.ratings.map(_.user))
+    val itemStringIntMap = BiMap.stringInt(data.ratings.map(_.item))
+    val mllibRatings = data.ratings.map { r =>
+      // MLlibRating requires integer indices for user and item
+      MLlibRating(userStringIntMap(r.user), itemStringIntMap(r.item), r.rating)
+    }
+
+    // Seed for MLlib ALS; fall back to the current time when none is configured.
+    val seed = ap.seed.getOrElse(System.nanoTime)
+
+    // Set checkpoint directory
+    // sc.setCheckpointDir("checkpoint")
+
+    // If you only have one type of implicit event (Eg. "view" event only),
+    // set implicitPrefs to true
+    val implicitPrefs = false
+    val als = new ALS()
+    als.setUserBlocks(-1)
+    als.setProductBlocks(-1)
+    als.setRank(ap.rank)
+    als.setIterations(ap.numIterations)
+    als.setLambda(ap.lambda)
+    als.setImplicitPrefs(implicitPrefs)
+    als.setAlpha(1.0)
+    als.setSeed(seed)
+    als.setCheckpointInterval(10)
+    val m = als.run(mllibRatings)
+
+    new ALSModel(
+      rank = m.rank,
+      userFeatures = m.userFeatures,
+      productFeatures = m.productFeatures,
+      userStringIntMap = userStringIntMap,
+      itemStringIntMap = itemStringIntMap)
+  }
+
+  /**
+   * Returns up to `query.num` recommended items for `query.user`, or an empty
+   * result (logged at info level) when the user is not in the model.
+   */
+  def predict(model: ALSModel, query: Query): PredictedResult = {
+    // Convert String ID to Int index for MLlib
+    model.userStringIntMap.get(query.user).map { userInt =>
+      // create inverse view of itemStringIntMap
+      val itemIntStringMap = model.itemStringIntMap.inverse
+      // recommendProducts() returns Array[MLlibRating], which uses item Int
+      // index. Convert it to String ID for returning PredictedResult
+      val itemScores = model.recommendProducts(userInt, query.num)
+        .map(r => ItemScore(itemIntStringMap(r.product), r.rating))
+      PredictedResult(itemScores)
+    }.getOrElse {
+      logger.info(s"No prediction for unknown user ${query.user}.")
+      PredictedResult(Array.empty)
+    }
+  }
+
+  /**
+   * Scores a batch of indexed queries; used by the evaluation module.
+   *
+   * @param model   trained model
+   * @param queries (query index, query) pairs
+   * @return (query index, result) pairs; unknown users get an empty result
+   */
+  override def batchPredict(
+    model: ALSModel,
+    queries: RDD[(Long, Query)]): RDD[(Long, PredictedResult)] = {
+    // Copy the ID maps into locals so the Spark closures below capture only
+    // these BiMaps instead of the whole model (which also references RDDs).
+    val userStringIntMap = model.userStringIntMap
+    val itemIntStringMap = model.itemStringIntMap.inverse
+
+    val userIxQueries: RDD[(Int, (Long, Query))] = queries.map { case (ix, query) =>
+      // If user not found, then the index is -1
+      val userIx = userStringIntMap.get(query.user).getOrElse(-1)
+      (userIx, (ix, query))
+    }
+
+    // Cross product of all known users from the queries and all products in
+    // the model. distinct() keeps several queries for the same user from
+    // producing duplicate (user, product) pairs, which would otherwise show
+    // up as repeated items in that user's result.
+    val usersProducts: RDD[(Int, Int)] = userIxQueries
+      .keys
+      .filter(_ != -1)
+      .distinct()
+      .cartesian(model.productFeatures.map(_._1))
+
+    // Call MLlib ALS's predict function.
+    val ratings: RDD[MLlibRating] = model.predict(usersProducts)
+
+    // Not optimal: groupBy materializes every product score per user; a
+    // combineByKey/aggregateByKey top-k with a bounded queue would be cheaper.
+    val userRatings: RDD[(Int, Iterable[MLlibRating])] = ratings.groupBy(_.user)
+
+    userIxQueries.leftOuterJoin(userRatings).map {
+      // Known user: keep the query.num highest-scored products.
+      case (_, ((ix, query), Some(scoredProducts))) =>
+        val topItemScores: Array[ItemScore] = scoredProducts
+          .toArray
+          .sortBy(_.rating)(Ordering.Double.reverse) // from large to small
+          .take(query.num)
+          .map(r => ItemScore(itemIntStringMap(r.product), r.rating))
+        (ix, PredictedResult(itemScores = topItemScores))
+      // User doesn't exist in training data.
+      case (userIx, ((ix, _), None)) =>
+        require(userIx == -1)
+        (ix, PredictedResult(itemScores = Array.empty))
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-recommendation/reading-custom-events/src/main/scala/ALSModel.scala
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-recommendation/reading-custom-events/src/main/scala/ALSModel.scala b/examples/scala-parallel-recommendation/reading-custom-events/src/main/scala/ALSModel.scala
new file mode 100644
index 0000000..898858d
--- /dev/null
+++ b/examples/scala-parallel-recommendation/reading-custom-events/src/main/scala/ALSModel.scala
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.recommendation
+// This must be the same package as Spark's MatrixFactorizationModel because
+// MatrixFactorizationModel's constructor is private and we are using
+// its constructor in order to save and load the model
+
+import org.apache.predictionio.examples.recommendation.ALSAlgorithmParams
+
+import org.apache.predictionio.controller.PersistentModel
+import org.apache.predictionio.controller.PersistentModelLoader
+import org.apache.predictionio.data.storage.BiMap
+
+import org.apache.spark.SparkContext
+import org.apache.spark.SparkContext._
+import org.apache.spark.rdd.RDD
+
+/**
+ * MLlib matrix-factorization model extended with the String-to-Int ID maps
+ * used to translate PredictionIO IDs, plus custom persistence.
+ *
+ * @param userStringIntMap maps user String IDs to MLlib Int indices
+ * @param itemStringIntMap maps item String IDs to MLlib Int indices
+ */
+class ALSModel(
+  override val rank: Int,
+  override val userFeatures: RDD[(Int, Array[Double])],
+  override val productFeatures: RDD[(Int, Array[Double])],
+  val userStringIntMap: BiMap[String, Int],
+  val itemStringIntMap: BiMap[String, Int])
+  extends MatrixFactorizationModel(rank, userFeatures, productFeatures)
+  with PersistentModel[ALSAlgorithmParams] {
+
+  /**
+   * Writes every model component as a Spark object file under `/tmp/<id>/`.
+   *
+   * @return always `true`
+   */
+  def save(id: String, params: ALSAlgorithmParams,
+    sc: SparkContext): Boolean = {
+    val baseDir = s"/tmp/${id}"
+    sc.parallelize(Seq(rank)).saveAsObjectFile(s"${baseDir}/rank")
+    userFeatures.saveAsObjectFile(s"${baseDir}/userFeatures")
+    productFeatures.saveAsObjectFile(s"${baseDir}/productFeatures")
+    sc.parallelize(Seq(userStringIntMap)).saveAsObjectFile(s"${baseDir}/userStringIntMap")
+    sc.parallelize(Seq(itemStringIntMap)).saveAsObjectFile(s"${baseDir}/itemStringIntMap")
+    true
+  }
+
+  /** Short summary; note that it runs Spark jobs (count/take) on the factor RDDs. */
+  override def toString: String = {
+    def describeRdd(label: String, rdd: RDD[(Int, Array[Double])]): String =
+      s"${label}: [${rdd.count()}](${rdd.take(2).toList}...)"
+    def describeMap(label: String, ids: BiMap[String, Int]): String =
+      s"${label}: [${ids.size}](${ids.take(2)}...)"
+    Seq(
+      describeRdd("userFeatures", userFeatures),
+      describeRdd("productFeatures", productFeatures),
+      describeMap("userStringIntMap", userStringIntMap),
+      describeMap("itemStringIntMap", itemStringIntMap)
+    ).mkString(" ")
+  }
+}
+
+/** Loader for models written by `ALSModel.save`. */
+object ALSModel
+  extends PersistentModelLoader[ALSAlgorithmParams, ALSModel] {
+  /**
+   * Reads the model components from `/tmp/<id>/`.
+   *
+   * @param id     engine instance ID used as the directory name
+   * @param params algorithm parameters (not used when loading)
+   * @param sc     Spark context; must be defined
+   * @return the restored [[ALSModel]]
+   * @throws NoSuchElementException if `sc` is empty
+   */
+  def apply(id: String, params: ALSAlgorithmParams,
+    sc: Option[SparkContext]): ALSModel = {
+    // Unwrap once with a descriptive error instead of repeated Option.get calls.
+    val sparkContext = sc.getOrElse(throw new NoSuchElementException(
+      s"A SparkContext is required to load ALSModel ${id}"))
+    val baseDir = s"/tmp/${id}"
+    new ALSModel(
+      rank = sparkContext.objectFile[Int](s"${baseDir}/rank").first,
+      userFeatures =
+        sparkContext.objectFile[(Int, Array[Double])](s"${baseDir}/userFeatures"),
+      productFeatures =
+        sparkContext.objectFile[(Int, Array[Double])](s"${baseDir}/productFeatures"),
+      userStringIntMap =
+        sparkContext.objectFile[BiMap[String, Int]](s"${baseDir}/userStringIntMap").first,
+      itemStringIntMap =
+        sparkContext.objectFile[BiMap[String, Int]](s"${baseDir}/itemStringIntMap").first)
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-recommendation/reading-custom-events/src/main/scala/DataSource.scala
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-recommendation/reading-custom-events/src/main/scala/DataSource.scala b/examples/scala-parallel-recommendation/reading-custom-events/src/main/scala/DataSource.scala
new file mode 100644
index 0000000..cf9738d
--- /dev/null
+++ b/examples/scala-parallel-recommendation/reading-custom-events/src/main/scala/DataSource.scala
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.predictionio.examples.recommendation
+
+import org.apache.predictionio.controller.PDataSource
+import org.apache.predictionio.controller.EmptyEvaluationInfo
+import org.apache.predictionio.controller.EmptyActualResult
+import org.apache.predictionio.controller.Params
+import org.apache.predictionio.data.storage.Event
+import org.apache.predictionio.data.store.PEventStore
+
+import org.apache.spark.SparkContext
+import org.apache.spark.SparkContext._
+import org.apache.spark.rdd.RDD
+
+import grizzled.slf4j.Logger
+
+/**
+ * Cross-validation settings used by `DataSource.readEval`.
+ *
+ * @param kFold    number of folds the ratings are split into
+ * @param queryNum number of items requested by each generated test query
+ */
+case class DataSourceEvalParams(
+  kFold: Int,
+  queryNum: Int
+)
+
+/**
+ * DataSource settings, read from engine.json's `datasource.params`.
+ *
+ * @param appName    app whose events are read from the event store
+ * @param evalParams evaluation settings; required only by `readEval`
+ */
+case class DataSourceParams(
+  appName: String,
+  evalParams: Option[DataSourceEvalParams]
+) extends Params
+
+/**
+ * Reads customer-to-product "like"/"dislike" events and turns them into ratings
+ * for training and k-fold evaluation.
+ */
+class DataSource(val dsp: DataSourceParams)
+  extends PDataSource[TrainingData,
+      EmptyEvaluationInfo, Query, ActualResult] {
+
+  @transient lazy val logger = Logger[this.type]
+
+  /**
+   * Loads "like"/"dislike" events of `dsp.appName` and converts each into a
+   * [[Rating]] (like = 4.0, dislike = 1.0).
+   *
+   * @return a cached RDD of ratings
+   * @throws Exception any failure converting an event is logged and rethrown
+   */
+  def getRatings(sc: SparkContext): RDD[Rating] = {
+    val eventsRDD: RDD[Event] = PEventStore.find(
+      appName = dsp.appName,
+      entityType = Some("customer"), // MODIFIED
+      eventNames = Some(List("like", "dislike")), // MODIFIED
+      // targetEntityType is optional field of an event.
+      targetEntityType = Some(Some("product")))(sc) // MODIFIED
+
+    eventsRDD.map { event =>
+      try {
+        val ratingValue: Double = event.event match {
+          // MODIFIED
+          case "like" => 4.0 // map a like event to a rating of 4.0
+          case "dislike" => 1.0 // map a dislike event to a rating of 1.0
+          case _ => throw new Exception(s"Unexpected event ${event} is read.")
+        }
+        // entityId and targetEntityId are Strings
+        Rating(event.entityId, event.targetEntityId.get, ratingValue)
+      } catch {
+        case e: Exception =>
+          logger.error(s"Cannot convert ${event} to Rating. Exception: ${e}.")
+          throw e
+      }
+    }.cache()
+  }
+
+  override
+  def readTraining(sc: SparkContext): TrainingData = {
+    new TrainingData(getRatings(sc))
+  }
+
+  /**
+   * Splits the ratings into `kFold` train/test folds. Each fold's test set is
+   * grouped by user into one query (asking for `queryNum` items) per user,
+   * paired with that user's held-out ratings.
+   *
+   * @throws IllegalArgumentException if evalParams is missing or kFold <= 0
+   */
+  override
+  def readEval(sc: SparkContext)
+  : Seq[(TrainingData, EmptyEvaluationInfo, RDD[(Query, ActualResult)])] = {
+    require(!dsp.evalParams.isEmpty, "Must specify evalParams")
+    val evalParams = dsp.evalParams.get
+
+    val kFold = evalParams.kFold
+    require(kFold > 0, s"kFold must be positive, but got ${kFold}")
+
+    // zipWithIndex yields contiguous indices 0..N-1, so `index % kFold` spreads
+    // ratings round-robin over the folds. zipWithUniqueId assigns ids with a
+    // stride of the partition count, which puts whole partitions into a single
+    // fold whenever the partition count is a multiple of kFold.
+    val ratings: RDD[(Rating, Long)] = getRatings(sc).zipWithIndex
+    ratings.cache
+
+    (0 until kFold).map { idx =>
+      val trainingRatings = ratings.filter(_._2 % kFold != idx).map(_._1)
+      val testingRatings = ratings.filter(_._2 % kFold == idx).map(_._1)
+
+      val testingUsers: RDD[(String, Iterable[Rating])] = testingRatings.groupBy(_.user)
+
+      (new TrainingData(trainingRatings),
+        new EmptyEvaluationInfo(),
+        testingUsers.map { case (user, userRatings) =>
+          (Query(user, evalParams.queryNum), ActualResult(userRatings.toArray))
+        })
+    }
+  }
+}
+
+/**
+ * One user-item preference.
+ *
+ * @param user   user (customer) ID
+ * @param item   item (product) ID
+ * @param rating numeric preference score
+ */
+case class Rating(user: String, item: String, rating: Double)
+
+/** Ratings produced by `DataSource.readTraining`. */
+class TrainingData(
+  val ratings: RDD[Rating]
+) extends Serializable {
+  /** Summary with the rating count and two samples; runs Spark jobs. */
+  override def toString: String = {
+    val total = ratings.count()
+    val sample = ratings.take(2).toList
+    s"ratings: [${total}] (${sample}...)"
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-recommendation/reading-custom-events/src/main/scala/Engine.scala
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-recommendation/reading-custom-events/src/main/scala/Engine.scala b/examples/scala-parallel-recommendation/reading-custom-events/src/main/scala/Engine.scala
new file mode 100644
index 0000000..b2a668b
--- /dev/null
+++ b/examples/scala-parallel-recommendation/reading-custom-events/src/main/scala/Engine.scala
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.predictionio.examples.recommendation
+
+import org.apache.predictionio.controller.EngineFactory
+import org.apache.predictionio.controller.Engine
+
+/**
+ * A recommendation request.
+ *
+ * @param user ID of the user to recommend items for
+ * @param num  maximum number of items to return
+ */
+case class Query(user: String, num: Int) extends Serializable
+
+/** Response to a [[Query]]: the recommended items with their scores. */
+case class PredictedResult(itemScores: Array[ItemScore]) extends Serializable
+
+/** Evaluation ground truth: the held-out ratings of one user. */
+case class ActualResult(ratings: Array[Rating]) extends Serializable
+
+/**
+ * One recommended item.
+ *
+ * @param item  item ID
+ * @param score predicted score for the item
+ */
+case class ItemScore(item: String, score: Double) extends Serializable
+
+/** Engine factory named by `engineFactory` in engine.json. */
+object RecommendationEngine extends EngineFactory {
+  /** Wires the data source, preparator, the "als" algorithm and serving together. */
+  def apply() =
+    new Engine(
+      classOf[DataSource],
+      classOf[Preparator],
+      Map("als" -> classOf[ALSAlgorithm]),
+      classOf[Serving])
+}