You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@predictionio.apache.org by ch...@apache.org on 2017/09/28 15:54:46 UTC
[05/57] [abbrv] incubator-predictionio git commit: [PIO-97] Fixes
examples of the official templates for v0.11.0-incubating.
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-recommendation/reading-custom-events/src/main/scala/Evaluation.scala
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-recommendation/reading-custom-events/src/main/scala/Evaluation.scala b/examples/scala-parallel-recommendation/reading-custom-events/src/main/scala/Evaluation.scala
new file mode 100644
index 0000000..a665496
--- /dev/null
+++ b/examples/scala-parallel-recommendation/reading-custom-events/src/main/scala/Evaluation.scala
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.predictionio.examples.recommendation
+
+import org.apache.predictionio.controller.Evaluation
+import org.apache.predictionio.controller.OptionAverageMetric
+import org.apache.predictionio.controller.AverageMetric
+import org.apache.predictionio.controller.EmptyEvaluationInfo
+import org.apache.predictionio.controller.EngineParamsGenerator
+import org.apache.predictionio.controller.EngineParams
+import org.apache.predictionio.controller.MetricEvaluator
+
+// Usage:
+// $ pio eval org.apache.predictionio.examples.recommendation.RecommendationEvaluation \
+// org.apache.predictionio.examples.recommendation.EngineParamsList
+
+/**
+ * Precision@K for a single query.
+ *
+ * An item is relevant ("positive") when the user rated it at least `ratingThreshold`.
+ * The score is the number of relevant items among the top `k` predictions divided by
+ * `min(k, number of positives)`, so a user with fewer than `k` positives can still reach 1.0.
+ *
+ * @param k               cut-off rank; must be greater than 0
+ * @param ratingThreshold minimum rating for an item to count as relevant
+ */
+case class PrecisionAtK(k: Int, ratingThreshold: Double = 2.0)
+  extends OptionAverageMetric[EmptyEvaluationInfo, Query, PredictedResult, ActualResult] {
+  require(k > 0, "k must be greater than 0")
+
+  override def header = s"Precision@K (k=$k, threshold=$ratingThreshold)"
+
+  def calculate(q: Query, p: PredictedResult, a: ActualResult): Option[Double] = {
+    val positives: Set[String] = a.ratings.collect {
+      case r if r.rating >= ratingThreshold => r.item
+    }.toSet
+
+    if (positives.isEmpty) {
+      // Precision is undefined without any positive item; returning None keeps this
+      // query out of the metric.
+      None
+    } else {
+      val hits: Int = p.itemScores.iterator.take(k).count(score => positives.contains(score.item))
+      Some(hits.toDouble / math.min(k, positives.size))
+    }
+  }
+}
+
+/**
+ * Per query, the number of items the user rated at least `ratingThreshold`
+ * (averaged across queries by AverageMetric).
+ *
+ * @param ratingThreshold minimum rating for an item to count as positive
+ */
+case class PositiveCount(ratingThreshold: Double = 2.0)
+  extends AverageMetric[EmptyEvaluationInfo, Query, PredictedResult, ActualResult] {
+  override def header = s"PositiveCount (threshold=$ratingThreshold)"
+
+  def calculate(q: Query, p: PredictedResult, a: ActualResult): Double =
+    a.ratings.count(_.rating >= ratingThreshold)
+}
+
+/**
+ * Evaluation optimizing Precision@10 at threshold 4.0, also reporting Precision@10 and
+ * PositiveCount at thresholds 4.0, 2.0 and 1.0.
+ */
+object RecommendationEvaluation extends Evaluation {
+  private val primaryMetric = PrecisionAtK(k = 10, ratingThreshold = 4.0)
+
+  // Reported alongside the primary metric, in this order.
+  private val secondaryMetrics = Seq(
+    PositiveCount(ratingThreshold = 4.0),
+    PrecisionAtK(k = 10, ratingThreshold = 2.0),
+    PositiveCount(ratingThreshold = 2.0),
+    PrecisionAtK(k = 10, ratingThreshold = 1.0),
+    PositiveCount(ratingThreshold = 1.0))
+
+  engineEvaluator = (
+    RecommendationEngine(),
+    MetricEvaluator(metric = primaryMetric, otherMetrics = secondaryMetrics))
+}
+
+
+/**
+ * Evaluation optimizing Precision@3 at threshold 2.0, also reporting PositiveCount for each
+ * threshold and Precision@K for every (threshold, k) combination.
+ */
+object ComprehensiveRecommendationEvaluation extends Evaluation {
+  val ratingThresholds = Seq(0.0, 2.0, 4.0)
+  val ks = Seq(1, 3, 10)
+
+  // One PositiveCount per threshold ...
+  private val positiveCounts = ratingThresholds.map(r => PositiveCount(ratingThreshold = r))
+  // ... then Precision@K for each threshold, iterating k fastest.
+  private val precisions = ratingThresholds.flatMap { r =>
+    ks.map(k => PrecisionAtK(k = k, ratingThreshold = r))
+  }
+
+  engineEvaluator = (
+    RecommendationEngine(),
+    MetricEvaluator(
+      metric = PrecisionAtK(k = 3, ratingThreshold = 2.0),
+      otherMetrics = positiveCounts ++ precisions))
+}
+
+
+/** Shared engine parameters for evaluation: app "MyApp1", 5-fold CV, 10 items per query. */
+trait BaseEngineParamsList extends EngineParamsGenerator {
+  protected val baseEP = {
+    val evalSettings = DataSourceEvalParams(kFold = 5, queryNum = 10)
+    val dataSource = DataSourceParams(appName = "MyApp1", evalParams = Some(evalSettings))
+    EngineParams(dataSourceParams = dataSource)
+  }
+}
+
+/**
+ * Grid over ALS rank x numIterations (lambda = 0.01, seed = 3 fixed), rank varying slowest:
+ * 3 x 3 = 9 candidate engine parameter sets.
+ */
+object EngineParamsList extends BaseEngineParamsList {
+  engineParamsList = Seq(5, 10, 20).flatMap { rank =>
+    Seq(1, 5, 10).map { numIterations =>
+      baseEP.copy(
+        algorithmParamsList = Seq(
+          ("als", ALSAlgorithmParams(rank, numIterations, 0.01, Some(3)))))
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-recommendation/reading-custom-events/src/main/scala/Preparator.scala
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-recommendation/reading-custom-events/src/main/scala/Preparator.scala b/examples/scala-parallel-recommendation/reading-custom-events/src/main/scala/Preparator.scala
new file mode 100644
index 0000000..6a41c47
--- /dev/null
+++ b/examples/scala-parallel-recommendation/reading-custom-events/src/main/scala/Preparator.scala
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.predictionio.examples.recommendation
+
+import org.apache.predictionio.controller.PPreparator
+
+import org.apache.spark.SparkContext
+import org.apache.spark.SparkContext._
+import org.apache.spark.rdd.RDD
+
+/** Pass-through preparator: hands the training ratings to the algorithm unchanged. */
+class Preparator extends PPreparator[TrainingData, PreparedData] {
+
+  def prepare(sc: SparkContext, trainingData: TrainingData): PreparedData =
+    new PreparedData(trainingData.ratings)
+}
+
+/** Algorithm input: the (user, item, rating) records produced by the Preparator. */
+class PreparedData(val ratings: RDD[Rating]) extends Serializable
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-recommendation/reading-custom-events/src/main/scala/Serving.scala
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-recommendation/reading-custom-events/src/main/scala/Serving.scala b/examples/scala-parallel-recommendation/reading-custom-events/src/main/scala/Serving.scala
new file mode 100644
index 0000000..c478455
--- /dev/null
+++ b/examples/scala-parallel-recommendation/reading-custom-events/src/main/scala/Serving.scala
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.predictionio.examples.recommendation
+
+import org.apache.predictionio.controller.LServing
+
+/** Serving layer that forwards the first algorithm's prediction unchanged. */
+class Serving extends LServing[Query, PredictedResult] {
+
+  /** Returns the prediction of the first algorithm; any further results are ignored. */
+  override def serve(query: Query, predictedResults: Seq[PredictedResult]): PredictedResult =
+    predictedResults.head
+}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-recommendation/reading-custom-events/template.json
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-recommendation/reading-custom-events/template.json b/examples/scala-parallel-recommendation/reading-custom-events/template.json
new file mode 100644
index 0000000..d076ec5
--- /dev/null
+++ b/examples/scala-parallel-recommendation/reading-custom-events/template.json
@@ -0,0 +1 @@
+{"pio": {"version": { "min": "0.10.0-incubating" }}}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-recommendation/train-with-view-event/.gitignore
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-recommendation/train-with-view-event/.gitignore b/examples/scala-parallel-recommendation/train-with-view-event/.gitignore
new file mode 100644
index 0000000..3f3403a
--- /dev/null
+++ b/examples/scala-parallel-recommendation/train-with-view-event/.gitignore
@@ -0,0 +1,5 @@
+data/sample_movielens_data.txt
+manifest.json
+target/
+pio.log
+/pio.sbt
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-recommendation/train-with-view-event/build.sbt
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-recommendation/train-with-view-event/build.sbt b/examples/scala-parallel-recommendation/train-with-view-event/build.sbt
new file mode 100644
index 0000000..3d25df1
--- /dev/null
+++ b/examples/scala-parallel-recommendation/train-with-view-event/build.sbt
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// sbt build definition for the recommendation engine template.
+name := "template-scala-parallel-recommendation"
+
+organization := "org.apache.predictionio"
+scalaVersion := "2.11.8"
+// Both dependencies use the "provided" scope: they are expected to be on the runtime
+// classpath (PredictionIO / Spark installation) rather than bundled into the assembly jar.
+libraryDependencies ++= Seq(
+ "org.apache.predictionio" %% "apache-predictionio-core" % "0.11.0-incubating" % "provided",
+ "org.apache.spark" %% "spark-mllib" % "2.1.1" % "provided")
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-recommendation/train-with-view-event/data/import_eventserver.py
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-recommendation/train-with-view-event/data/import_eventserver.py b/examples/scala-parallel-recommendation/train-with-view-event/data/import_eventserver.py
new file mode 100644
index 0000000..f6add25
--- /dev/null
+++ b/examples/scala-parallel-recommendation/train-with-view-event/data/import_eventserver.py
@@ -0,0 +1,81 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+Import sample data for recommendation engine
+"""
+
+import predictionio
+import argparse
+import random
+
+RATE_ACTIONS_DELIMITER = "::"
+SEED = 3
+
+def import_events(client, file):
+ f = open(file, 'r')
+ random.seed(SEED)
+ count = 0
+ print("Importing data...")
+ for line in f:
+ data = line.rstrip('\r\n').split(RATE_ACTIONS_DELIMITER)
+ client.create_event(
+ event="view",
+ entity_type="user",
+ entity_id=data[0],
+ target_entity_type="item",
+ target_entity_id=data[1]
+ )
+ count += 1
+ # For demonstration purpose, randomly mix in some buy events
+ if (random.randint(0, 1) == 1):
+ client.create_event(
+ event="rate",
+ entity_type="user",
+ entity_id=data[0],
+ target_entity_type="item",
+ target_entity_id=data[1],
+ properties= { "rating" : float(data[2]) }
+ )
+ else:
+ client.create_event(
+ event="buy",
+ entity_type="user",
+ entity_id=data[0],
+ target_entity_type="item",
+ target_entity_id=data[1]
+ )
+ count += 1
+ f.close()
+ print("%s events are imported." % count)
+
+if __name__ == '__main__':
+ parser = argparse.ArgumentParser(
+ description="Import sample data for recommendation engine")
+ parser.add_argument('--access_key', default='invald_access_key')
+ parser.add_argument('--url', default="http://localhost:7070")
+ parser.add_argument('--file', default="./data/sample_movielens_data.txt")
+
+ args = parser.parse_args()
+ print(args)
+
+ client = predictionio.EventClient(
+ access_key=args.access_key,
+ url=args.url,
+ threads=5,
+ qsize=500)
+ import_events(client, args.file)
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-recommendation/train-with-view-event/data/send_query.py
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-recommendation/train-with-view-event/data/send_query.py b/examples/scala-parallel-recommendation/train-with-view-event/data/send_query.py
new file mode 100644
index 0000000..f6ec9ab
--- /dev/null
+++ b/examples/scala-parallel-recommendation/train-with-view-event/data/send_query.py
@@ -0,0 +1,24 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+Send sample query to prediction engine
+"""
+
+import predictionio
+engine_client = predictionio.EngineClient(url="http://localhost:8000")
+print(engine_client.send_query({"user": "1", "num": 4}))
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-recommendation/train-with-view-event/engine.json
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-recommendation/train-with-view-event/engine.json b/examples/scala-parallel-recommendation/train-with-view-event/engine.json
new file mode 100644
index 0000000..718b0e1
--- /dev/null
+++ b/examples/scala-parallel-recommendation/train-with-view-event/engine.json
@@ -0,0 +1,21 @@
+{
+ "id": "default",
+ "description": "Default settings",
+ "engineFactory": "org.apache.predictionio.examples.recommendation.RecommendationEngine",
+ "datasource": {
+ "params" : {
+ "appName": "MyApp1"
+ }
+ },
+ "algorithms": [
+ {
+ "name": "als",
+ "params": {
+ "rank": 10,
+ "numIterations": 20,
+ "lambda": 0.01,
+ "seed": 3
+ }
+ }
+ ]
+}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-recommendation/train-with-view-event/project/assembly.sbt
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-recommendation/train-with-view-event/project/assembly.sbt b/examples/scala-parallel-recommendation/train-with-view-event/project/assembly.sbt
new file mode 100644
index 0000000..92636bc
--- /dev/null
+++ b/examples/scala-parallel-recommendation/train-with-view-event/project/assembly.sbt
@@ -0,0 +1 @@
+// sbt-assembly plugin: provides the `assembly` task that builds a single fat jar.
+addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.4")
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-recommendation/train-with-view-event/project/build.properties
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-recommendation/train-with-view-event/project/build.properties b/examples/scala-parallel-recommendation/train-with-view-event/project/build.properties
new file mode 100644
index 0000000..64317fd
--- /dev/null
+++ b/examples/scala-parallel-recommendation/train-with-view-event/project/build.properties
@@ -0,0 +1 @@
+sbt.version=0.13.15
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-recommendation/train-with-view-event/src/main/scala/ALSAlgorithm.scala
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-recommendation/train-with-view-event/src/main/scala/ALSAlgorithm.scala b/examples/scala-parallel-recommendation/train-with-view-event/src/main/scala/ALSAlgorithm.scala
new file mode 100644
index 0000000..3aa1953
--- /dev/null
+++ b/examples/scala-parallel-recommendation/train-with-view-event/src/main/scala/ALSAlgorithm.scala
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.predictionio.examples.recommendation
+
+import org.apache.predictionio.controller.PAlgorithm
+import org.apache.predictionio.controller.Params
+import org.apache.predictionio.data.storage.BiMap
+
+import org.apache.spark.SparkContext
+import org.apache.spark.SparkContext._
+import org.apache.spark.rdd.RDD
+import org.apache.spark.mllib.recommendation.ALS
+import org.apache.spark.mllib.recommendation.{Rating => MLlibRating}
+import org.apache.spark.mllib.recommendation.ALSModel
+
+import grizzled.slf4j.Logger
+
+/**
+ * Hyper-parameters for the ALS algorithm.
+ *
+ * @param rank          number of latent factors
+ * @param numIterations number of ALS iterations
+ * @param lambda        regularization parameter
+ * @param seed          random seed; when None, ALSAlgorithm falls back to System.nanoTime
+ */
+case class ALSAlgorithmParams(
+    rank: Int,
+    numIterations: Int,
+    lambda: Double,
+    seed: Option[Long]
+  ) extends Params
+
+/**
+ * Spark MLlib ALS recommendation algorithm trained on implicit feedback.
+ *
+ * @param ap ALS hyper-parameters (rank, iterations, regularization, optional seed)
+ */
+class ALSAlgorithm(val ap: ALSAlgorithmParams)
+  extends PAlgorithm[PreparedData, ALSModel, Query, PredictedResult] {
+
+  @transient lazy val logger = Logger[this.type]
+
+  if (ap.numIterations > 30) {
+    logger.warn(
+      s"ALSAlgorithmParams.numIterations > 30, current: ${ap.numIterations}. " +
+      "There is a chance of running to StackOverflowException. " +
+      "To remedy it, set lower numIterations or checkpoint parameters.")
+  }
+
+  /**
+   * Trains an implicit-feedback ALS model.
+   *
+   * @param sc   Spark context
+   * @param data prepared (user, item, rating) records; must be non-empty
+   * @return the factor matrices together with the String <-> Int id maps used to build them
+   * @throws IllegalArgumentException if `data.ratings` is empty
+   */
+  def train(sc: SparkContext, data: PreparedData): ALSModel = {
+    // MLlib ALS cannot handle empty training data.
+    require(!data.ratings.take(1).isEmpty,
+      s"RDD[Rating] in PreparedData cannot be empty." +
+      " Please check if DataSource generates TrainingData" +
+      " and Preparator generates PreparedData correctly.")
+
+    // Convert user and item String IDs to Int index for MLlib
+    val userStringIntMap = BiMap.stringInt(data.ratings.map(_.user))
+    val itemStringIntMap = BiMap.stringInt(data.ratings.map(_.item))
+    val mllibRatings = data.ratings.map( r =>
+      // MLlibRating requires integer index for user and item
+      MLlibRating(userStringIntMap(r.user), itemStringIntMap(r.item), r.rating)
+    )
+
+    // Seed for MLlib ALS; without an explicit seed, runs are not reproducible.
+    val seed = ap.seed.getOrElse(System.nanoTime)
+
+    // Set checkpoint directory
+    // sc.setCheckpointDir("checkpoint")
+
+    // Only one kind of implicit signal (the "view" event) is used, so ALS runs with
+    // implicitPrefs = true (MODIFIED relative to the explicit-rating template).
+    val implicitPrefs = true
+    val als = new ALS()
+    als.setUserBlocks(-1) // -1: let MLlib choose the number of blocks
+    als.setProductBlocks(-1)
+    als.setRank(ap.rank)
+    als.setIterations(ap.numIterations)
+    als.setLambda(ap.lambda)
+    als.setImplicitPrefs(implicitPrefs)
+    als.setAlpha(1.0)
+    als.setSeed(seed)
+    // Only takes effect when a checkpoint directory is set on the SparkContext (see above).
+    als.setCheckpointInterval(10)
+    val m = als.run(mllibRatings)
+
+    new ALSModel(
+      rank = m.rank,
+      userFeatures = m.userFeatures,
+      productFeatures = m.productFeatures,
+      userStringIntMap = userStringIntMap,
+      itemStringIntMap = itemStringIntMap)
+  }
+
+  /**
+   * Recommends the top `query.num` items for `query.user`.
+   * Users absent from the training data get an empty result (logged at info level).
+   */
+  def predict(model: ALSModel, query: Query): PredictedResult = {
+    // Convert String ID to Int index for MLlib
+    model.userStringIntMap.get(query.user).map { userInt =>
+      // create inverse view of itemStringIntMap
+      val itemIntStringMap = model.itemStringIntMap.inverse
+      // recommendProducts() returns Array[MLlibRating], which uses item Int
+      // index. Convert it to String ID for returning PredictedResult
+      val itemScores = model.recommendProducts(userInt, query.num)
+        .map (r => ItemScore(itemIntStringMap(r.product), r.rating))
+      new PredictedResult(itemScores)
+    }.getOrElse{
+      logger.info(s"No prediction for unknown user ${query.user}.")
+      new PredictedResult(Array.empty)
+    }
+  }
+
+  /**
+   * Batch prediction used by the evaluation module: scores every product for every known
+   * user in `queries` and keeps each query's top `num` items. Queries for unknown users get
+   * an empty result. Each output pair carries the same Long index as its input query.
+   */
+  override def batchPredict(model: ALSModel, queries: RDD[(Long, Query)]): RDD[(Long, PredictedResult)] = {
+    // Pair each query with its user's Int index; unknown users get the sentinel -1.
+    val userIxQueries: RDD[(Int, (Long, Query))] = queries.map { case (ix, query) =>
+      val userIx = model.userStringIntMap.get(query.user).getOrElse(-1)
+      (userIx, (ix, query))
+    }
+
+    // Cross product of the distinct known users in the queries and all products in the model.
+    // `distinct` matters: if several queries name the same user, duplicate keys would produce
+    // duplicate (user, product) pairs and hence repeated items in that user's top-N list.
+    val usersProducts: RDD[(Int, Int)] = userIxQueries
+      .keys
+      .filter(_ != -1)
+      .distinct()
+      .cartesian(model.productFeatures.map(_._1))
+
+    // Call mllib ALS's predict function.
+    val predictedRatings: RDD[MLlibRating] = model.predict(usersProducts)
+
+    // Not optimal: instead of groupBy, combineByKey with a bounded PriorityQueue would avoid
+    // materializing every score per user.
+    val userRatings: RDD[(Int, Iterable[MLlibRating])] = predictedRatings.groupBy(_.user)
+
+    // Hoisted so the inverse map is obtained once rather than per rating.
+    val itemIntStringMap = model.itemStringIntMap.inverse
+
+    userIxQueries.leftOuterJoin(userRatings).map {
+      // Known user: sort scores from large to small and keep the top `query.num`.
+      case (_, ((ix, query), Some(userScores))) =>
+        val topItemScores: Array[ItemScore] = userScores
+          .toArray
+          .sortBy(_.rating)(Ordering.Double.reverse)
+          .take(query.num)
+          .map(rating => ItemScore(itemIntStringMap(rating.product), rating.rating))
+        (ix, PredictedResult(itemScores = topItemScores))
+      // User doesn't exist in the training data.
+      case (userIx, ((ix, _), None)) =>
+        require(userIx == -1)
+        (ix, PredictedResult(itemScores = Array.empty))
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-recommendation/train-with-view-event/src/main/scala/ALSModel.scala
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-recommendation/train-with-view-event/src/main/scala/ALSModel.scala b/examples/scala-parallel-recommendation/train-with-view-event/src/main/scala/ALSModel.scala
new file mode 100644
index 0000000..898858d
--- /dev/null
+++ b/examples/scala-parallel-recommendation/train-with-view-event/src/main/scala/ALSModel.scala
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.recommendation
+// This must be the same package as Spark's MatrixFactorizationModel because
+// MatrixFactorizationModel's constructor is private and we are using
+// its constructor in order to save and load the model
+
+import org.apache.predictionio.examples.recommendation.ALSAlgorithmParams
+
+import org.apache.predictionio.controller.PersistentModel
+import org.apache.predictionio.controller.PersistentModelLoader
+import org.apache.predictionio.data.storage.BiMap
+
+import org.apache.spark.SparkContext
+import org.apache.spark.SparkContext._
+import org.apache.spark.rdd.RDD
+
+/**
+ * MatrixFactorizationModel extended with the String <-> Int id maps needed to translate
+ * between PredictionIO ids and MLlib's integer indices, persisted via PersistentModel.
+ */
+class ALSModel(
+    override val rank: Int,
+    override val userFeatures: RDD[(Int, Array[Double])],
+    override val productFeatures: RDD[(Int, Array[Double])],
+    val userStringIntMap: BiMap[String, Int],
+    val itemStringIntMap: BiMap[String, Int])
+  extends MatrixFactorizationModel(rank, userFeatures, productFeatures)
+  with PersistentModel[ALSAlgorithmParams] {
+
+  /**
+   * Writes each model component as a Spark object file under /tmp/<id>/.
+   *
+   * @return always true
+   */
+  def save(id: String, params: ALSAlgorithmParams,
+    sc: SparkContext): Boolean = {
+    def path(component: String): String = s"/tmp/${id}/${component}"
+
+    sc.parallelize(Seq(rank)).saveAsObjectFile(path("rank"))
+    userFeatures.saveAsObjectFile(path("userFeatures"))
+    productFeatures.saveAsObjectFile(path("productFeatures"))
+    sc.parallelize(Seq(userStringIntMap)).saveAsObjectFile(path("userStringIntMap"))
+    sc.parallelize(Seq(itemStringIntMap)).saveAsObjectFile(path("itemStringIntMap"))
+    true
+  }
+
+  /** Summary with sizes and the first two entries of each component (runs Spark jobs). */
+  override def toString = {
+    val userPart =
+      s"userFeatures: [${userFeatures.count()}](${userFeatures.take(2).toList}...)"
+    val productPart =
+      s"productFeatures: [${productFeatures.count()}](${productFeatures.take(2).toList}...)"
+    val userMapPart =
+      s"userStringIntMap: [${userStringIntMap.size}](${userStringIntMap.take(2)}...)"
+    val itemMapPart =
+      s"itemStringIntMap: [${itemStringIntMap.size}](${itemStringIntMap.take(2)}...)"
+    Seq(userPart, productPart, userMapPart, itemMapPart).mkString(" ")
+  }
+}
+
+/** Loader for models written by ALSModel.save. */
+object ALSModel
+  extends PersistentModelLoader[ALSAlgorithmParams, ALSModel] {
+
+  /**
+   * Reads every component back from /tmp/<id>/.
+   * Requires a SparkContext: `sc.get` throws NoSuchElementException when it is None.
+   */
+  def apply(id: String, params: ALSAlgorithmParams,
+    sc: Option[SparkContext]) = {
+    val spark = sc.get
+    def path(component: String): String = s"/tmp/${id}/${component}"
+
+    new ALSModel(
+      rank = spark.objectFile[Int](path("rank")).first,
+      userFeatures = spark.objectFile(path("userFeatures")),
+      productFeatures = spark.objectFile(path("productFeatures")),
+      userStringIntMap = spark.objectFile[BiMap[String, Int]](path("userStringIntMap")).first,
+      itemStringIntMap = spark.objectFile[BiMap[String, Int]](path("itemStringIntMap")).first)
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-recommendation/train-with-view-event/src/main/scala/DataSource.scala
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-recommendation/train-with-view-event/src/main/scala/DataSource.scala b/examples/scala-parallel-recommendation/train-with-view-event/src/main/scala/DataSource.scala
new file mode 100644
index 0000000..21796db
--- /dev/null
+++ b/examples/scala-parallel-recommendation/train-with-view-event/src/main/scala/DataSource.scala
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.predictionio.examples.recommendation
+
+import org.apache.predictionio.controller.PDataSource
+import org.apache.predictionio.controller.EmptyEvaluationInfo
+import org.apache.predictionio.controller.EmptyActualResult
+import org.apache.predictionio.controller.Params
+import org.apache.predictionio.data.storage.Event
+import org.apache.predictionio.data.store.PEventStore
+
+import org.apache.spark.SparkContext
+import org.apache.spark.SparkContext._
+import org.apache.spark.rdd.RDD
+
+import grizzled.slf4j.Logger
+
+/**
+ * Cross-validation settings.
+ *
+ * @param kFold    number of folds the ratings are split into
+ * @param queryNum number of items requested per evaluation query
+ */
+case class DataSourceEvalParams(
+    kFold: Int,
+    queryNum: Int)
+
+/**
+ * @param appName    name of the PredictionIO app whose events are read
+ * @param evalParams cross-validation settings; required only when running evaluation
+ */
+case class DataSourceParams(appName: String, evalParams: Option[DataSourceEvalParams])
+  extends Params
+
+/**
+ * Reads "view" events (user -> item) from the event store and turns them into ratings,
+ * where each rating is the number of times the user viewed the item.
+ */
+class DataSource(val dsp: DataSourceParams)
+  extends PDataSource[TrainingData,
+      EmptyEvaluationInfo, Query, ActualResult] {
+
+  @transient lazy val logger = Logger[this.type]
+
+  /**
+   * Loads all "view" events of `dsp.appName` and aggregates them per (user, item):
+   * every view contributes 1.0, so the resulting rating is the view count.
+   * The returned RDD is cached.
+   */
+  def getRatings(sc: SparkContext): RDD[Rating] = {
+
+    val eventsRDD: RDD[Event] = PEventStore.find(
+      appName = dsp.appName,
+      entityType = Some("user"),
+      eventNames = Some(List("view")), // MODIFIED
+      // targetEntityType is optional field of an event.
+      targetEntityType = Some(Some("item")))(sc)
+
+    val ratingsRDD: RDD[Rating] = eventsRDD.map { event =>
+      try {
+        val ratingValue: Double = event.event match {
+          case "view" => 1.0 // MODIFIED
+          case _ => throw new Exception(s"Unexpected event ${event} is read.")
+        }
+        // MODIFIED
+        // key is (user id, item id)
+        // value is the rating value, which is 1.
+        ((event.entityId, event.targetEntityId.get), ratingValue)
+      } catch {
+        case e: Exception => {
+          logger.error(s"Cannot convert ${event} to Rating. Exception: ${e}.")
+          throw e
+        }
+      }
+    }
+    // MODIFIED
+    // sum all values for the same user id and item id key
+    .reduceByKey { case (a, b) => a + b }
+    .map { case ((uid, iid), r) =>
+      Rating(uid, iid, r)
+    }.cache()
+
+    ratingsRDD
+  }
+
+  override
+  def readTraining(sc: SparkContext): TrainingData = {
+    new TrainingData(getRatings(sc))
+  }
+
+  /**
+   * Splits the ratings into `kFold` folds; for each fold, trains on the others and builds one
+   * query per test user asking for `queryNum` items, with that user's test ratings as actuals.
+   *
+   * @throws IllegalArgumentException if evalParams is missing or kFold is not positive
+   */
+  override
+  def readEval(sc: SparkContext)
+  : Seq[(TrainingData, EmptyEvaluationInfo, RDD[(Query, ActualResult)])] = {
+    require(!dsp.evalParams.isEmpty, "Must specify evalParams")
+    val evalParams = dsp.evalParams.get
+
+    val kFold = evalParams.kFold
+    // With kFold <= 0 the fold loop below would silently yield no evaluation sets at all.
+    require(kFold > 0, s"evalParams.kFold must be greater than 0, current: ${kFold}")
+    val ratings: RDD[(Rating, Long)] = getRatings(sc).zipWithUniqueId
+    ratings.cache
+
+    (0 until kFold).map { idx =>
+      // A rating belongs to fold `idx` when its unique id is congruent to idx modulo kFold.
+      val trainingRatings = ratings.filter(_._2 % kFold != idx).map(_._1)
+      val testingRatings = ratings.filter(_._2 % kFold == idx).map(_._1)
+
+      val testingUsers: RDD[(String, Iterable[Rating])] = testingRatings.groupBy(_.user)
+
+      (new TrainingData(trainingRatings),
+        new EmptyEvaluationInfo(),
+        testingUsers.map { case (user, userRatings) =>
+          (Query(user, evalParams.queryNum), ActualResult(userRatings.toArray))
+        }
+      )
+    }
+  }
+}
+
+/** One (user, item, rating) record; in this template the rating is the user's view count. */
+case class Rating(user: String, item: String, rating: Double)
+
+/** Training input read by the DataSource. Note that toString runs Spark count() and take(2). */
+class TrainingData(val ratings: RDD[Rating]) extends Serializable {
+  override def toString = {
+    val total = ratings.count()
+    val sample = ratings.take(2).toList
+    s"ratings: [${total}] (${sample}...)"
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-recommendation/train-with-view-event/src/main/scala/Engine.scala
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-recommendation/train-with-view-event/src/main/scala/Engine.scala b/examples/scala-parallel-recommendation/train-with-view-event/src/main/scala/Engine.scala
new file mode 100644
index 0000000..b2a668b
--- /dev/null
+++ b/examples/scala-parallel-recommendation/train-with-view-event/src/main/scala/Engine.scala
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.predictionio.examples.recommendation
+
+import org.apache.predictionio.controller.EngineFactory
+import org.apache.predictionio.controller.Engine
+
+/**
+ * Recommendation request for `user`. `num` presumably caps how many item scores are
+ * returned (evaluation fills it from `queryNum`) - confirm in ALSAlgorithm.
+ */
+case class Query(user: String, num: Int) extends Serializable
+
+/** Item scores returned for a Query; PrecisionAtK treats the first k entries as the top-k. */
+case class PredictedResult(itemScores: Array[ItemScore]) extends Serializable
+
+/** Ground-truth ratings for a query, compared against PredictedResult by the metrics. */
+case class ActualResult(ratings: Array[Rating]) extends Serializable
+
+/** An item id together with the score the algorithm assigned to it. */
+case class ItemScore(item: String, score: Double) extends Serializable
+
+/** Assembles the DASE components; "als" is the only registered algorithm. */
+object RecommendationEngine extends EngineFactory {
+  def apply() = new Engine(
+    classOf[DataSource],
+    classOf[Preparator],
+    Map("als" -> classOf[ALSAlgorithm]),
+    classOf[Serving])
+}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-recommendation/train-with-view-event/src/main/scala/Evaluation.scala
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-recommendation/train-with-view-event/src/main/scala/Evaluation.scala b/examples/scala-parallel-recommendation/train-with-view-event/src/main/scala/Evaluation.scala
new file mode 100644
index 0000000..a665496
--- /dev/null
+++ b/examples/scala-parallel-recommendation/train-with-view-event/src/main/scala/Evaluation.scala
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.predictionio.examples.recommendation
+
+import org.apache.predictionio.controller.Evaluation
+import org.apache.predictionio.controller.OptionAverageMetric
+import org.apache.predictionio.controller.AverageMetric
+import org.apache.predictionio.controller.EmptyEvaluationInfo
+import org.apache.predictionio.controller.EngineParamsGenerator
+import org.apache.predictionio.controller.EngineParams
+import org.apache.predictionio.controller.MetricEvaluator
+
+// Usage:
+// $ pio eval org.apache.predictionio.examples.recommendation.RecommendationEvaluation \
+//   org.apache.predictionio.examples.recommendation.EngineParamsList
+
+/**
+ * Precision@K, averaged over evaluation queries.
+ *
+ * An actual item is "positive" when its rating is at least `ratingThreshold`. The number of
+ * positives among the first `k` predicted items is divided by `min(k, #positives)`, so a
+ * user with fewer than `k` positives can still reach 1.0.
+ *
+ * @param k number of top predictions inspected; must be greater than 0
+ * @param ratingThreshold minimum actual rating for an item to count as positive
+ */
+case class PrecisionAtK(k: Int, ratingThreshold: Double = 2.0)
+    extends OptionAverageMetric[EmptyEvaluationInfo, Query, PredictedResult, ActualResult] {
+  require(k > 0, "k must be greater than 0")
+
+  override def header = s"Precision@K (k=$k, threshold=$ratingThreshold)"
+
+  def calculate(q: Query, p: PredictedResult, a: ActualResult): Option[Double] = {
+    val positives: Set[String] = a.ratings.filter(_.rating >= ratingThreshold).map(_.item).toSet
+
+    // If there are no positive results, precision is undefined. Such queries are left out of
+    // the metric by returning None.
+    if (positives.isEmpty) {
+      None
+    } else {
+      val tpCount: Int = p.itemScores.take(k).count(is => positives(is.item))
+      Some(tpCount.toDouble / math.min(k, positives.size))
+    }
+  }
+}
+
+/**
+ * Average number of actual items per query whose rating is at least `ratingThreshold`.
+ * Reported alongside PrecisionAtK to show how many positives each threshold leaves.
+ *
+ * @param ratingThreshold minimum actual rating for an item to count as positive
+ */
+case class PositiveCount(ratingThreshold: Double = 2.0)
+    extends AverageMetric[EmptyEvaluationInfo, Query, PredictedResult, ActualResult] {
+  override def header = s"PositiveCount (threshold=$ratingThreshold)"
+
+  def calculate(q: Query, p: PredictedResult, a: ActualResult): Double =
+    a.ratings.count(_.rating >= ratingThreshold).toDouble
+}
+
+/**
+ * Default evaluation: selects on Precision@10 at threshold 4.0 and also reports positive
+ * counts and Precision@10 at thresholds 4.0, 2.0 and 1.0.
+ */
+object RecommendationEvaluation extends Evaluation {
+  engineEvaluator = (
+    RecommendationEngine(),
+    MetricEvaluator(
+      metric = PrecisionAtK(k = 10, ratingThreshold = 4.0),
+      otherMetrics = Seq(
+        PositiveCount(ratingThreshold = 4.0),
+        PrecisionAtK(k = 10, ratingThreshold = 2.0),
+        PositiveCount(ratingThreshold = 2.0),
+        PrecisionAtK(k = 10, ratingThreshold = 1.0),
+        PositiveCount(ratingThreshold = 1.0))))
+}
+
+
+/**
+ * Broader evaluation: selects on Precision@3 at threshold 2.0 and reports the positive count
+ * for every threshold plus Precision@K for every (threshold, k) combination.
+ */
+object ComprehensiveRecommendationEvaluation extends Evaluation {
+  val ratingThresholds = Seq(0.0, 2.0, 4.0)
+  val ks = Seq(1, 3, 10)
+
+  engineEvaluator = (
+    RecommendationEngine(),
+    MetricEvaluator(
+      metric = PrecisionAtK(k = 3, ratingThreshold = 2.0),
+      otherMetrics =
+        ratingThresholds.map(r => PositiveCount(ratingThreshold = r)) ++
+          ratingThresholds.flatMap(r => ks.map(k => PrecisionAtK(k = k, ratingThreshold = r)))))
+}
+
+
+/** Common base params for tuning: app "MyApp1", 5-fold evaluation, 10 items per query. */
+trait BaseEngineParamsList extends EngineParamsGenerator {
+  protected val baseEP = EngineParams(
+    dataSourceParams = DataSourceParams(
+      appName = "MyApp1",
+      evalParams = Some(DataSourceEvalParams(kFold = 5, queryNum = 10))
+    )
+  )
+}
+
+/**
+ * Grid search over ALS rank {5, 10, 20} and iterations {1, 5, 10}. The remaining positional
+ * arguments (0.01, Some(3)) are presumably lambda and seed - confirm in ALSAlgorithmParams.
+ */
+object EngineParamsList extends BaseEngineParamsList {
+  engineParamsList =
+    for {
+      rank <- Seq(5, 10, 20)
+      numIterations <- Seq(1, 5, 10)
+    } yield baseEP.copy(
+      algorithmParamsList = Seq(
+        ("als", ALSAlgorithmParams(rank, numIterations, 0.01, Some(3)))))
+}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-recommendation/train-with-view-event/src/main/scala/Preparator.scala
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-recommendation/train-with-view-event/src/main/scala/Preparator.scala b/examples/scala-parallel-recommendation/train-with-view-event/src/main/scala/Preparator.scala
new file mode 100644
index 0000000..6a41c47
--- /dev/null
+++ b/examples/scala-parallel-recommendation/train-with-view-event/src/main/scala/Preparator.scala
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.predictionio.examples.recommendation
+
+import org.apache.predictionio.controller.PPreparator
+
+import org.apache.spark.SparkContext
+import org.apache.spark.SparkContext._
+import org.apache.spark.rdd.RDD
+
+/** Passes the training ratings through unchanged as PreparedData. */
+class Preparator extends PPreparator[TrainingData, PreparedData] {
+
+  def prepare(sc: SparkContext, trainingData: TrainingData): PreparedData =
+    new PreparedData(ratings = trainingData.ratings)
+}
+
+/** Ratings handed to the algorithm; Preparator copies them straight from TrainingData. */
+class PreparedData(val ratings: RDD[Rating]) extends Serializable
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-recommendation/train-with-view-event/src/main/scala/Serving.scala
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-recommendation/train-with-view-event/src/main/scala/Serving.scala b/examples/scala-parallel-recommendation/train-with-view-event/src/main/scala/Serving.scala
new file mode 100644
index 0000000..c478455
--- /dev/null
+++ b/examples/scala-parallel-recommendation/train-with-view-event/src/main/scala/Serving.scala
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.predictionio.examples.recommendation
+
+import org.apache.predictionio.controller.LServing
+
+/**
+ * Returns the first algorithm's prediction unchanged. The engine registers a single
+ * algorithm ("als"), so there is exactly one prediction to pick from.
+ */
+class Serving extends LServing[Query, PredictedResult] {
+
+  override def serve(query: Query, predictedResults: Seq[PredictedResult]): PredictedResult =
+    predictedResults.head
+}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-recommendation/train-with-view-event/template.json
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-recommendation/train-with-view-event/template.json b/examples/scala-parallel-recommendation/train-with-view-event/template.json
new file mode 100644
index 0000000..d076ec5
--- /dev/null
+++ b/examples/scala-parallel-recommendation/train-with-view-event/template.json
@@ -0,0 +1 @@
+{"pio": {"version": { "min": "0.10.0-incubating" }}}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-similarproduct/README.md
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-similarproduct/README.md b/examples/scala-parallel-similarproduct/README.md
new file mode 100644
index 0000000..0a56220
--- /dev/null
+++ b/examples/scala-parallel-similarproduct/README.md
@@ -0,0 +1,20 @@
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+
+This example is based on the Similar Product Template v0.11.0-incubating.
+
+Please refer to http://predictionio.incubator.apache.org/templates/similarproduct/how-to/ for the how-to guides.
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-similarproduct/add-and-return-item-properties/.gitignore
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-similarproduct/add-and-return-item-properties/.gitignore b/examples/scala-parallel-similarproduct/add-and-return-item-properties/.gitignore
deleted file mode 100644
index 57841c6..0000000
--- a/examples/scala-parallel-similarproduct/add-and-return-item-properties/.gitignore
+++ /dev/null
@@ -1,4 +0,0 @@
-manifest.json
-target/
-pio.log
-/pio.sbt
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-similarproduct/add-and-return-item-properties/README.md
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-similarproduct/add-and-return-item-properties/README.md b/examples/scala-parallel-similarproduct/add-and-return-item-properties/README.md
deleted file mode 100644
index e877e1c..0000000
--- a/examples/scala-parallel-similarproduct/add-and-return-item-properties/README.md
+++ /dev/null
@@ -1,205 +0,0 @@
-<!--
-Licensed to the Apache Software Foundation (ASF) under one or more
-contributor license agreements. See the NOTICE file distributed with
-this work for additional information regarding copyright ownership.
-The ASF licenses this file to You under the Apache License, Version 2.0
-(the "License"); you may not use this file except in compliance with
-the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
--->
-
----
-#PredictionIO: Add Your Own Properties to Returned Items
----
-
-This small how-to explains how to add user defined properties to items returned by PredictionIO engine.
-This how-to is based on the [Similar Product Engine Template](http://predictionio.incubator.apache.org/templates/similarproduct/quickstart/) version v0.1.3
-To use this how-to you need to be familiar with scala programming language.
-In this how-to we also suppose you was able to set up and run `Similar Product Engine` (see their [quick start guide](http://predictionio.incubator.apache.org/templates/similarproduct/quickstart/)).
-
-A full end-to-end example can be found on
-[GitHub](https://github.com/apache/incubator-predictionio/tree/develop/examples/scala-parallel-similarproduct/add-and-return-item-properties).
-
-## THE TASK
-
-Suppose you would like to use [Similar Product Engine](http://predictionio.incubator.apache.org/templates/similarproduct/quickstart/)
-for suggesting your users the videos they can also like. The `Similar Product Engine` will answer to you
-with list of IDs for such videos. So, for example `REST` response from the engine right now
-looks like the one below
-```json
-{"itemScores":[
- {
- "item":"i12",
- "score":1.1700499715209998
- },{
- "item":"i44",
- "score":1.1153550716504106
- }
-]}
-```
-
-But you want the engine to return more information about every video. Let's think you want add fields
-`title`, `date`, and `imdbUrl` to every item, so, the resulting `REST` respose
-for your case should look similar to the posted below
-```json
-{"itemScores":[
- {
- "item":"i12",
- "title":"title for movie i12",
- "date":"1935",
- "imdbUrl":"http://imdb.com/fake-url/i12",
- "score":1.1700499715209998
- },{
- "item":"i44",
- "title":"title for movie i44",
- "date":"1974",
- "imdbUrl":"http://imdb.com/fake-url/i44",
- "score":1.1153550716504106
- }
-]}
-```
-
-## SO, HOW TO?
-
-### The Main Idea
-
-Recall [the DASE Architecture](http://predictionio.incubator.apache.org/templates/similarproduct/dase/), a PredictionIO engine has
-4 main components: `Data Source`, `Data Preparator`, `Algorithm`, and `Serving`
-components. To achieve your goal, you will need provide the information about video to engine
-(using sdk), and then let this information to pass from `Data Source` through all the engine
-to the `Serving` component where the engine will send required information back to your application.
-
-### Implementation
-
-#### Modify The Item
-In file [DataSource.scala#L104](https://github.com/apache/incubator-predictionio/blob/develop/examples/scala-parallel-similarproduct/add-and-return-item-properties/src/main/scala/DataSource.scala#L104)
-you will find class `Item` defined in the next way
-```scala
-case class Item(categories: Option[List[String]])
-```
-
-At the first, we need simply add required fields to this class
-```scala
-case class Item(
- title: String,
- date: String,
- imdbUrl: String,
- categories: Option[List[String]])
-```
-
-#### Create The Item Properly
-Now, your IDE (or compiler) will say you about all the places where you need make changes to create item
-properly. For example, [DataSource.scala#L52](https://github.com/apache/incubator-predictionio/blob/develop/examples/scala-parallel-similarproduct/add-and-return-item-properties/src/main/scala/DataSource.scala#L52)
-```scala
-Item(categories = properties.getOpt[List[String]]("categories"))
-```
-You need now to add needed properties to item
-```scala
-Item(
- title = properties.get[String]("title"),
- date = properties.get[String]("date"),
- imdbUrl = properties.get[String]("imdbUrl"),
- categories = properties.getOpt[List[String]]("categories"))
-```
-
-#### Modify The ItemScore
-Now, when you've fixed item creation, take a look on class `ItemScore` from the file [Engine.scala](https://github.com/apache/incubator-predictionio/blob/develop/examples/scala-parallel-similarproduct/multi/src/main/scala/Engine.scala)
-```scala
-case class ItemScore(
- item: String,
- score: Double
-) extends Serializable
-```
-Engine will return class `PredictedResult` which contains property `itemScores: Array[ItemScore]`.
-So, since your result items are of class`ItemScore`, you need modify this class too.
-In our example after modification you will have something similar to below
-```scala
-case class ItemScore(
- item: String,
- title: String,
- date: String,
- imdbUrl: String,
- score: Double
-) extends Serializable
-```
-
-#### Create The ItemScore Properly
-
-Again, now you need to go through all the places where `ItemScore` is created and fix compiler errors.
-
-Result is initially created by the `Algorithm` component and then is passed to the `Serving` component.
-Take a look on a place where object of class ItemScore is initially created in file [ALSAlgorithm.scala#L171](https://github.com/apache/incubator-predictionio/blob/develop/examples/scala-parallel-similarproduct/add-and-return-item-properties/src/main/scala/ALSAlgorithm.scala#L171).
-```scala
-new ItemScore(
- item = model.itemIntStringMap(i),
- score = s
-)
-```
-You code after changes will be similar to posted below
-```scala
-val it = model.items(i)
-new ItemScore(
- item = model.itemIntStringMap(i),
- title = it.title,
- date = it.date,
- imdbUrl = it.imdbUrl,
- score = s
-)
-```
-Using `model.items(i)` you can receive corresponding object of the `Item` class,
-and now you can access its properties which you created during previous step.
-Using `model.itemIntStringMap(i)` you can receive ID of corresponding item.
-
-#### Modify Script That Supplies Data For The Engine
-And this is the final step. You should supply your data to the engine using new format now.
-To get the idea take a look on this piece of code in our [sample python script](https://github.com/apache/incubator-predictionio/blob/develop/examples/scala-parallel-similarproduct/add-and-return-item-properties/data/import_eventserver.py#L34)
-that creates test.
-
-Creating item before modification.
-```python
-client.create_event(
- event="$set",
- entity_type="item",
- entity_id=item_id,
- properties={
- "categories" : random.sample(categories, random.randint(1, 4))
- }
-)
-```
-Creating item after modification.
-```python
-client.create_event(
- event="$set",
- entity_type="item",
- entity_id=item_id,
- properties={
- "categories" : random.sample(categories, random.randint(1, 4)),
- "title": "title for movie " + item_id,
- "date": 1935 + random.randint(1, 25),
- "imdbUrl": "http://imdb.com/fake-url/" + item_id
- }
-)
-```
-
-#### Try It!
-When you are ready, don't forget to fill application with new data and then
-```bash
-$ pio build
-$ pio train
-$ pio deploy
-```
-
-Now, you should be able to see desired results by querying engine
-```bash
-curl -H "Content-Type: application/json" -d '{ "items": ["i1", "i3"], "num": 10}' http://localhost:8000/queries.json
-```
-
-A full end-to-end example can be found on
-[GitHub](https://github.com/apache/incubator-predictionio/tree/develop/examples/scala-parallel-similarproduct/add-and-return-item-properties)
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-similarproduct/add-and-return-item-properties/build.sbt
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-similarproduct/add-and-return-item-properties/build.sbt b/examples/scala-parallel-similarproduct/add-and-return-item-properties/build.sbt
deleted file mode 100644
index ef66b2f..0000000
--- a/examples/scala-parallel-similarproduct/add-and-return-item-properties/build.sbt
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-import AssemblyKeys._
-
-assemblySettings
-
-name := "template-scala-parallel-similarproduct"
-
-organization := "org.apache.predictionio"
-
-libraryDependencies ++= Seq(
- "org.apache.predictionio" %% "core" % pioVersion.value % "provided",
- "org.apache.spark" %% "spark-core" % "1.2.0" % "provided",
- "org.apache.spark" %% "spark-mllib" % "1.2.0" % "provided")
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-similarproduct/add-and-return-item-properties/data/import_eventserver.py
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-similarproduct/add-and-return-item-properties/data/import_eventserver.py b/examples/scala-parallel-similarproduct/add-and-return-item-properties/data/import_eventserver.py
deleted file mode 100644
index 15aa38c..0000000
--- a/examples/scala-parallel-similarproduct/add-and-return-item-properties/data/import_eventserver.py
+++ /dev/null
@@ -1,93 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-"""
-Import sample data for similar product engine
-"""
-
-import predictionio
-import argparse
-import random
-
-SEED = 3
-
-def import_events(client):
- random.seed(SEED)
- count = 0
- print client.get_status()
- print "Importing data..."
-
- # generate 10 users, with user ids u1,u2,....,u10
- user_ids = ["u%s" % i for i in range(1, 11)]
- for user_id in user_ids:
- print "Set user", user_id
- client.create_event(
- event="$set",
- entity_type="user",
- entity_id=user_id
- )
- count += 1
-
- # generate 50 items, with item ids i1,i2,....,i50
- # random assign 1 to 4 categories among c1-c6 to items
- categories = ["c%s" % i for i in range(1, 7)]
- item_ids = ["i%s" % i for i in range(1, 51)]
- for item_id in item_ids:
- print "Set item", item_id
- client.create_event(
- event="$set",
- entity_type="item",
- entity_id=item_id,
- properties={
- "categories" : random.sample(categories, random.randint(1, 4)),
- "title": "title for movie " + item_id,
- "date": 1935 + random.randint(1, 25),
- "imdbUrl": "http://imdb.com/fake-url/" + item_id
- }
- )
- count += 1
-
- # each user randomly viewed 10 items
- for user_id in user_ids:
- for viewed_item in random.sample(item_ids, 10):
- print "User", user_id ,"views item", viewed_item
- client.create_event(
- event="view",
- entity_type="user",
- entity_id=user_id,
- target_entity_type="item",
- target_entity_id=viewed_item
- )
- count += 1
-
- print "%s events are imported." % count
-
-if __name__ == '__main__':
- parser = argparse.ArgumentParser(
- description="Import sample data for similar product engine")
- parser.add_argument('--access_key', default='invald_access_key')
- parser.add_argument('--url', default="http://localhost:7070")
-
- args = parser.parse_args()
- print args
-
- client = predictionio.EventClient(
- access_key=args.access_key,
- url=args.url,
- threads=5,
- qsize=500)
- import_events(client)
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-similarproduct/add-and-return-item-properties/data/send_query.py
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-similarproduct/add-and-return-item-properties/data/send_query.py b/examples/scala-parallel-similarproduct/add-and-return-item-properties/data/send_query.py
deleted file mode 100644
index 8678b15..0000000
--- a/examples/scala-parallel-similarproduct/add-and-return-item-properties/data/send_query.py
+++ /dev/null
@@ -1,24 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-"""
-Send sample query to prediction engine
-"""
-
-import predictionio
-engine_client = predictionio.EngineClient(url="http://localhost:8000")
-print engine_client.send_query({"items": ["i1", "i3"], "num": 4})
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-similarproduct/add-and-return-item-properties/engine.json
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-similarproduct/add-and-return-item-properties/engine.json b/examples/scala-parallel-similarproduct/add-and-return-item-properties/engine.json
deleted file mode 100644
index 9aa6dfa..0000000
--- a/examples/scala-parallel-similarproduct/add-and-return-item-properties/engine.json
+++ /dev/null
@@ -1,21 +0,0 @@
-{
- "id": "default",
- "description": "Default settings",
- "engineFactory": "org.template.similarproduct.SimilarProductEngine",
- "datasource": {
- "params" : {
- "appId": 2
- }
- },
- "algorithms": [
- {
- "name": "als",
- "params": {
- "rank": 10,
- "numIterations" : 20,
- "lambda": 0.01,
- "seed": 3
- }
- }
- ]
-}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-similarproduct/add-and-return-item-properties/project/assembly.sbt
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-similarproduct/add-and-return-item-properties/project/assembly.sbt b/examples/scala-parallel-similarproduct/add-and-return-item-properties/project/assembly.sbt
deleted file mode 100644
index 54c3252..0000000
--- a/examples/scala-parallel-similarproduct/add-and-return-item-properties/project/assembly.sbt
+++ /dev/null
@@ -1 +0,0 @@
-addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.11.2")
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-similarproduct/add-and-return-item-properties/project/pio-build.sbt
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-similarproduct/add-and-return-item-properties/project/pio-build.sbt b/examples/scala-parallel-similarproduct/add-and-return-item-properties/project/pio-build.sbt
deleted file mode 100644
index 9aed0ee..0000000
--- a/examples/scala-parallel-similarproduct/add-and-return-item-properties/project/pio-build.sbt
+++ /dev/null
@@ -1 +0,0 @@
-addSbtPlugin("org.apache.predictionio" % "pio-build" % "0.9.0")
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-similarproduct/add-and-return-item-properties/src/main/scala/ALSAlgorithm.scala
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-similarproduct/add-and-return-item-properties/src/main/scala/ALSAlgorithm.scala b/examples/scala-parallel-similarproduct/add-and-return-item-properties/src/main/scala/ALSAlgorithm.scala
deleted file mode 100644
index 3f0b628..0000000
--- a/examples/scala-parallel-similarproduct/add-and-return-item-properties/src/main/scala/ALSAlgorithm.scala
+++ /dev/null
@@ -1,259 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.template.similarproduct
-
-import org.apache.predictionio.controller.P2LAlgorithm
-import org.apache.predictionio.controller.Params
-import org.apache.predictionio.data.storage.BiMap
-
-import org.apache.spark.SparkContext
-import org.apache.spark.SparkContext._
-import org.apache.spark.mllib.recommendation.ALS
-import org.apache.spark.mllib.recommendation.{Rating => MLlibRating}
-
-import grizzled.slf4j.Logger
-
-import scala.collection.mutable.PriorityQueue
-
-case class ALSAlgorithmParams(
- rank: Int,
- numIterations: Int,
- lambda: Double,
- seed: Option[Long]) extends Params
-
-class ALSModel(
- val productFeatures: Map[Int, Array[Double]],
- val itemStringIntMap: BiMap[String, Int],
- val items: Map[Int, Item]
-) extends Serializable {
-
- @transient lazy val itemIntStringMap = itemStringIntMap.inverse
-
- override def toString = {
- s" productFeatures: [${productFeatures.size}]" +
- s"(${productFeatures.take(2).toList}...)" +
- s" itemStringIntMap: [${itemStringIntMap.size}]" +
- s"(${itemStringIntMap.take(2).toString}...)]" +
- s" items: [${items.size}]" +
- s"(${items.take(2).toString}...)]"
- }
-}
-
-/**
- * Use ALS to build item x feature matrix
- */
-class ALSAlgorithm(val ap: ALSAlgorithmParams)
- extends P2LAlgorithm[PreparedData, ALSModel, Query, PredictedResult] {
-
- @transient lazy val logger = Logger[this.type]
-
- def train(sc: SparkContext, data: PreparedData): ALSModel = {
- require(!data.viewEvents.take(1).isEmpty,
- s"viewEvents in PreparedData cannot be empty." +
- " Please check if DataSource generates TrainingData" +
- " and Preprator generates PreparedData correctly.")
- require(!data.users.take(1).isEmpty,
- s"users in PreparedData cannot be empty." +
- " Please check if DataSource generates TrainingData" +
- " and Preprator generates PreparedData correctly.")
- require(!data.items.take(1).isEmpty,
- s"items in PreparedData cannot be empty." +
- " Please check if DataSource generates TrainingData" +
- " and Preprator generates PreparedData correctly.")
- // create User and item's String ID to integer index BiMap
- val userStringIntMap = BiMap.stringInt(data.users.keys)
- val itemStringIntMap = BiMap.stringInt(data.items.keys)
-
- // collect Item as Map and convert ID to Int index
- val items: Map[Int, Item] = data.items.map { case (id, item) =>
- (itemStringIntMap(id), item)
- }.collectAsMap.toMap
-
- val mllibRatings = data.viewEvents
- .map { r =>
- // Convert user and item String IDs to Int index for MLlib
- val uindex = userStringIntMap.getOrElse(r.user, -1)
- val iindex = itemStringIntMap.getOrElse(r.item, -1)
-
- if (uindex == -1)
- logger.info(s"Couldn't convert nonexistent user ID ${r.user}"
- + " to Int index.")
-
- if (iindex == -1)
- logger.info(s"Couldn't convert nonexistent item ID ${r.item}"
- + " to Int index.")
-
- ((uindex, iindex), 1)
- }.filter { case ((u, i), v) =>
- // keep events with valid user and item index
- (u != -1) && (i != -1)
- }.reduceByKey(_ + _) // aggregate all view events of same user-item pair
- .map { case ((u, i), v) =>
- // MLlibRating requires integer index for user and item
- MLlibRating(u, i, v)
- }
- .cache()
-
- // MLLib ALS cannot handle empty training data.
- require(!mllibRatings.take(1).isEmpty,
- s"mllibRatings cannot be empty." +
- " Please check if your events contain valid user and item ID.")
-
- // seed for MLlib ALS
- val seed = ap.seed.getOrElse(System.nanoTime)
-
- val m = ALS.trainImplicit(
- ratings = mllibRatings,
- rank = ap.rank,
- iterations = ap.numIterations,
- lambda = ap.lambda,
- blocks = -1,
- alpha = 1.0,
- seed = seed)
-
- new ALSModel(
- productFeatures = m.productFeatures.collectAsMap.toMap,
- itemStringIntMap = itemStringIntMap,
- items = items
- )
- }
-
- def predict(model: ALSModel, query: Query): PredictedResult = {
-
- val productFeatures = model.productFeatures
-
- // convert items to Int index
- val queryList: Set[Int] = query.items.map(model.itemStringIntMap.get(_))
- .flatten.toSet
-
- val queryFeatures: Vector[Array[Double]] = queryList.toVector
- // productFeatures may not contain the requested item
- .map { item => productFeatures.get(item) }
- .flatten
-
- val whiteList: Option[Set[Int]] = query.whiteList.map( set =>
- set.map(model.itemStringIntMap.get(_)).flatten
- )
- val blackList: Option[Set[Int]] = query.blackList.map ( set =>
- set.map(model.itemStringIntMap.get(_)).flatten
- )
-
- val ord = Ordering.by[(Int, Double), Double](_._2).reverse
-
- val indexScores: Array[(Int, Double)] = if (queryFeatures.isEmpty) {
- logger.info(s"No productFeatures vector for query items ${query.items}.")
- Array[(Int, Double)]()
- } else {
- productFeatures.par // convert to parallel collection
- .mapValues { f =>
- queryFeatures.map{ qf =>
- cosine(qf, f)
- }.reduce(_ + _)
- }
- .filter(_._2 > 0) // keep items with score > 0
- .seq // convert back to sequential collection
- .toArray
- }
-
- val filteredScore = indexScores.view.filter { case (i, v) =>
- isCandidateItem(
- i = i,
- items = model.items,
- categories = query.categories,
- queryList = queryList,
- whiteList = whiteList,
- blackList = blackList
- )
- }
-
- val topScores = getTopN(filteredScore, query.num)(ord).toArray
-
- val itemScores = topScores.map { case (i, s) =>
- val it = model.items(i)
- new ItemScore(
- item = model.itemIntStringMap(i),
- title = it.title,
- date = it.date,
- imdbUrl = it.imdbUrl,
- score = s
- )
- }
-
- new PredictedResult(itemScores)
- }
-
- private
- def getTopN[T](s: Seq[T], n: Int)(implicit ord: Ordering[T]): Seq[T] = {
-
- val q = PriorityQueue()
-
- for (x <- s) {
- if (q.size < n)
- q.enqueue(x)
- else {
- // q is full
- if (ord.compare(x, q.head) < 0) {
- q.dequeue()
- q.enqueue(x)
- }
- }
- }
-
- q.dequeueAll.toSeq.reverse
- }
-
- private
- def cosine(v1: Array[Double], v2: Array[Double]): Double = {
- val size = v1.size
- var i = 0
- var n1: Double = 0
- var n2: Double = 0
- var d: Double = 0
- while (i < size) {
- n1 += v1(i) * v1(i)
- n2 += v2(i) * v2(i)
- d += v1(i) * v2(i)
- i += 1
- }
- val n1n2 = (math.sqrt(n1) * math.sqrt(n2))
- if (n1n2 == 0) 0 else (d / n1n2)
- }
-
- private
- def isCandidateItem(
- i: Int,
- items: Map[Int, Item],
- categories: Option[Set[String]],
- queryList: Set[Int],
- whiteList: Option[Set[Int]],
- blackList: Option[Set[Int]]
- ): Boolean = {
- whiteList.map(_.contains(i)).getOrElse(true) &&
- blackList.map(!_.contains(i)).getOrElse(true) &&
- // discard items in query as well
- (!queryList.contains(i)) &&
- // filter categories
- categories.map { cat =>
- items(i).categories.map { itemCat =>
- // keep this item if has ovelap categories with the query
- !(itemCat.toSet.intersect(cat).isEmpty)
- }.getOrElse(false) // discard this item if it has no categories
- }.getOrElse(true)
- }
-
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-similarproduct/add-and-return-item-properties/src/main/scala/DataSource.scala
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-similarproduct/add-and-return-item-properties/src/main/scala/DataSource.scala b/examples/scala-parallel-similarproduct/add-and-return-item-properties/src/main/scala/DataSource.scala
deleted file mode 100644
index 1c60e8a..0000000
--- a/examples/scala-parallel-similarproduct/add-and-return-item-properties/src/main/scala/DataSource.scala
+++ /dev/null
@@ -1,139 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.template.similarproduct
-
-import org.apache.predictionio.controller.PDataSource
-import org.apache.predictionio.controller.EmptyEvaluationInfo
-import org.apache.predictionio.controller.EmptyActualResult
-import org.apache.predictionio.controller.Params
-import org.apache.predictionio.data.storage.Event
-import org.apache.predictionio.data.storage.Storage
-
-import org.apache.spark.SparkContext
-import org.apache.spark.SparkContext._
-import org.apache.spark.rdd.RDD
-
-import grizzled.slf4j.Logger
-
-case class DataSourceParams(appId: Int) extends Params
-
-class DataSource(val dsp: DataSourceParams)
- extends PDataSource[TrainingData,
- EmptyEvaluationInfo, Query, EmptyActualResult] {
-
- @transient lazy val logger = Logger[this.type]
-
- override
- def readTraining(sc: SparkContext): TrainingData = {
- val eventsDb = Storage.getPEvents()
-
- // create a RDD of (entityID, User)
- val usersRDD: RDD[(String, User)] = eventsDb.aggregateProperties(
- appId = dsp.appId,
- entityType = "user"
- )(sc).map { case (entityId, properties) =>
- val user = try {
- User()
- } catch {
- case e: Exception => {
- logger.error(s"Failed to get properties ${properties} of" +
- s" user ${entityId}. Exception: ${e}.")
- throw e
- }
- }
- (entityId, user)
- }.cache()
-
- // create a RDD of (entityID, Item)
- val itemsRDD: RDD[(String, Item)] = eventsDb.aggregateProperties(
- appId = dsp.appId,
- entityType = "item"
- )(sc).map { case (entityId, properties) =>
- val item = try {
- // Assume categories is optional property of item.
- Item(
- title = properties.get[String]("title"),
- date = properties.get[String]("date"),
- imdbUrl = properties.get[String]("imdbUrl"),
- categories = properties.getOpt[List[String]]("categories"))
- } catch {
- case e: Exception => {
- logger.error(s"Failed to get properties ${properties} of" +
- s" item ${entityId}. Exception: ${e}.")
- throw e
- }
- }
- (entityId, item)
- }.cache()
-
- // get all "user" "view" "item" events
- val viewEventsRDD: RDD[ViewEvent] = eventsDb.find(
- appId = dsp.appId,
- entityType = Some("user"),
- eventNames = Some(List("view")),
- // targetEntityType is optional field of an event.
- targetEntityType = Some(Some("item")))(sc)
- // eventsDb.find() returns RDD[Event]
- .map { event =>
- val viewEvent = try {
- event.event match {
- case "view" => ViewEvent(
- user = event.entityId,
- item = event.targetEntityId.get,
- t = event.eventTime.getMillis)
- case _ => throw new Exception(s"Unexpected event ${event} is read.")
- }
- } catch {
- case e: Exception => {
- logger.error(s"Cannot convert ${event} to ViewEvent." +
- s" Exception: ${e}.")
- throw e
- }
- }
- viewEvent
- }.cache()
-
- new TrainingData(
- users = usersRDD,
- items = itemsRDD,
- viewEvents = viewEventsRDD
- )
- }
-}
-
-case class User()
-
-case class Item(
- title: String,
- date: String,
- imdbUrl: String,
- categories: Option[List[String]])
-
-case class ViewEvent(user: String, item: String, t: Long)
-
-class TrainingData(
- val users: RDD[(String, User)],
- val items: RDD[(String, Item)],
- val viewEvents: RDD[ViewEvent]
-) {
- override def toString = {
- s"users: [${users.count()} (${users.take(2).toList}...)]" +
- s"items: [${items.count()} (${items.take(2).toList}...)]" +
- s"viewEvents: [${viewEvents.count()}] (${viewEvents.take(2).toList}...)"
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-similarproduct/add-and-return-item-properties/src/main/scala/Engine.scala
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-similarproduct/add-and-return-item-properties/src/main/scala/Engine.scala b/examples/scala-parallel-similarproduct/add-and-return-item-properties/src/main/scala/Engine.scala
deleted file mode 100644
index 1133a61..0000000
--- a/examples/scala-parallel-similarproduct/add-and-return-item-properties/src/main/scala/Engine.scala
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.template.similarproduct
-
-import org.apache.predictionio.controller.IEngineFactory
-import org.apache.predictionio.controller.Engine
-
-case class Query(
- items: List[String],
- num: Int,
- categories: Option[Set[String]],
- whiteList: Option[Set[String]],
- blackList: Option[Set[String]]
-)
-
-case class PredictedResult(
- itemScores: Array[ItemScore]
-)
-
-case class ItemScore(
- item: String,
- title: String,
- date: String,
- imdbUrl: String,
- score: Double
-)
-
-object SimilarProductEngine extends IEngineFactory {
- def apply() = {
- new Engine(
- classOf[DataSource],
- classOf[Preparator],
- Map("als" -> classOf[ALSAlgorithm]),
- classOf[Serving])
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-similarproduct/add-and-return-item-properties/src/main/scala/Preparator.scala
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-similarproduct/add-and-return-item-properties/src/main/scala/Preparator.scala b/examples/scala-parallel-similarproduct/add-and-return-item-properties/src/main/scala/Preparator.scala
deleted file mode 100644
index d747a84..0000000
--- a/examples/scala-parallel-similarproduct/add-and-return-item-properties/src/main/scala/Preparator.scala
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.template.similarproduct
-
-import org.apache.predictionio.controller.PPreparator
-
-import org.apache.spark.SparkContext
-import org.apache.spark.SparkContext._
-import org.apache.spark.rdd.RDD
-
-class Preparator
- extends PPreparator[TrainingData, PreparedData] {
-
- def prepare(sc: SparkContext, trainingData: TrainingData): PreparedData = {
- new PreparedData(
- users = trainingData.users,
- items = trainingData.items,
- viewEvents = trainingData.viewEvents)
- }
-}
-
-class PreparedData(
- val users: RDD[(String, User)],
- val items: RDD[(String, Item)],
- val viewEvents: RDD[ViewEvent]
-) extends Serializable
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-similarproduct/add-and-return-item-properties/src/main/scala/Serving.scala
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-similarproduct/add-and-return-item-properties/src/main/scala/Serving.scala b/examples/scala-parallel-similarproduct/add-and-return-item-properties/src/main/scala/Serving.scala
deleted file mode 100644
index 58c3b4e..0000000
--- a/examples/scala-parallel-similarproduct/add-and-return-item-properties/src/main/scala/Serving.scala
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.template.similarproduct
-
-import org.apache.predictionio.controller.LServing
-
-class Serving
- extends LServing[Query, PredictedResult] {
-
- override
- def serve(query: Query,
- predictedResults: Seq[PredictedResult]): PredictedResult = {
- predictedResults.head
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-similarproduct/add-rateevent/README.md
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-similarproduct/add-rateevent/README.md b/examples/scala-parallel-similarproduct/add-rateevent/README.md
deleted file mode 100644
index 6c5c471..0000000
--- a/examples/scala-parallel-similarproduct/add-rateevent/README.md
+++ /dev/null
@@ -1,156 +0,0 @@
-<!--
-Licensed to the Apache Software Foundation (ASF) under one or more
-contributor license agreements. See the NOTICE file distributed with
-this work for additional information regarding copyright ownership.
-The ASF licenses this file to You under the Apache License, Version 2.0
-(the "License"); you may not use this file except in compliance with
-the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
--->
-
-# Similar Product Template Modified to Add Explicit Rate Event
-
-This example engine is based on Similar Product Tempplate version v0.1.2 and is modified to add Explicit Rate Event to training data.
-
-For example, An User would rate an item with a score or rating.The rating is used to train the model.
-
-## Documentation
-
-Please refer to http://predictionio.incubator.apache.org/templates/similarproduct/quickstart/
-
-
-
-## Development Notes
-
-### Changes to DataSource.scala
-
-1) class "Rating" is created.
-```
-case class Rating(
- user: String,
- item: String,
- rating: Double,
- t:Long
-)
-```
-
-2) "rateEventsRDD" is initialized by filtering events of type "rate" from Events database as shown below.
-
-```
- val rateEventsRDD: RDD[RateEvent] = eventsDb.find(
- appId = dsp.appId,
- entityType = Some("user"),
- eventNames = Some(List("rate")...
-
- val rateEvent = try {
- event.event match {
- case "rate" => RateEvent(
- user = event.entityId,
- item = event.targetEntityId.get,
- rating = event.properties.get[Double]("rating")...
-```
-
-### Changes to Preparator.scala
-
-1) val "rateEvents" is added to class "PreparedData"
-
-```
- class PreparedData(
- val users: RDD[(String, User)],
- val items: RDD[(String, Item)],
- val rateEvents: RDD[RateEvent]
-```
-
-2) val "rateEvents" is initialized in the Object of class "PreparedData".
-
-```
- new PreparedData(
- users = trainingData.users,
- items = trainingData.items,
- rateEvents = trainingData.rateEvents)
-```
-
-### Changes to ALSAlgorithm.scala
-
-1) Changed the Signature of "train" method to include SparkContext and edited method definition to specify the debug message.
-
-```
- def train(sc:SparkContext ,data: PreparedData): ALSModel = {
- require(!data.rateEvents.take(1).isEmpty,
- s"rateEvents in PreparedData cannot be empty." +
-```
-
-2) MlibRatings are initialized from rateEvents.
-
-```
- val mllibRatings = data.rateEvents
-```
-
-3) Invoke "ALS.train" method to train explicit rate events.
-```
- val m = ALS.train(
- ratings = mllibRatings,
- rank = ap.rank,
- iterations = ap.numIterations,
- lambda = ap.lambda,
- blocks = -1,
- seed = seed)
-```
-
-4) Define "rateEvent" RDD to filter rate events from events.
-
-```
- val rateEventsRDD: RDD[RateEvent] = eventsDb.find(...)
-```
-
-5) if a user may rate same item with different value at different times,use the latest value for this case.
-
-```
- .reduceByKey { case (v1, v2) => // MODIFIED
- // if a user may rate same item with different value at different times,
- // use the latest value for this case.
- // Can remove this reduceByKey() if no need to support this case.
- val (rating1, t1) = v1
- val (rating2, t2) = v2
- // keep the latest value
- if (t1 > t2) v1 else v2
- }
-```
-
-6) persist mlibRating.
-```
- .map { case ((u, i), (rating, t)) => // MODIFIED
- // MLlibRating requires integer index for user and item
- MLlibRating(u, i, rating) // MODIFIED
- }.cache()
-```
-
-
-7) Add "rateEvent" to class "TrainingData".
-
-```
- class TrainingData(
- val users: RDD[(String, User)],
- val items: RDD[(String, Item)],
- val rateEvents: RDD[RateEvent]
- )
-```
-
-8) Add "rateEvent" to object "TrainingData".
-
-```
- new TrainingData(
- users = usersRDD,
- items = itemsRDD,
- rateEvents = rateEventsRDD)
-```
-
-
-