Posted to issues@spark.apache.org by "Hyukjin Kwon (JIRA)" <ji...@apache.org> on 2019/05/21 03:59:15 UTC

[jira] [Updated] (SPARK-23810) Matrix Multiplication is so bad, file I/O to local python is better

     [ https://issues.apache.org/jira/browse/SPARK-23810?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Hyukjin Kwon updated SPARK-23810:
---------------------------------
    Labels: bulk-closed  (was: )

> Matrix Multiplication is so bad, file I/O to local python is better
> -------------------------------------------------------------------
>
>                 Key: SPARK-23810
>                 URL: https://issues.apache.org/jira/browse/SPARK-23810
>             Project: Spark
>          Issue Type: Bug
>          Components: MLlib
>    Affects Versions: 2.2.0
>            Reporter: dciborow
>            Priority: Minor
>              Labels: bulk-closed
>         Attachments: image-2018-04-02-20-34-44-980.png
>
>
> I am trying to multiply two matrices. One is 130k by 30k; the second is 30k by 30k.
> Running this leads to heartbeat timeouts, Java heap space errors, and garbage collection errors.
> {{rdd.toBlockMatrix.multiply(rightRdd.toBlockMatrix).toIndexedRowMatrix()}}
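> 
> For reference, a minimal sketch of a tuned version of this BlockMatrix multiply, assuming the inputs are RDD[MatrixEntry] values named userRDD and itemRDD (hypothetical names; this is a sketch of the standard MLlib API, not a verified fix). Smaller blocks and the numMidDimSplits argument trade extra shuffling for lower peak memory per task:
> 
> import org.apache.spark.mllib.linalg.distributed.CoordinateMatrix
> 
> // Smaller blocks than the 1024 x 1024 default mean more, smaller shuffle pieces.
> val left = new CoordinateMatrix(userRDD).toBlockMatrix(512, 512).cache()
> val right = new CoordinateMatrix(itemRDD).toBlockMatrix(512, 512).cache()
> 
> // The second argument splits the inner dimension of the multiply (Spark 2.2+),
> // lowering per-task memory at the cost of an extra aggregation round.
> val product = left.multiply(right, 4).toIndexedRowMatrix()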
> I have also tried the following, which fails on the toLocalMatrix call:
> val userMatrix = new CoordinateMatrix(userRDD).toIndexedRowMatrix()
> val itemMatrix = new CoordinateMatrix(itemRDD).toBlockMatrix().toLocalMatrix()
> val itemMatrixBC = session.sparkContext.broadcast(itemMatrix)
> val userToItemMatrix = userMatrix
>   .multiply(itemMatrixBC.value)
>   .rows.map(row => (row.index.toInt, row.vector))
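> 
> (A plausible reason for that failure: a dense 30k-by-30k matrix of doubles is about 30,000 x 30,000 x 8 B ≈ 7.2 GB, all of which toLocalMatrix has to materialize in a single driver-side structure before it can even be broadcast.)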
>  
> I have instead gotten this operation "working" by saving the input DataFrames to Parquet (they start as DataFrames before the .rdd call that converts them for the matrix types), loading them into Python/pandas, doing the matrix multiplication with NumPy, saving the result back to Parquet, and rereading it into Spark.
>  
> Python -
> import numpy as np
> import pandas as pd
> # Load the item matrix: each row's 'jaccardList' column holds a list, which
> # np.stack turns into a dense 2-D array, indexed by itemID.
> X = pd.read_parquet('./items-parquet', engine='pyarrow')
> Xp = pd.DataFrame(np.stack(X.jaccardList), index=X.itemID)
> # Reindex onto the full 0..max(itemID) range, filling missing item rows with 0.
> Xrows = pd.DataFrame(index=range(0, X.itemID.max() + 1))
> Xpp = Xrows.join(Xp).fillna(0)
> # Load the user matrix the same way.
> Y = pd.read_parquet('./users-parquet', engine='pyarrow')
> Yp = np.stack(Y.flatList)
> # The whole multiplication happens locally in NumPy.
> Z = np.matmul(Yp, Xpp)
> # Repack as (id, ratings-list) rows so Spark can read the result from Parquet.
> Zp = pd.DataFrame(Z)
> Zp.columns = list(map(str, Zp.columns))
> Zpp = pd.DataFrame()
> Zpp['id'] = Zp.index
> Zpp['ratings'] = Zp.values.tolist()
> Zpp.to_parquet('sampleout.parquet', engine='pyarrow')
>  
> Scala -
> import sys.process._
> // Shell out to the Python step, then read its Parquet output back into Spark.
> val result = "python matmul.py".!
> val pythonOutput = userDataFrame.sparkSession.read.parquet("./sampleout.parquet")
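> 
> As an aside, .! from sys.process returns the child process's exit code, so a guard such as the following sketch avoids reading stale or missing output when the Python step fails:
> 
> require(result == 0, s"matmul.py failed with exit code $result")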
>  
> I can provide code and data to reproduce this, but I could use some instructions on how to set that up. This is based on the MovieLens 20M dataset, or I can provide access to my data in Azure.
> 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org