Posted to reviews@spark.apache.org by Tagar <gi...@git.apache.org> on 2018/06/27 22:42:49 UTC
[GitHub] spark pull request #13274: Glrm
Github user Tagar commented on a diff in the pull request:
https://github.com/apache/spark/pull/13274#discussion_r198663163
--- Diff: examples/src/main/scala/org/apache/spark/examples/glrm/SparkGLRM.scala ---
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.examples
+
+import scala.collection.BitSet
+
+import breeze.linalg.{DenseVector => BDV}
+
+import org.apache.spark.{SparkConf, SparkContext}
+import org.apache.spark.SparkContext._
+import org.apache.spark.broadcast.Broadcast
+import org.apache.spark.mllib.linalg.distributed.MatrixEntry
+import org.apache.spark.rdd.RDD
+
+/**
+ * Generalized Low Rank Models for Spark
+ *
+ * Run these commands from the spark root directory.
+ *
+ * Compile with:
+ * sbt/sbt assembly
+ *
+ * Run with (spark-submit options must come before the application jar):
+ * ./bin/spark-submit --class org.apache.spark.examples.SparkGLRM \
+ *   --executor-memory 1G \
+ *   --driver-memory 1G \
+ *   ./examples/target/scala-2.10/spark-examples-1.1.0-SNAPSHOT-hadoop1.0.4.jar
+ */
+
+object SparkGLRM {
+  /*********************************
+   * GLRM: Bank of loss functions
+   *********************************/
+  def lossL2squaredGrad(i: Int, j: Int, prediction: Double, actual: Double): Double = {
+    // gradient of the squared L2 loss 0.5 * (prediction - actual)^2
+    prediction - actual
+  }
+
+  def lossL1Grad(i: Int, j: Int, prediction: Double, actual: Double): Double = {
+    // a subgradient of the L1 loss |prediction - actual|
+    math.signum(prediction - actual)
+  }
+
+  def mixedLossGrad(i: Int, j: Int, prediction: Double, actual: Double): Double = {
+    // a contrived per-entry mix of the two losses, for demonstration.
+    // Note the parentheses: `i + j % 2 == 0` would parse as `i + (j % 2) == 0`.
+    if ((i + j) % 2 == 0) lossL1Grad(i, j, prediction, actual)
+    else lossL2squaredGrad(i, j, prediction, actual)
+  }
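+
+  // Example: lossL2squaredGrad(0, 0, 3.0, 1.0) == 2.0, while
+  // lossL1Grad(0, 0, 3.0, 1.0) == 1.0; under L1 only the sign of the
+  // residual survives, which is what makes it robust to outliers.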
+
+  /***********************************
+   * GLRM: Bank of prox functions
+   ***********************************/
+  // L2 prox: uniform shrinkage toward zero
+  def proxL2(v: BDV[Double], stepSize: Double, regPen: Double): BDV[Double] = {
+    val arr = v.toArray.map(x => x / (1.0 + stepSize * regPen))
+    new BDV[Double](arr)
+  }
+
+  // L1 prox: soft-thresholding (note the <=, so the boundary |x| == sr maps to 0)
+  def proxL1(v: BDV[Double], stepSize: Double, regPen: Double): BDV[Double] = {
+    val sr = regPen * stepSize
+    val arr = v.toArray.map(x =>
+      if (math.abs(x) <= sr) 0.0
+      else if (x < -sr) x + sr
+      else x - sr
+    )
+    new BDV[Double](arr)
+  }
+
+  // Non-negative prox: projection onto the non-negative orthant
+  def proxNonneg(v: BDV[Double], stepSize: Double, regPen: Double): BDV[Double] = {
+    val arr = v.toArray.map(x => math.max(x, 0.0))
+    new BDV[Double](arr)
+  }
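+
+  // Example: proxL1 soft-thresholding with stepSize = 1.0 and regPen = 1.0
+  // (so sr = 1.0) gives proxL1(BDV(2.0, -0.5, 0.1), 1.0, 1.0) == BDV(1.0, 0.0, 0.0):
+  // large coordinates shrink by sr, small ones snap to zero.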
+
+  /* End of GLRM library */
+
+
+  // Helper functions for updating
+  def computeLossGrads(ms: Broadcast[Array[BDV[Double]]], us: Broadcast[Array[BDV[Double]]],
+                       R: RDD[(Int, Int, Double)],
+                       lossGrad: (Int, Int, Double, Double) => Double): RDD[(Int, Int, Double)] = {
+    // the prediction for entry (i, j) is the dot product of the two rank-k factors
+    R.map { case (i, j, rij) => (i, j, lossGrad(i, j, ms.value(i).dot(us.value(j)), rij)) }
+  }
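+
+  // Example: with rank-1 factors ms = Array(BDV(2.0)) and us = Array(BDV(3.0)),
+  // an observed entry (0, 0, 5.0) yields the prediction 2.0 * 3.0 = 6.0, so
+  // computeLossGrads emits (0, 0, 1.0) under the squared L2 loss.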
+
+  // Update factors: one proximal gradient step per row of ms
+  def update(us: Broadcast[Array[BDV[Double]]], ms: Broadcast[Array[BDV[Double]]],
+             lossGrads: RDD[(Int, Int, Double)], stepSize: Double,
+             nnz: Array[Double],
+             prox: (BDV[Double], Double, Double) => BDV[Double], regPen: Double)
+    : Array[BDV[Double]] = {
+    val rank = ms.value(0).length
+    val ret = Array.fill(ms.value.length)(BDV.zeros[Double](rank))
+
+    // gradient for row i: sum over the observed entries (i, j) of lossGrad_ij * u_j
+    val grads = lossGrads.map { case (i, j, lossij) => (i, us.value(j) * lossij) } // vector/scalar multiply
+      .reduceByKey(_ + _).collect() // vector addition through breeze
+
+    for ((idx, g) <- grads) {
+      val alpha = stepSize / (nnz(idx) + 1)
+      ret(idx) = prox(ms.value(idx) - g * alpha, alpha, regPen)
+    }
+
+    ret
+  }
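+
+  // Each pass above is one proximal (sub)gradient step per row:
+  //   m_i <- prox(m_i - alpha_i * sum_j lossGrad_ij * u_j, alpha_i, regPen)
+  // with a per-row step size alpha_i = stepSize / (nnz(i) + 1), so rows with
+  // many observed entries take proportionally smaller steps.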
+
+  def fitGLRM(R: RDD[(Int, Int, Double)], M: Int, U: Int,
+              lossFunctionGrad: (Int, Int, Double, Double) => Double,
+              moviesProx: (BDV[Double], Double, Double) => BDV[Double],
+              usersProx: (BDV[Double], Double, Double) => BDV[Double],
+              rank: Int,
+              numIterations: Int,
+              regPen: Double): (Array[BDV[Double]], Array[BDV[Double]], Array[Double]) = {
+    // Transpose data
+    val RT = R.map { case (i, j, rij) => (j, i, rij) }.cache()
+
+    val sc = R.context
+
+    // Compute number of nonzeros per row and column
+    val mCountRDD = R.map { case (i, j, rij) => (i, 1) }.reduceByKey(_ + _).collect()
+    val mCount = Array.ofDim[Double](M)
+    for ((i, cnt) <- mCountRDD)
+      mCount(i) = cnt
+    val maxM = mCount.max
+    val uCountRDD = R.map { case (i, j, rij) => (j, 1) }.reduceByKey(_ + _).collect()
+    val uCount = Array.ofDim[Double](U)
+    for ((j, cnt) <- uCountRDD)
+      uCount(j) = cnt
+    val maxU = uCount.max
+
+    // Initialize m and u to small random values
+    var ms = Array.fill(M)(BDV[Double](Array.tabulate(rank)(x => math.random / (M * U))))
+    var us = Array.fill(U)(BDV[Double](Array.tabulate(rank)(x => math.random / (M * U))))
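+    // the 1/(M * U) scale keeps the initial predictions m_i.dot(u_j) close to
+    // zero, so early gradient steps are not dominated by the random start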
+
+    // Iteratively update movies then users
+    var msb = sc.broadcast(ms)
+    var usb = sc.broadcast(us)
--- End diff --
Does this assume that both `ms` and `us` fit in memory on every executor?
How well does it scale?
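
As a rough sketch of the concern (my own sizing estimate, not something in the patch): each broadcast factor array costs on the order of numRows * rank * 8 bytes of heap on every executor, ignoring per-vector object overhead:

    // hypothetical sizing helper, not part of this PR
    def factorHeapBytes(numRows: Long, rank: Int): Long = numRows * rank * 8L
    factorHeapBytes(10000000L, 50)  // ~4 GB on every executor at 10M rows, rank 50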
Thanks!
---