You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@spark.apache.org by Shane Lee <sh...@yahoo.com.INVALID> on 2016/08/09 03:35:17 UTC

SparkR error when repartition is called

Hi All,
I am trying out SparkR 2.0 and have run into an issue with repartition. 
Here is the R code (essentially a port of the pi-calculating scala example in the spark package) that can reproduce the behavior:
schema <- structType(structField("input", "integer"),     structField("output", "integer"))
library(magrittr)

len = 3000data.frame(n = 1:len) %>%    as.DataFrame %>%    SparkR:::repartition(10L) %>% dapply(., function (df) { library(plyr) ddply(df, .(n), function (y)
 { data.frame(z =  { x1 = runif(1) * 2 - 1 y1 = runif(1) * 2 - 1 z = x1 * x1 + y1 * y1 if (z < 1) { 1L } else { 0L } }) }) } , schema ) %>%  SparkR:::summarize(total = sum(.$output)) %>% collect * 4 / len
For me it runs fine as long as len is less than 5000, otherwise it errors out with the following message:
Error in invokeJava(isStatic = TRUE, className, methodName, ...) :   org.apache.spark.SparkException: Job aborted due to stage failure: Task 6 in stage 56.0 failed 4 times, most recent failure: Lost task 6.3 in stage 56.0 (TID 899, LARBGDV-VM02): org.apache.spark.SparkException: R computation failed with Error in readBin(con, raw(), stringLen, endian = "big") :   invalid 'n' argumentCalls: <Anonymous> -> readBinExecution halted at org.apache.spark.api.r.RRunner.compute(RRunner.scala:108) at org.apache.spark.sql.execution.r.MapPartitionsRWrapper.apply(MapPartitionsRWrapper.scala:59) at org.apache.spark.sql.execution.r.MapPartitionsRWrapper.apply(MapPartitionsRWrapper.scala:29) at org.apache.spark.sql.execution.MapPartitionsExec$$anonfun$6.apply(objects.scala:178) at org.apache.spark.sql.execution.MapPartitionsExec$$anonfun$6.apply(objects.scala:175) at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:784) at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$
If the repartition call is removed, it runs fine again, even with very large len.
After looking through the documentations and searching the web, I can't seem to find any clues how to fix this. Anybody has seen similary problem?
Thanks in advance for your help.
Shane

Re: SparkR error when repartition is called

Posted by Felix Cheung <fe...@hotmail.com>.
I think it's saying a string isn't being sent properly from the JVM side.

Does it work for you if you change the dapply UDF to something simpler?

Do you have any log from YARN?


_____________________________
From: Shane Lee <sh...@yahoo.com.invalid>>
Sent: Tuesday, August 9, 2016 12:19 AM
Subject: Re: SparkR error when repartition is called
To: Sun Rui <su...@163.com>>
Cc: User <us...@spark.apache.org>>


Sun,

I am using spark in yarn client mode in a 2-node cluster with hadoop-2.7.2. My R version is 3.3.1.

I have the following in my spark-defaults.conf:
spark.executor.extraJavaOptions =-XX:+PrintGCDetails -XX:+HeapDumpOnOutOfMemoryError
spark.r.command=c:/R/R-3.3.1/bin/x64/Rscript
spark.ui.killEnabled=true
spark.executor.instances = 3
spark.serializer = org.apache.spark.serializer.KryoSerializer
spark.shuffle.file.buffer = 1m
spark.driver.maxResultSize=0
spark.executor.memory=8g
spark.executor.cores = 6

I also ran into some other R errors that I was able to bypass by modifying the worker.R file (attached). In a nutshell I was getting the "argument is length of zero" error sporadically so I put in extra checks for it.

Thanks,

Shane

On Monday, August 8, 2016 11:53 PM, Sun Rui <su...@163.com>> wrote:


I can't reproduce your issue with len=10000 in local mode.
Could you give more environment information?
On Aug 9, 2016, at 11:35, Shane Lee <sh...@yahoo.com.invalid>> wrote:

Hi All,

I am trying out SparkR 2.0 and have run into an issue with repartition.

Here is the R code (essentially a port of the pi-calculating scala example in the spark package) that can reproduce the behavior:

schema <- structType(structField("input", "integer"),
    structField("output", "integer"))

library(magrittr)

len = 3000
data.frame(n = 1:len) %>%
    as.DataFrame %>%
    SparkR:::repartition(10L) %>%
dapply(., function (df)
{
library(plyr)
ddply(df, .(n), function (y)
{
data.frame(z =
{
x1 = runif(1) * 2 - 1
y1 = runif(1) * 2 - 1
z = x1 * x1 + y1 * y1
if (z < 1)
{
1L
}
else
{
0L
}
})
})
}
, schema
) %>%
SparkR:::summarize(total = sum(.$output)) %>% collect * 4 / len

For me it runs fine as long as len is less than 5000, otherwise it errors out with the following message:

Error in invokeJava(isStatic = TRUE, className, methodName, ...) :
  org.apache.spark.SparkException: Job aborted due to stage failure: Task 6 in stage 56.0 failed 4 times, most recent failure: Lost task 6.3 in stage 56.0 (TID 899, LARBGDV-VM02): org.apache.spark.SparkException: R computation failed with
 Error in readBin(con, raw(), stringLen, endian = "big") :
  invalid 'n' argument
Calls: <Anonymous> -> readBin
Execution halted
at org.apache.spark.api.r.RRunner.compute(RRunner.scala:108)
at org.apache.spark.sql.execution.r.MapPartitionsRWrapper.apply(MapPartitionsRWrapper.scala:59)
at org.apache.spark.sql.execution.r.MapPartitionsRWrapper.apply(MapPartitionsRWrapper.scala:29)
at org.apache.spark.sql.execution.MapPartitionsExec$$anonfun$6.apply(objects.scala:178)
at org.apache.spark.sql.execution.MapPartitionsExec$$anonfun$6.apply(objects.scala:175)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:784)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$

If the repartition call is removed, it runs fine again, even with very large len.

After looking through the documentations and searching the web, I can't seem to find any clues how to fix this. Anybody has seen similary problem?

Thanks in advance for your help.

Shane







Re: SparkR error when repartition is called

Posted by Shane Lee <sh...@yahoo.com.INVALID>.
Sun,
I am using spark in yarn client mode in a 2-node cluster with hadoop-2.7.2. My R version is 3.3.1.
I have the following in my spark-defaults.conf:spark.executor.extraJavaOptions =-XX:+PrintGCDetails -XX:+HeapDumpOnOutOfMemoryErrorspark.r.command=c:/R/R-3.3.1/bin/x64/Rscriptspark.ui.killEnabled=truespark.executor.instances = 3
spark.serializer = org.apache.spark.serializer.KryoSerializerspark.shuffle.file.buffer = 1mspark.driver.maxResultSize=0spark.executor.memory=8gspark.executor.cores = 6 
I also ran into some other R errors that I was able to bypass by modifying the worker.R file (attached). In a nutshell I was getting the "argument is length of zero" error sporadically so I put in extra checks for it.
Thanks,
Shane
    On Monday, August 8, 2016 11:53 PM, Sun Rui <su...@163.com> wrote:
 

 I can’t reproduce your issue with len=10000 in local mode.Could you give more environment information?

On Aug 9, 2016, at 11:35, Shane Lee <sh...@yahoo.com.INVALID> wrote:
Hi All,
I am trying out SparkR 2.0 and have run into an issue with repartition. 
Here is the R code (essentially a port of the pi-calculating scala example in the spark package) that can reproduce the behavior:
schema <- structType(structField("input", "integer"),     structField("output", "integer"))
library(magrittr)

len = 3000data.frame(n = 1:len) %>%    as.DataFrame %>%    SparkR:::repartition(10L) %>% dapply(., function (df) { library(plyr) ddply(df, .(n), function (y)
 { data.frame(z =  { x1 = runif(1) * 2 - 1 y1 = runif(1) * 2 - 1 z = x1 * x1 + y1 * y1 if (z < 1) { 1L } else { 0L } }) }) } , schema ) %>%  SparkR:::summarize(total = sum(.$output)) %>% collect * 4 / len
For me it runs fine as long as len is less than 5000, otherwise it errors out with the following message:
Error in invokeJava(isStatic = TRUE, className, methodName, ...) :   org.apache.spark.SparkException: Job aborted due to stage failure: Task 6 in stage 56.0 failed 4 times, most recent failure: Lost task 6.3 in stage 56.0 (TID 899, LARBGDV-VM02): org.apache.spark.SparkException: R computation failed with Error in readBin(con, raw(), stringLen, endian = "big") :   invalid 'n' argumentCalls: <Anonymous> -> readBinExecution halted at org.apache.spark.api.r.RRunner.compute(RRunner.scala:108) at org.apache.spark.sql.execution.r.MapPartitionsRWrapper.apply(MapPartitionsRWrapper.scala:59) at org.apache.spark.sql.execution.r.MapPartitionsRWrapper.apply(MapPartitionsRWrapper.scala:29) at org.apache.spark.sql.execution.MapPartitionsExec$$anonfun$6.apply(objects.scala:178) at org.apache.spark.sql.execution.MapPartitionsExec$$anonfun$6.apply(objects.scala:175) at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:784) at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$
If the repartition call is removed, it runs fine again, even with very large len.
After looking through the documentations and searching the web, I can't seem to find any clues how to fix this. Anybody has seen similary problem?
Thanks in advance for your help.
Shane




  

Re: SparkR error when repartition is called

Posted by Sun Rui <su...@163.com>.
I can’t reproduce your issue with len=10000 in local mode.
Could you give more environment information?
> On Aug 9, 2016, at 11:35, Shane Lee <sh...@yahoo.com.INVALID> wrote:
> 
> Hi All,
> 
> I am trying out SparkR 2.0 and have run into an issue with repartition. 
> 
> Here is the R code (essentially a port of the pi-calculating scala example in the spark package) that can reproduce the behavior:
> 
> schema <- structType(structField("input", "integer"), 
>     structField("output", "integer"))
> 
> library(magrittr)
> 
> len = 3000
> data.frame(n = 1:len) %>%
>     as.DataFrame %>%
>     SparkR:::repartition(10L) %>%
> 	dapply(., function (df)
> 	{
> 		library(plyr)
> 		ddply(df, .(n), function (y)
> 		{
> 			data.frame(z = 
> 			{
> 				x1 = runif(1) * 2 - 1
> 				y1 = runif(1) * 2 - 1
> 				z = x1 * x1 + y1 * y1
> 				if (z < 1)
> 				{
> 					1L
> 				}
> 				else
> 				{
> 					0L
> 				}
> 			})
> 		})
> 	}
> 	, schema
> 	) %>% 
> 	SparkR:::summarize(total = sum(.$output)) %>% collect * 4 / len
> 
> For me it runs fine as long as len is less than 5000, otherwise it errors out with the following message:
> 
> Error in invokeJava(isStatic = TRUE, className, methodName, ...) : 
>   org.apache.spark.SparkException: Job aborted due to stage failure: Task 6 in stage 56.0 failed 4 times, most recent failure: Lost task 6.3 in stage 56.0 (TID 899, LARBGDV-VM02): org.apache.spark.SparkException: R computation failed with
>  Error in readBin(con, raw(), stringLen, endian = "big") : 
>   invalid 'n' argument
> Calls: <Anonymous> -> readBin
> Execution halted
> 	at org.apache.spark.api.r.RRunner.compute(RRunner.scala:108)
> 	at org.apache.spark.sql.execution.r.MapPartitionsRWrapper.apply(MapPartitionsRWrapper.scala:59)
> 	at org.apache.spark.sql.execution.r.MapPartitionsRWrapper.apply(MapPartitionsRWrapper.scala:29)
> 	at org.apache.spark.sql.execution.MapPartitionsExec$$anonfun$6.apply(objects.scala:178)
> 	at org.apache.spark.sql.execution.MapPartitionsExec$$anonfun$6.apply(objects.scala:175)
> 	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:784)
> 	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$
> 
> If the repartition call is removed, it runs fine again, even with very large len.
> 
> After looking through the documentations and searching the web, I can't seem to find any clues how to fix this. Anybody has seen similary problem?
> 
> Thanks in advance for your help.
> 
> Shane
>