Posted to issues@spark.apache.org by "Zoltan Toth (JIRA)" <ji...@apache.org> on 2015/09/10 07:28:46 UTC

[jira] [Comment Edited] (SPARK-10487) MLlib model fitting causes DataFrame write to break with OutOfMemory exception

    [ https://issues.apache.org/jira/browse/SPARK-10487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14738194#comment-14738194 ] 

Zoltan Toth edited comment on SPARK-10487 at 9/10/15 5:28 AM:
--------------------------------------------------------------

Yes, it only happens if you use MLlib or ML and you fit a model. You don't need a LabeledPoint; e.g., if you run a trivial `glm` linear regression on the built-in `iris` dataset in SparkR, it also fails.

{code}
library(SparkR)

sc <- sparkR.init()
sqlContext <- sparkRSQL.init(sc)

# Fit a trivial Gaussian GLM on the built-in iris dataset
training <- createDataFrame(sqlContext, iris)
test <- select(training, "Sepal_Length")
model <- glm(Sepal_Width ~ Sepal_Length, training, family = "gaussian")
prediction <- predict(model, test)

# After the model fit, writing the predictions to Parquet triggers the OOM in the driver
SparkR:::saveAsParquetFile(prediction, "/tmp/SparkR-logreg-prediction-data")
{code}

Again, only in `cluster` deploy mode.
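
For reference, a sketch of how the script above can be submitted in cluster deploy mode on YARN (the script path is a placeholder); with `--deploy-mode client` the same script does not hit the exception:

{code}
spark-submit --master yarn --deploy-mode cluster /path/to/sparkr-repro.R
{code}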


> MLlib model fitting causes DataFrame write to break with OutOfMemory exception
> ------------------------------------------------------------------------------
>
>                 Key: SPARK-10487
>                 URL: https://issues.apache.org/jira/browse/SPARK-10487
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 1.5.0, 1.5.1
>         Environment: Tried in a centos-based 1-node YARN in docker and on a real-world CDH5 cluster
> Spark 1.5.0-SNAPSHOT built for Hadoop 2.6.0 (I'm working with the latest nightly build)
> Build flags: -Psparkr -Phadoop-2.6 -Phive -Phive-thriftserver -Pyarn -DzincPort=3034
> I'm using the default resource setup
> 15/09/07 08:49:04 INFO yarn.YarnAllocator: Will request 2 executor containers, each with 1 cores and 1408 MB memory including 384 MB overhead
> 15/09/07 08:49:04 INFO yarn.YarnAllocator: Container request (host: Any, capability: <memory:1408, vCores:1>)
> 15/09/07 08:49:04 INFO yarn.YarnAllocator: Container request (host: Any, capability: <memory:1408, vCores:1>)
>            Reporter: Zoltan Toth
>
> After fitting a _spark.ml_ or _mllib_ model in *cluster* deploy mode, no DataFrame can be written to HDFS. The driver receives an OutOfMemory exception during the write. It seems, however, that the file itself gets written successfully.
>  * This happens both in SparkR and pyspark
>  * Only happens in cluster deploy mode
>  * The write fails regardless of the size of the DataFrame and regardless of whether the DataFrame is associated with the fitted ML model.
> REPRO:
> {code}
> from pyspark import SparkContext, SparkConf
> from pyspark.sql import SQLContext
> from pyspark.ml.classification import LogisticRegression
> from pyspark.mllib.regression import LabeledPoint
> from pyspark.mllib.linalg import Vector, Vectors
> conf = SparkConf().setAppName("LogRegTest")
> sc = SparkContext(conf=conf)
> sqlContext = SQLContext(sc)
> sqlContext.setConf("spark.sql.parquet.compression.codec", "uncompressed")
> training = sc.parallelize((
>   LabeledPoint(1.0, Vectors.dense(0.0, 1.1, 0.1)),
>   LabeledPoint(1.0, Vectors.dense(0.0, 1.2, -0.5))))
> df = training.toDF()
> reg = LogisticRegression().setMaxIter(10).setRegParam(0.01)
> model = reg.fit(df)
> # Note that this is a brand new dataframe:
> one_df = sc.parallelize((
>   LabeledPoint(1.0, Vectors.dense(0.0, 1.1, 0.1)),
>   LabeledPoint(1.0, Vectors.dense(0.0, 1.2, -0.5)))).toDF()
> one_df.write.mode("overwrite").parquet("/tmp/df.parquet")
> {code}
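>
> To confirm that the file is nevertheless written, a minimal check is to read it back from a separate client-mode pyspark shell (where {{sqlContext}} is predefined):
> {code}
> # Read the Parquet output back; it should contain the 2 rows written above
> written = sqlContext.read.parquet("/tmp/df.parquet")
> print(written.count())
> {code}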



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org