Posted to commits@systemds.apache.org by ja...@apache.org on 2021/05/08 19:26:30 UTC

[systemds] branch master updated: [DOC] Amazon coproduct purchasing network recommendation with SystemDS (#1268)

This is an automated email from the ASF dual-hosted git repository.

janardhan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/systemds.git


The following commit(s) were added to refs/heads/master by this push:
     new ac38137  [DOC] Amazon coproduct purchasing network recommendation with SystemDS (#1268)
ac38137 is described below

commit ac38137d5e1a5cc96ba08333326a27cee99a5ffa
Author: j143 <j1...@protonmail.com>
AuthorDate: Sun May 9 00:56:22 2021 +0530

    [DOC] Amazon coproduct purchasing network recommendation with SystemDS (#1268)
    
    * process the amazon0601.txt SNAP dataset and run PNMF
    * capture the results into a DataFrame
    * This demonstrates how to process the data and visualize it with Scala (a condensed sketch of the flow follows below)
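
A condensed sketch of the flow the new notebook cells implement (not part of the committed notebook; it assumes the SystemDS MLContext ml, the dml script factory, and Spark's implicit conversions are already in scope from the earlier cells, and that pnmf holds the DML script shown in the diff below):

// Load the SNAP co-purchasing edge list as (prod_i, prod_j, 1.0) triples.
val X_train = (sc.textFile("dbfs:/tmp/amazon0601.txt")
  .filter(l => !l.startsWith("#"))            // skip the "# ..." header lines
  .map(l => l.split("\t"))
  .map(p => (p(0).toLong, p(1).toLong, 1.0))
  .toDF("prod_i", "prod_j", "x_ij")
  .filter("prod_i < 500 AND prod_j < 500"))   // keep a small subgraph, for memory

// Run the PNMF script and pull the per-iteration losses back as a DataFrame.
val ret = ml.execute(dml(pnmf)
  .in("X", X_train).in("max_iter", 100).in("rank", 10)
  .out("W").out("H").out("losses"))
val lossesDF = ret.getMatrix("losses").toDF().sort("__INDEX")
display(lossesDF)   // loss per iteration, for a quick chart in Databricks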
---
 notebooks/databricks/MLContext.scala | 128 ++++++++++++++++++++++++++++++++++-
 1 file changed, 126 insertions(+), 2 deletions(-)

diff --git a/notebooks/databricks/MLContext.scala b/notebooks/databricks/MLContext.scala
index dc10860..55b6536 100644
--- a/notebooks/databricks/MLContext.scala
+++ b/notebooks/databricks/MLContext.scala
@@ -21,7 +21,7 @@
 
 // COMMAND ----------
 
-// MAGIC %md # Apache SystemDS on Databricks in 5 minutes
+// MAGIC %md # Apache SystemDS on Databricks
 
 // COMMAND ----------
 
@@ -66,7 +66,7 @@ ml.execute(uni)
 
 // COMMAND ----------
 
-// MAGIC %md #### Create a neural network layer with (R-like) DML language
+// MAGIC %md ### Create a neural network layer with (R-like) DML language
 
 // COMMAND ----------
 
@@ -79,3 +79,127 @@ val s = """
   """
 
 val ret = ml.execute(dml(s).out("R")).getScalarObject("R").getDoubleValue();
+
+// COMMAND ----------
+
+// MAGIC %md ### Recommendation with Amazon review dataset
+
+// COMMAND ----------
+
+import java.net.URL
+import java.io.File
+import org.apache.commons.io.FileUtils
+
+FileUtils.copyURLToFile(new URL("http://snap.stanford.edu/data/amazon0601.txt.gz"), new File("/tmp/amazon0601.txt.gz"))
+
+// COMMAND ----------
+
+// MAGIC %sh
+// MAGIC gunzip -d /tmp/amazon0601.txt.gz
+
+// COMMAND ----------
+
+// List files on the local file system or DBFS. For more, see https://docs.databricks.com/data/filestore.html
+// File system: display(dbutils.fs.ls("file:/tmp"))
+// DBFS: display(dbutils.fs.ls("."))
+
+dbutils.fs.mv("file:/tmp/amazon0601.txt", "dbfs:/tmp/amazon0601.txt")
+
+// COMMAND ----------
+
+display(dbutils.fs.ls("/tmp"))
+// display(dbutils.fs.ls("file:/tmp"))
+
+// COMMAND ----------
+
+// move temporary files to databricks file system (DBFS)
+// dbutils.fs.mv("file:/databricks/driver/amazon0601.txt", "dbfs:/tmp/amazon0601.txt") 
+val df = spark.read.format("text").option("inferSchema", "true").option("header","true").load("dbfs:/tmp/amazon0601.txt")
+display(df)
+
+// COMMAND ----------
+
+// MAGIC %py
+// MAGIC 
+// MAGIC # The scala data processing pipeline can also be
+// MAGIC # implemented in python as shown in this block
+// MAGIC 
+// MAGIC # 
+// MAGIC # import pyspark.sql.functions as F
+// MAGIC # # https://spark.apache.org/docs/latest/sql-ref.html
+// MAGIC 
+// MAGIC # dataPath = "dbfs:/tmp/amazon0601.txt"
+// MAGIC 
+// MAGIC # X_train = (sc.textFile(dataPath)
+// MAGIC #     .filter(lambda l: not l.startswith("#"))
+// MAGIC #     .map(lambda l: l.split("\t"))
+// MAGIC #     .map(lambda prods: (int(prods[0]), int(prods[1]), 1.0))
+// MAGIC #     .toDF(("prod_i", "prod_j", "x_ij"))
+// MAGIC #     .filter("prod_i < 500 AND prod_j < 500") # Filter for memory constraints
+// MAGIC #     .cache())
+// MAGIC 
+// MAGIC # max_prod_i = X_train.select(F.max("prod_i")).first()[0]
+// MAGIC # max_prod_j = X_train.select(F.max("prod_j")).first()[0]
+// MAGIC # numProducts = max(max_prod_i, max_prod_j) + 1 # 0-based indexing
+// MAGIC # print("Total number of products: {}".format(numProducts))
+
+// COMMAND ----------
+
+// Reference: https://spark.apache.org/docs/latest/rdd-programming-guide.html
+val X_train = (sc.textFile("dbfs:/tmp/amazon0601.txt").filter(l => !(l.startsWith("#"))).map(l => l.split("\t"))
+                  .map(prods => (prods(0).toLong, prods(1).toLong, 1.0))
+                  .toDF("prod_i", "prod_j", "x_ij")
+                  .filter("prod_i < 500 AND prod_j < 500") // filter for memory constraints
+                  .cache())
+
+display(X_train)
+
+// COMMAND ----------
+
+// MAGIC %md #### Poisson Nonnegative Matrix Factorization
+
+// COMMAND ----------
+
+// Poisson Nonnegative Matrix Factorization
+
+val pnmf = """
+# data & args
+X = X+1 # change product IDs to be 1-based, rather than 0-based
+V = table(X[,1], X[,2])
+size = ifdef($size, -1)
+if(size > -1) {
+    V = V[1:size,1:size]
+}
+
+n = nrow(V)
+m = ncol(V)
+range = 0.01
+W = Rand(rows=n, cols=rank, min=0, max=range, pdf="uniform")
+H = Rand(rows=rank, cols=m, min=0, max=range, pdf="uniform")
+losses = matrix(0, rows=max_iter, cols=1)
+
+# run PNMF
+i=1
+while(i <= max_iter) {
+  # update params
+  H = (H * (t(W) %*% (V/(W%*%H))))/t(colSums(W)) 
+  W = (W * ((V/(W%*%H)) %*% t(H)))/t(rowSums(H))
+  
+  # compute loss
+  losses[i,] = -1 * (sum(V*log(W%*%H)) - as.scalar(colSums(W)%*%rowSums(H)))
+  i = i + 1;
+}
+  """
+
+val ret = ml.execute(dml(pnmf).in("X", X_train).in("max_iter", 100).in("rank", 10).out("W").out("H").out("losses"));
+
+// COMMAND ----------
+
+val W = ret.getMatrix("W")
+val H = ret.getMatrix("H")
+val losses = ret.getMatrix("losses")
+
+// COMMAND ----------
+
+val lossesDF = losses.toDF().sort("__INDEX")
+display(lossesDF)
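
For reference, the per-iteration value that the PNMF script above stores in losses is, up to terms constant in W and H, the negative Poisson log-likelihood of the co-purchase count matrix V under the low-rank model W H (using the identity colSums(W) %*% rowSums(H) = sum(W %*% H)):

\[
\mathrm{losses}_i \;=\; -\sum_{u,v}\Big( V_{uv}\,\log (WH)_{uv} \;-\; (WH)_{uv} \Big)
\]

The multiplicative updates of H and W inside the loop are the standard nonnegativity-preserving updates for this objective, so the loss curve shown by display(lossesDF) should be non-increasing over iterations.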