Posted to user@spark.apache.org by Nan Zhu <zh...@gmail.com> on 2014/07/21 23:29:37 UTC

broadcast variable get cleaned by ContextCleaner unexpectedly ?

Hi, all  

When I run some Spark application (actually the unit test of the application in Jenkins), I found that I always hit a FileNotFoundException when reading a broadcast variable  

The program itself works well; it is only the unit test that fails
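
(For context, a minimal sketch of the broadcast-read pattern involved; the names and path below are made up, and `sc` is assumed to be the SparkContext used by the test:)

    // Hypothetical reduction of the failing pattern: tasks read a broadcast
    // value while the job is still running; the error occurs when an executor
    // tries to fetch broadcast_0 after the ContextCleaner has removed it.
    val dict = sc.broadcast(Map("id" -> 0))
    sc.textFile("hdfs:///etltest/hdfsData/customer.csv")
      .map(line => (line, dict.value.size))
      .count()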

Here is the example log:


14/07/21 19:49:13 INFO Executor: Running task ID 4
14/07/21 19:49:13 INFO DAGScheduler: Completed ResultTask(0, 2)
14/07/21 19:49:13 INFO TaskSetManager: Finished TID 2 in 95 ms on localhost (progress: 3/106)
14/07/21 19:49:13 INFO TableOutputFormat: Created table instance for hdfstest_customers
14/07/21 19:49:13 INFO Executor: Serialized size of result for 3 is 596
14/07/21 19:49:13 INFO Executor: Sending result for 3 directly to driver
14/07/21 19:49:13 INFO BlockManager: Found block broadcast_0 locally
14/07/21 19:49:13 INFO Executor: Finished task ID 3
14/07/21 19:49:13 INFO TaskSetManager: Starting task 0.0:5 as TID 5 on executor localhost: localhost (PROCESS_LOCAL)
14/07/21 19:49:13 INFO TaskSetManager: Serialized task 0.0:5 as 11885 bytes in 0 ms
14/07/21 19:49:13 INFO Executor: Running task ID 5
14/07/21 19:49:13 INFO BlockManager: Removing broadcast 0
14/07/21 19:49:13 INFO DAGScheduler: Completed ResultTask(0, 3)
*14/07/21 19:49:13 INFO ContextCleaner: Cleaned broadcast 0*
14/07/21 19:49:13 INFO TaskSetManager: Finished TID 3 in 97 ms on localhost (progress: 4/106)
14/07/21 19:49:13 INFO BlockManager: Found block broadcast_0 locally
14/07/21 19:49:13 INFO BlockManager: Removing block broadcast_0
*14/07/21 19:49:13 INFO MemoryStore: Block broadcast_0 of size 202564 dropped from memory (free 886623436)*
14/07/21 19:49:13 INFO ContextCleaner: Cleaned shuffle 0
14/07/21 19:49:13 INFO ShuffleBlockManager: Deleted all files for shuffle 0
14/07/21 19:49:13 INFO HadoopRDD: Input split: hdfs://172.31.34.184:9000/etltest/hdfsData/customer.csv:25+5
14/07/21 19:49:13 INFO HadoopRDD: Input split: hdfs://172.31.34.184:9000/etltest/hdfsData/customer.csv:20+5
14/07/21 19:49:13 INFO TableOutputFormat: Created table instance for hdfstest_customers
14/07/21 19:49:13 INFO Executor: Serialized size of result for 4 is 596
14/07/21 19:49:13 INFO Executor: Sending result for 4 directly to driver
14/07/21 19:49:13 INFO Executor: Finished task ID 4
14/07/21 19:49:13 INFO TaskSetManager: Starting task 0.0:6 as TID 6 on executor localhost: localhost (PROCESS_LOCAL)
14/07/21 19:49:13 INFO TaskSetManager: Serialized task 0.0:6 as 11885 bytes in 0 ms
14/07/21 19:49:13 INFO Executor: Running task ID 6
14/07/21 19:49:13 INFO DAGScheduler: Completed ResultTask(0, 4)
14/07/21 19:49:13 INFO TaskSetManager: Finished TID 4 in 80 ms on localhost (progress: 5/106)
14/07/21 19:49:13 INFO TableOutputFormat: Created table instance for hdfstest_customers
14/07/21 19:49:13 INFO Executor: Serialized size of result for 5 is 596
14/07/21 19:49:13 INFO Executor: Sending result for 5 directly to driver
14/07/21 19:49:13 INFO Executor: Finished task ID 5
14/07/21 19:49:13 INFO TaskSetManager: Starting task 0.0:7 as TID 7 on executor localhost: localhost (PROCESS_LOCAL)
14/07/21 19:49:13 INFO TaskSetManager: Serialized task 0.0:7 as 11885 bytes in 0 ms
14/07/21 19:49:13 INFO Executor: Running task ID 7
14/07/21 19:49:13 INFO DAGScheduler: Completed ResultTask(0, 5)
14/07/21 19:49:13 INFO TaskSetManager: Finished TID 5 in 77 ms on localhost (progress: 6/106)
14/07/21 19:49:13 INFO HttpBroadcast: Started reading broadcast variable 0
14/07/21 19:49:13 INFO HttpBroadcast: Started reading broadcast variable 0
14/07/21 19:49:13 ERROR Executor: Exception in task ID 6
java.io.FileNotFoundException: http://172.31.34.174:52070/broadcast_0
    at sun.net.www.protocol.http.HttpURLConnection.getInputStream(HttpURLConnection.java:1624)
    at org.apache.spark.broadcast.HttpBroadcast$.read(HttpBroadcast.scala:196)
    at org.apache.spark.broadcast.HttpBroadcast.readObject(HttpBroadcast.scala:89)
    at sun.reflect.GeneratedMethodAccessor24.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
    at scala.collection.immutable.$colon$colon.readObject(List.scala:362)
    at sun.reflect.GeneratedMethodAccessor16.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
    at scala.collection.immutable.$colon$colon.readObject(List.scala:362)
    at sun.reflect.GeneratedMethodAccessor16.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
    at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:63)
    at org.apache.spark.scheduler.ResultTask$.deserializeInfo(ResultTask.scala:61)
    at org.apache.spark.scheduler.ResultTask.readExternal(ResultTask.scala:141)
    at java.io.ObjectInputStream.readExternalData(ObjectInputStream.java:1837)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1796)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
    at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:63)
    at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:85)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:169)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:744)

I highlighted the lines indicating that the ContextCleaner cleaned the broadcast variable; I’m wondering why the variable is cleaned, since there is enough memory space?

Best,  

--  
Nan Zhu


Re: broadcast variable get cleaned by ContextCleaner unexpectedly ?

Posted by swetha <sw...@gmail.com>.
Hi,

How is the ContextCleaner different from spark.cleaner.ttl? Is
spark.cleaner.ttl still needed when there is a ContextCleaner in the Streaming job?
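
(For reference, both knobs are plain configuration settings; a minimal sketch with placeholder values:)

    import org.apache.spark.SparkConf

    val conf = new SparkConf()
      // spark.cleaner.ttl: the older, time-based cleaner; forgets metadata
      // older than the given number of seconds.
      .set("spark.cleaner.ttl", "3600")
      // spark.cleaner.referenceTracking: controls the ContextCleaner, which
      // cleans RDDs/broadcasts/shuffles once they are garbage-collected.
      .set("spark.cleaner.referenceTracking", "true")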


Thanks,
Swetha





Re: broadcast variable get cleaned by ContextCleaner unexpectedly ?

Posted by RodrigoB <ro...@aspect.com>.
Could you by any chance be using getOrCreate for the StreamingContext
creation?

I've seen this happen when I tried to first create the Spark context, then
create the broadcast variables, and then recreate the StreamingContext from
the checkpoint directory. In that case the worker process cannot find the
broadcast variables that were used in the saved StreamingContext.
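
For illustration, a sketch of the ordering I mean (the checkpoint path and names are made up):

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    val sc = new SparkContext(
      new SparkConf().setMaster("local[2]").setAppName("checkpoint-demo"))

    // Broadcast created against the fresh SparkContext...
    val lookup = sc.broadcast(Map("a" -> 1))

    // ...but if getOrCreate recovers the StreamingContext from the checkpoint,
    // the recovered jobs reference the broadcast ids recorded in the
    // checkpoint, which no longer exist in this new process.
    val ssc = StreamingContext.getOrCreate("/tmp/checkpoint-dir",
      () => new StreamingContext(sc, Seconds(1)))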

Thanks,
Rod





Re: broadcast variable get cleaned by ContextCleaner unexpectedly ?

Posted by Nan Zhu <zh...@gmail.com>.
Ah, sorry, sorry, my brain just glitched….. I sent some wrong information  

It is not “spark.cores.max” but the minPartitions argument in sc.textFile()  
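
For illustration (assuming `sc` is the test's SparkContext; the path is a placeholder):

    // minPartitions is the second argument of sc.textFile; a value that is
    // far too large for the input was the misconfiguration in this case.
    val lines = sc.textFile("hdfs:///etltest/hdfsData/customer.csv", minPartitions = 200)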


Best,

--  
Nan Zhu


On Monday, July 21, 2014 at 7:17 PM, Tathagata Das wrote:

> That is definitely weird. spark.cores.max should not affect things when they are running in local mode.  
>  
> And, I am trying to think of scenarios that could cause a broadcast variable used in the current job to fall out of scope, but they all seem very far-fetched. So I am really curious to see the code where this could be happening.  
>  
> Either way, you can turn off this behavior by setting spark.cleaner.referenceTracking=false
>  
> TD
>  
>  
> On Mon, Jul 21, 2014 at 3:52 PM, Nan Zhu <zhunanmcgill@gmail.com> wrote:
> > [earlier messages in this thread quoted in full; snipped]


Re: broadcast variable get cleaned by ContextCleaner unexpectedly ?

Posted by Tathagata Das <ta...@gmail.com>.
That is definitely weird. spark.cores.max should not affect things when they
are running in local mode.

And, I am trying to think of scenarios that could cause a broadcast
variable used in the current job to fall out of scope, but they all seem
very far-fetched. So I am really curious to see the code where this could
be happening.

Either way, you can turn off this behavior by setting
spark.cleaner.referenceTracking=false
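
For reference, a minimal sketch of how that could be set when building the
context (the app name and master are placeholders):

    import org.apache.spark.{SparkConf, SparkContext}

    // Disable the ContextCleaner's reference tracking so broadcast variables
    // and shuffle data are never cleaned automatically.
    val conf = new SparkConf()
      .setMaster("local[6]")
      .setAppName("jenkins-unit-test")
      .set("spark.cleaner.referenceTracking", "false")
    val sc = new SparkContext(conf)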

TD


On Mon, Jul 21, 2014 at 3:52 PM, Nan Zhu <zh...@gmail.com> wrote:

>  Hi, TD,
>
> I think I got more insight into the problem
>
> in the Jenkins test file, I mistakenly passed a wrong value to spark.cores.max,
> which is much larger than the expected value
>
> (I passed the master address as local[6], and spark.cores.max as 200)
>
> If I set a more consistent value, everything goes well,
>
> But I would not expect this problem even if spark.cores.max is
> too large?
>
> Best,
>
> --
> Nan Zhu
>
> [earlier messages in this thread quoted in full; snipped]
>

Re: broadcast variable get cleaned by ContextCleaner unexpectedly ?

Posted by Nan Zhu <zh...@gmail.com>.
Hi, TD,   

I think I got more insight into the problem

in the Jenkins test file, I mistakenly passed a wrong value to spark.cores.max, which is much larger than the expected value  

(I passed the master address as local[6], and spark.cores.max as 200)

If I set a more consistent value, everything goes well,  

But I would not expect this problem even if spark.cores.max is too large?
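
For illustration, a sketch of the mismatch (the values are the ones above; the app name is a placeholder):

    import org.apache.spark.{SparkConf, SparkContext}

    // Inconsistent: local[6] caps execution at 6 cores, while
    // spark.cores.max asks the scheduler for 200.
    val badConf = new SparkConf()
      .setMaster("local[6]")
      .setAppName("jenkins-unit-test")
      .set("spark.cores.max", "200")

    // Consistent: keep the requested cores within what local[6] provides.
    val goodConf = new SparkConf()
      .setMaster("local[6]")
      .setAppName("jenkins-unit-test")
      .set("spark.cores.max", "6")
    val sc = new SparkContext(goodConf)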

Best,  

--  
Nan Zhu


On Monday, July 21, 2014 at 6:11 PM, Nan Zhu wrote:

> [earlier messages in this thread quoted in full; snipped]


Re: broadcast variable get cleaned by ContextCleaner unexpectedly ?

Posted by Nan Zhu <zh...@gmail.com>.
Hi, TD,   

Thanks for the reply

I tried to reproduce this in a simpler program, but no luck

However, the program is very simple: it just loads some files from HDFS and writes them to HBase….

---

It seems that the issue only appears when I run the unit test in Jenkins (it does not fail every time; usually it succeeds in about 1 of 10 runs)

I once suspected that it’s related to some concurrency issue, but even if I disable parallel test execution in build.sbt, the problem is still there

---

Best,  

--  
Nan Zhu


On Monday, July 21, 2014 at 5:40 PM, Tathagata Das wrote:

> The ContextCleaner cleans up data and metadata related to RDDs and broadcast variables, but only when those variables are no longer in scope and get garbage-collected by the JVM. So the broadcast variable in question is probably somehow going out of scope even while the job using the broadcast variable is still in progress.  
>  
> Could you reproduce this behavior reliably in a simple code snippet that you can share with us?
>  
> TD
>  
>  
>  
> On Mon, Jul 21, 2014 at 2:29 PM, Nan Zhu <zhunanmcgill@gmail.com> wrote:
> > [original message quoted in full; snipped]


Re: broadcast variable get cleaned by ContextCleaner unexpectedly ?

Posted by Tathagata Das <ta...@gmail.com>.
The ContextCleaner cleans up data and metadata related to RDDs and
broadcast variables, but only when those variables are no longer in scope and
get garbage-collected by the JVM. So the broadcast variable in question is
probably somehow going out of scope even while the job using the broadcast
variable is still in progress.
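
To illustrate the scoping point, a minimal sketch (the names are made up, and
`sc` is an existing SparkContext): as long as the driver holds a strong
reference to the Broadcast object, it cannot be garbage-collected and hence
will not be cleaned.

    // Keeping `lookup` referenced on the driver for the lifetime of the job
    // prevents the JVM from collecting it, so the ContextCleaner never
    // removes broadcast_0 while tasks are still reading it.
    val lookup = sc.broadcast(Map("a" -> 1, "b" -> 2))
    val total = sc.parallelize(Seq("a", "b", "a"))
      .map(k => lookup.value.getOrElse(k, 0))
      .reduce(_ + _)

    // If the Broadcast were instead created inside a helper and its reference
    // dropped before the job finished, a GC could let the cleaner remove the
    // broadcast files mid-job, producing exactly this FileNotFoundException.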

Could you reproduce this behavior reliably in a simple code snippet that
you can share with us?

TD



On Mon, Jul 21, 2014 at 2:29 PM, Nan Zhu <zh...@gmail.com> wrote:

> [original message quoted in full; snipped]