Posted to user@spark.apache.org by abellet <au...@telecom-paristech.fr> on 2015/08/20 11:26:58 UTC

Memory-efficient successive calls to repartition()

Hello,

For the needs of my application, I need to periodically "shuffle" the data
across nodes/partitions of a reasonably large dataset. This is an expensive
operation, but I only need to do it every now and then. However, it seems that
I am doing something wrong: as the iterations go on, the memory usage
increases, causing the job to spill onto HDFS, which eventually gets full. I
am also getting some "Lost executor" errors that I don't get if I don't
repartition.

Here's a basic piece of code which reproduces the problem:

data = sc.textFile("ImageNet_gist_train.txt", 50).map(parseLine).cache()
data.count()
for i in range(1000):
    data = data.repartition(50).persist()
    # below, several operations are done on data


What am I doing wrong? I tried the following but it doesn't solve the issue:

for i in range(1000):
    data2 = data.repartition(50).persist()
    data2.count()     # materialize rdd
    data.unpersist()  # unpersist previous version
    data = data2


Help and suggestions on this would be greatly appreciated! Thanks a lot!




--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Memory-efficient-successive-calls-to-repartition-tp24358.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
For additional commands, e-mail: user-help@spark.apache.org


Re: Memory-efficient successive calls to repartition()

Posted by Alexis Gillain <al...@googlemail.com>.
Hi Aurelien,

The first code should create a new RDD in memory at each iteration (check
the webui).
The second code will unpersist the RDD but that's not the main problem.

I think your trouble comes from the long lineage, as .cache() keeps track of
the lineage for recovery.
You should have a look at checkpointing:
https://github.com/JerryLead/SparkInternals/blob/master/markdown/english/6-CacheAndCheckpoint.md
https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/impl/PeriodicRDDCheckpointer.scala

You can also have a look at the code of other iterative algorithms in
mllib for best practices.
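
For example, a minimal sketch of what periodic checkpointing could look like
in PySpark (the checkpoint directory and the 10-iteration interval below are
placeholders, not something from your code):

sc.setCheckpointDir("hdfs:///tmp/spark-checkpoints")  # placeholder path

# data is the cached RDD from your snippet
for i in range(1000):
    data2 = data.repartition(50).cache()
    if (i + 1) % 10 == 0:
        data2.checkpoint()   # truncate the lineage every 10 iterations
    data2.count()            # materialize before dropping the old RDD
    data.unpersist()
    data = data2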


2015-08-20 17:26 GMT+08:00 abellet <au...@telecom-paristech.fr>:

> Hello,
>
> For the need of my application, I need to periodically "shuffle" the data
> across nodes/partitions of a reasonably-large dataset. This is an expensive
> operation but I only need to do it every now and then. However it seems
> that
> I am doing something wrong because as the iterations go the memory usage
> increases, causing the job to spill onto HDFS, which eventually gets full.
> I
> am also getting some "Lost executor" errors that I don't get if I don't
> repartition.
>
> Here's a basic piece of code which reproduces the problem:
>
> data = sc.textFile("ImageNet_gist_train.txt",50).map(parseLine).cache()
> data.count()
> for i in range(1000):
>         data=data.repartition(50).persist()
>         # below several operations are done on data
>
>
> What am I doing wrong? I tried the following but it doesn't solve the
> issue:
>
> for i in range(1000):
>         data2=data.repartition(50).persist()
>         data2.count() # materialize rdd
>         data.unpersist() # unpersist previous version
>         data=data2
>
>
> Help and suggestions on this would be greatly appreciated! Thanks a lot!
>
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Memory-efficient-successive-calls-to-repartition-tp24358.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> ---------------------------------------------------------------------
> To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
> For additional commands, e-mail: user-help@spark.apache.org
>
>


-- 
Alexis GILLAIN

Re: Memory-efficient successive calls to repartition()

Posted by alexis GILLAIN <il...@hotmail.com>.
Aurélien,

From what you're saying, I can think of a couple of things, considering I
don't know what you are doing in the rest of the code:

- There are a lot of non-HDFS writes; they come from the rest of your code
and/or repartition(). Repartition involves a shuffle and the creation of
files on disk. I would have said that the problem comes from that, but I just
checked and checkpoint() is supposed to delete shuffle files:
https://forums.databricks.com/questions/277/how-do-i-avoid-the-no-space-left-on-device-error.html
(this looks exactly like your problem, so you could also try the other
workarounds there).
Still, you may be doing a lot of shuffling in the rest of the code (you should
see the amount of shuffle data written in the webui), and consider increasing
the available disk space... if you can do that.

- On the HDFS side, the class I pointed to has an update function which
"automatically handles persisting and (optionally) checkpointing, as well as
unpersisting and removing checkpoint files". I am not sure your method for
checkpointing removes the previous checkpoint files.

In the end, does the disk space error come from HDFS growing or from the
local disk growing?

You should check the webui to identify which tasks spill data to disk and
verify that the shuffle files are properly deleted when you checkpoint your
RDD.
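
For illustration, here is a rough PySpark sketch of the kind of bookkeeping
that class does (the helper name and the file-removal comment are just
illustration, not an existing PySpark API):

def update_checkpointed(new_rdd, persisted_queue, step, checkpoint_interval=10):
    # cache the new RDD and checkpoint it every `checkpoint_interval` steps
    new_rdd.cache()
    if step % checkpoint_interval == 0:
        new_rdd.checkpoint()
    new_rdd.count()  # materialize so the checkpoint is actually written
    # unpersist older RDDs; removing their old checkpoint files
    # (see rdd.getCheckpointFile()) would have to be done separately,
    # e.g. with an external "hdfs dfs -rm -r" call
    while len(persisted_queue) > 1:
        persisted_queue.pop(0).unpersist()
    persisted_queue.append(new_rdd)
    return new_rdd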


Regards,


2015-09-01 22:48 GMT+08:00 Aurélien Bellet <
aurelien.bellet@telecom-paristech.fr>:

> Dear Alexis,
>
> Thanks again for your reply. After reading about checkpointing I have
> modified my sample code as follows:
>
> for i in range(1000):
>     print i
>     data2=data.repartition(50).cache()
>     if (i+1) % 10 == 0:
>         data2.checkpoint()
>     data2.first() # materialize rdd
>     data.unpersist() # unpersist previous version
>     data=data2
>
> The data is checkpointed every 10 iterations to a directory that I
> specified. While this seems to improve things a little bit, there is still
> a lot of writing on disk (appcache directory, shown as "non HDFS files" in
> Cloudera Manager) *besides* the checkpoint files (which are regular HDFS
> files), and the application eventually runs out of disk space. The same is
> true even if I checkpoint at every iteration.
>
> What am I doing wrong? Maybe some garbage collector setting?
>
> Thanks a lot for the help,
>
> Aurelien
>
> Le 24/08/2015 10:39, alexis GILLAIN a écrit :
>
>> Hi Aurelien,
>>
>> The first code should create a new RDD in memory at each iteration
>> (check the webui).
>> The second code will unpersist the RDD but that's not the main problem.
>>
>> I think you have trouble due to long lineage as .cache() keep track of
>> lineage for recovery.
>> You should have a look at checkpointing :
>>
>> https://github.com/JerryLead/SparkInternals/blob/master/markdown/english/6-CacheAndCheckpoint.md
>>
>> https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/impl/PeriodicRDDCheckpointer.scala
>>
>> You can also have a look at the code of others iterative algorithms in
>> mlllib for best practices.
>>
>> 2015-08-20 17:26 GMT+08:00 abellet <aurelien.bellet@telecom-paristech.fr
>> <ma...@telecom-paristech.fr>>:
>>
>>     Hello,
>>
>>     For the need of my application, I need to periodically "shuffle" the
>>     data
>>     across nodes/partitions of a reasonably-large dataset. This is an
>>     expensive
>>     operation but I only need to do it every now and then. However it
>>     seems that
>>     I am doing something wrong because as the iterations go the memory
>> usage
>>     increases, causing the job to spill onto HDFS, which eventually gets
>>     full. I
>>     am also getting some "Lost executor" errors that I don't get if I
>> don't
>>     repartition.
>>
>>     Here's a basic piece of code which reproduces the problem:
>>
>>     data =
>> sc.textFile("ImageNet_gist_train.txt",50).map(parseLine).cache()
>>     data.count()
>>     for i in range(1000):
>>              data=data.repartition(50).persist()
>>              # below several operations are done on data
>>
>>
>>     What am I doing wrong? I tried the following but it doesn't solve
>>     the issue:
>>
>>     for i in range(1000):
>>              data2=data.repartition(50).persist()
>>              data2.count() # materialize rdd
>>              data.unpersist() # unpersist previous version
>>              data=data2
>>
>>
>>     Help and suggestions on this would be greatly appreciated! Thanks a
>> lot!
>>
>>
>>
>>
>>     --
>>     View this message in context:
>>
>> http://apache-spark-user-list.1001560.n3.nabble.com/Memory-efficient-successive-calls-to-repartition-tp24358.html
>>     Sent from the Apache Spark User List mailing list archive at
>> Nabble.com.
>>
>>     ---------------------------------------------------------------------
>>     To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
>>     <ma...@spark.apache.org>
>>     For additional commands, e-mail: user-help@spark.apache.org
>>     <ma...@spark.apache.org>
>>
>>
>>
> ---------------------------------------------------------------------
> To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
> For additional commands, e-mail: user-help@spark.apache.org
>
>

Re: Memory-efficient successive calls to repartition()

Posted by shahid ashraf <sh...@trialx.com>.
Hi Guys

It seems my problem is related to this question as well. I am running
standalone Spark 1.4.1 on a local machine.

I have 10 partitions with data skew on partitions 1 and 4. Partition and
element counts: [(0, 0), *(1, 15593259)*, (2, 0), (3, 0), *(4, 20695601)*,
(5, 0), (6, 0), (7, 0), (8, 0), (9, 0)]

Now when I try rdd.repartition(10), I get errors like "ERROR Executor:
Exception in task 1.0 in stage 10.0 (TID 61): java.lang.OutOfMemoryError:
Java heap space", with *ERROR DiskBlockObjectWriter* in the stack trace.


*I tried increasing "spark.executor.memory" to "4g" as well, but I get the
same errors again.*
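
For reference, a sketch of how such a setting is typically applied in a
standalone PySpark script (the actual configuration code isn't shown here;
note that in local mode the executors run inside the driver JVM, so
spark.driver.memory may be the setting that matters):

from pyspark import SparkConf, SparkContext

# placeholder configuration, assuming the memory setting was passed via SparkConf
conf = SparkConf().set("spark.executor.memory", "4g")
sc = SparkContext(conf=conf)

The full error output follows: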

15/09/02 21:12:43 ERROR Executor: Exception in task 1.0 in stage 10.0 (TID
61)
java.lang.OutOfMemoryError: Java heap space
at org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:113)
at org.apache.spark.api.python.PythonRDD$$anon$1.next(PythonRDD.scala:101)
at org.apache.spark.api.python.PythonRDD$$anon$1.next(PythonRDD.scala:97)
at
org.apache.spark.InterruptibleIterator.next(InterruptibleIterator.scala:43)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at
org.apache.spark.util.collection.WritablePartitionedIterator$$anon$3.writeNext(WritablePartitionedPairCollection.scala:105)
at
org.apache.spark.util.collection.ExternalSorter.spillToPartitionFiles(ExternalSorter.scala:375)
at
org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:208)
at
org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:62)
at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:70)
at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:70)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
15/09/02 21:12:43 WARN TaskSetManager: Lost task 1.0 in stage 10.0 (TID 61,
localhost): java.lang.OutOfMemoryError: Java heap space
at org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:113)
at org.apache.spark.api.python.PythonRDD$$anon$1.next(PythonRDD.scala:101)
at org.apache.spark.api.python.PythonRDD$$anon$1.next(PythonRDD.scala:97)
at
org.apache.spark.InterruptibleIterator.next(InterruptibleIterator.scala:43)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at
org.apache.spark.util.collection.WritablePartitionedIterator$$anon$3.writeNext(WritablePartitionedPairCollection.scala:105)
at
org.apache.spark.util.collection.ExternalSorter.spillToPartitionFiles(ExternalSorter.scala:375)
at
org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:208)
at
org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:62)
at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:70)
at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:70)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

15/09/02 21:12:43 ERROR TaskSetManager: Task 1 in stage 10.0 failed 1
times; aborting job
15/09/02 21:12:43 ERROR SparkUncaughtExceptionHandler: Uncaught exception
in thread Thread[Executor task launch worker-0,5,main]
java.lang.OutOfMemoryError: Java heap space
at org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:113)
at org.apache.spark.api.python.PythonRDD$$anon$1.next(PythonRDD.scala:101)
at org.apache.spark.api.python.PythonRDD$$anon$1.next(PythonRDD.scala:97)
at
org.apache.spark.InterruptibleIterator.next(InterruptibleIterator.scala:43)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at
org.apache.spark.util.collection.WritablePartitionedIterator$$anon$3.writeNext(WritablePartitionedPairCollection.scala:105)
at
org.apache.spark.util.collection.ExternalSorter.spillToPartitionFiles(ExternalSorter.scala:375)
at
org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:208)
at
org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:62)
at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:70)
at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:70)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
15/09/02 21:12:43 INFO SparkContext: Invoking stop() from shutdown hook
15/09/02 21:12:43 INFO TaskSchedulerImpl: Cancelling stage 10
15/09/02 21:12:43 INFO Executor: Executor is trying to kill task 4.0 in
stage 10.0 (TID 64)
15/09/02 21:12:43 INFO TaskSchedulerImpl: Stage 10 was cancelled
15/09/02 21:12:43 INFO DAGScheduler: ShuffleMapStage 10 (repartition at
NativeMethodAccessorImpl.java:-2) failed in 102.132 s
15/09/02 21:12:43 INFO DAGScheduler: Job 4 failed: collect at
/Users/shahid/projects/spark_rl/record_linker_spark.py:74, took 102.154710 s
Traceback (most recent call last):
  File "/Users/shahid/projects/spark_rl/record_linker_spark.py", line 121,
in <module>
15/09/02 21:12:43 INFO SparkUI: Stopped Spark web UI at
http://192.168.1.2:4040
15/09/02 21:12:43 INFO DAGScheduler: Stopping DAGScheduler
15/09/02 21:12:43 INFO MapOutputTrackerMasterEndpoint:
MapOutputTrackerMasterEndpoint stopped!
15/09/02 21:12:43 INFO Utils: path =
/private/var/folders/5f/3v7mdvrn02lcfczhr9my0ljc0000gn/T/spark-79bb5b4f-9be4-47ed-a722-bfc79880622e/blockmgr-e8e82c2e-ca87-4667-a330-b1edb83aa81f,
already present as root for deletion.
15/09/02 21:12:43 INFO MemoryStore: MemoryStore cleared
15/09/02 21:12:43 INFO BlockManager: BlockManager stopped
15/09/02 21:12:43 INFO BlockManagerMaster: BlockManagerMaster stopped
15/09/02 21:12:43 INFO
OutputCommitCoordinator$OutputCommitCoordinatorEndpoint:
OutputCommitCoordinator stopped!
15/09/02 21:12:43 INFO RemoteActorRefProvider$RemotingTerminator: Shutting
down remote daemon.
15/09/02 21:12:43 INFO RemoteActorRefProvider$RemotingTerminator: Remote
daemon shut down; proceeding with flushing remote transports.
15/09/02 21:12:43 WARN PythonRDD: Incomplete task interrupted: Attempting
to kill Python Worker
15/09/02 21:12:43 INFO RemoteActorRefProvider$RemotingTerminator: Remoting
shut down.
    R.print_no_elements(mathes_grp)
  File "/Users/shahid/projects/spark_rl/record_linker_spark.py", line 74,
in print_no_elements
    print "\n* * * * * * * * * * * * * * *\n<< partition: %s and elements:
>> \n* * * * * * * * * * * * * * *\n" %
rdd.mapPartitionsWithIndex(self.index_no_elements).collect()
  File
"/Users/shahid/projects/spark-1.4.1-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/rdd.py",
line 757, in collect
  File
"/Users/shahid/projects/spark-1.4.1-bin-hadoop2.6/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py",
line 538, in __call__
  File
"/Users/shahid/projects/spark-1.4.1-bin-hadoop2.6/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py",
line 300, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling
z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 1
in stage 10.0 failed 1 times, most recent failure: Lost task 1.0 in stage
10.0 (TID 61, localhost): java.lang.OutOfMemoryError: Java heap space
at org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:113)
at org.apache.spark.api.python.PythonRDD$$anon$1.next(PythonRDD.scala:101)
at org.apache.spark.api.python.PythonRDD$$anon$1.next(PythonRDD.scala:97)
at
org.apache.spark.InterruptibleIterator.next(InterruptibleIterator.scala:43)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at
org.apache.spark.util.collection.WritablePartitionedIterator$$anon$3.writeNext(WritablePartitionedPairCollection.scala:105)
at
org.apache.spark.util.collection.ExternalSorter.spillToPartitionFiles(ExternalSorter.scala:375)
at
org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:208)
at
org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:62)
at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:70)
at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:70)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org
$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1273)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1264)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1263)
at
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at
org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1263)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)
at scala.Option.foreach(Option.scala:236)
at
org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:730)
at
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1457)
at
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1418)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)

15/09/02 21:12:43 INFO SparkContext: Successfully stopped SparkContext
15/09/02 21:12:43 INFO Utils: Shutdown hook called
15/09/02 21:12:43 INFO Utils: Deleting directory
/private/var/folders/5f/3v7mdvrn02lcfczhr9my0ljc0000gn/T/spark-79bb5b4f-9be4-47ed-a722-bfc79880622e/pyspark-090350a4-e0ab-4692-8360-b6be8d64d1fe
15/09/02 21:12:43 INFO Utils: Deleting directory
/private/var/folders/5f/3v7mdvrn02lcfczhr9my0ljc0000gn/T/spark-79bb5b4f-9be4-47ed-a722-bfc79880622e
15/09/02 21:12:45 ERROR DiskBlockObjectWriter: Uncaught exception while
reverting partial writes to file
/private/var/folders/5f/3v7mdvrn02lcfczhr9my0ljc0000gn/T/spark-79bb5b4f-9be4-47ed-a722-bfc79880622e/blockmgr-e8e82c2e-ca87-4667-a330-b1edb83aa81f/05/temp_shuffle_f80f61c5-4fb6-4d46-a81b-239095ebf20d
java.io.FileNotFoundException:
/private/var/folders/5f/3v7mdvrn02lcfczhr9my0ljc0000gn/T/spark-79bb5b4f-9be4-47ed-a722-bfc79880622e/blockmgr-e8e82c2e-ca87-4667-a330-b1edb83aa81f/05/temp_shuffle_f80f61c5-4fb6-4d46-a81b-239095ebf20d
(No such file or directory)
at java.io.FileOutputStream.open0(Native Method)
at java.io.FileOutputStream.open(FileOutputStream.java:270)
at java.io.FileOutputStream.<init>(FileOutputStream.java:213)
at
org.apache.spark.storage.DiskBlockObjectWriter.revertPartialWritesAndClose(BlockObjectWriter.scala:189)
at
org.apache.spark.util.collection.ExternalSorter$$anonfun$stop$2.apply(ExternalSorter.scala:807)
at
org.apache.spark.util.collection.ExternalSorter$$anonfun$stop$2.apply(ExternalSorter.scala:806)
at
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
at
org.apache.spark.util.collection.ExternalSorter.stop(ExternalSorter.scala:806)
at
org.apache.spark.shuffle.sort.SortShuffleWriter.stop(SortShuffleWriter.scala:94)
at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:76)
at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:70)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
15/09/02 21:12:46 ERROR Utils: Exception while deleting Spark temp dir:
/private/var/folders/5f/3v7mdvrn02lcfczhr9my0ljc0000gn/T/spark-79bb5b4f-9be4-47ed-a722-bfc79880622e
java.io.IOException: Failed to delete:
/private/var/folders/5f/3v7mdvrn02lcfczhr9my0ljc0000gn/T/spark-79bb5b4f-9be4-47ed-a722-bfc79880622e
at org.apache.spark.util.Utils$.deleteRecursively(Utils.scala:963)
at
org.apache.spark.util.Utils$$anonfun$1$$anonfun$apply$mcV$sp$5.apply(Utils.scala:204)
at
org.apache.spark.util.Utils$$anonfun$1$$anonfun$apply$mcV$sp$5.apply(Utils.scala:201)
at scala.collection.mutable.HashSet.foreach(HashSet.scala:79)
at org.apache.spark.util.Utils$$anonfun$1.apply$mcV$sp(Utils.scala:201)
at org.apache.spark.util.SparkShutdownHook.run(Utils.scala:2308)
at
org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(Utils.scala:2278)
at
org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1$$anonfun$apply$mcV$sp$1.apply(Utils.scala:2278)
at
org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1$$anonfun$apply$mcV$sp$1.apply(Utils.scala:2278)
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1772)
at
org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1.apply$mcV$sp(Utils.scala:2278)
at
org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1.apply(Utils.scala:2278)
at
org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1.apply(Utils.scala:2278)
at scala.util.Try$.apply(Try.scala:161)
at org.apache.spark.util.SparkShutdownHookManager.runAll(Utils.scala:2278)
at
org.apache.spark.util.SparkShutdownHookManager$$anon$6.run(Utils.scala:2260)
at
org.apache.hadoop.util.ShutdownHookManager$1.run(ShutdownHookManager.java:54)

On Wed, Sep 2, 2015 at 8:41 PM, alexis GILLAIN <il...@hotmail.com> wrote:

> Just made some tests on my laptop.
>
> Deletion of the files is not immediate but a System.gc() call makes the
> job on shuffle files of a checkpointed RDD.
> It should solve your problem (`sc._jvm.System.gc()` in Python as pointed
> in the databricks link in my previous message).
>
>
> 2015-09-02 20:55 GMT+08:00 Aurélien Bellet <
> aurelien.bellet@telecom-paristech.fr>:
>
>> Thanks a lot for the useful link and comments Alexis!
>>
>> First of all, the problem occurs without doing anything else in the code
>> (except of course loading my data from HDFS at the beginning) - so it
>> definitely comes from the shuffling. You're right, in the current version,
>> checkpoint files are not removed and take up some space in HDFS (this is
>> easy to fix). But this is negligible compared to the non hdfs files which
>> keeps growing as iterations go. So I agree with you that this must come
>> from the shuffling operations: it seems that the shuffle files are not
>> removed along the execution (they are only removed if I stop/kill the
>> application), despite the use of checkpoint.
>>
>> The class you mentioned is very interesting but I did not find a way to
>> use it from pyspark. I will try to implement my own version, looking at the
>> source code. But besides the queueing and removing of checkpoint files, I
>> do not really see anything special there that could solve my issue.
>>
>> I will continue to investigate this. Just found out I can use a command
>> line browser to look at the webui (I cannot access the server in graphical
>> display mode), this should help me understand what's going on. I will also
>> try the workarounds mentioned in the link. Keep you posted.
>>
>> Again, thanks a lot!
>>
>> Best,
>>
>> Aurelien
>>
>>
>> Le 02/09/2015 14:15, alexis GILLAIN a écrit :
>>
>>> Aurélien,
>>>
>>>  From what you're saying, I can think of a couple of things considering
>>> I don't know what you are doing in the rest of the code :
>>>
>>> - There is lot of non hdfs writes, it comes from the rest of your code
>>> and/or repartittion(). Repartition involve a shuffling and creation of
>>> files on disk. I would have said that the problem come from that but I
>>> just checked and checkpoint() is supposed to delete shuffle files :
>>>
>>> https://forums.databricks.com/questions/277/how-do-i-avoid-the-no-space-left-on-device-error.html
>>> (looks exactly as your problem so you could maybe try the others
>>> workarounds)
>>> Still, you may do a lot of shuffle in the rest of the code (you should
>>> see the amount of shuffle files written in the webui) and consider
>>> increasing the disk space available...if you can do that.
>>>
>>> - On the hdfs side, the class I pointed to has an update function which
>>> "automatically handles persisting and (optionally) checkpointing, as
>>> well as unpersisting and removing checkpoint files". Not sure your
>>> method for checkpointing remove previous checkpoint file.
>>>
>>> In the end, does the disk space error come from hdfs growing or local
>>> disk growing ?
>>>
>>> You should check the webui to identify which tasks spill data on disk
>>> and verify if the shuffle files are properly deleted when you checkpoint
>>> your rdd.
>>>
>>>
>>> Regards,
>>>
>>>
>>> 2015-09-01 22:48 GMT+08:00 Aurélien Bellet
>>> <aurelien.bellet@telecom-paristech.fr
>>> <ma...@telecom-paristech.fr>>:
>>>
>>>
>>>     Dear Alexis,
>>>
>>>     Thanks again for your reply. After reading about checkpointing I
>>>     have modified my sample code as follows:
>>>
>>>     for i in range(1000):
>>>          print i
>>>          data2=data.repartition(50).cache()
>>>          if (i+1) % 10 == 0:
>>>              data2.checkpoint()
>>>          data2.first() # materialize rdd
>>>          data.unpersist() # unpersist previous version
>>>          data=data2
>>>
>>>     The data is checkpointed every 10 iterations to a directory that I
>>>     specified. While this seems to improve things a little bit, there is
>>>     still a lot of writing on disk (appcache directory, shown as "non
>>>     HDFS files" in Cloudera Manager) *besides* the checkpoint files
>>>     (which are regular HDFS files), and the application eventually runs
>>>     out of disk space. The same is true even if I checkpoint at every
>>>     iteration.
>>>
>>>     What am I doing wrong? Maybe some garbage collector setting?
>>>
>>>     Thanks a lot for the help,
>>>
>>>     Aurelien
>>>
>>>     Le 24/08/2015 10:39, alexis GILLAIN a écrit :
>>>
>>>         Hi Aurelien,
>>>
>>>         The first code should create a new RDD in memory at each
>>> iteration
>>>         (check the webui).
>>>         The second code will unpersist the RDD but that's not the main
>>>         problem.
>>>
>>>         I think you have trouble due to long lineage as .cache() keep
>>>         track of
>>>         lineage for recovery.
>>>         You should have a look at checkpointing :
>>>
>>> https://github.com/JerryLead/SparkInternals/blob/master/markdown/english/6-CacheAndCheckpoint.md
>>>
>>> https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/impl/PeriodicRDDCheckpointer.scala
>>>
>>>         You can also have a look at the code of others iterative
>>>         algorithms in
>>>         mlllib for best practices.
>>>
>>>         2015-08-20 17:26 GMT+08:00 abellet
>>>         <aurelien.bellet@telecom-paristech.fr
>>>         <ma...@telecom-paristech.fr>
>>>         <mailto:aurelien.bellet@telecom-paristech.fr
>>>
>>>         <ma...@telecom-paristech.fr>>>:
>>>
>>>              Hello,
>>>
>>>              For the need of my application, I need to periodically
>>>         "shuffle" the
>>>              data
>>>              across nodes/partitions of a reasonably-large dataset. This
>>>         is an
>>>              expensive
>>>              operation but I only need to do it every now and then.
>>>         However it
>>>              seems that
>>>              I am doing something wrong because as the iterations go the
>>>         memory usage
>>>              increases, causing the job to spill onto HDFS, which
>>>         eventually gets
>>>              full. I
>>>              am also getting some "Lost executor" errors that I don't
>>>         get if I don't
>>>              repartition.
>>>
>>>              Here's a basic piece of code which reproduces the problem:
>>>
>>>              data =
>>>         sc.textFile("ImageNet_gist_train.txt",50).map(parseLine).cache()
>>>              data.count()
>>>              for i in range(1000):
>>>                       data=data.repartition(50).persist()
>>>                       # below several operations are done on data
>>>
>>>
>>>              What am I doing wrong? I tried the following but it doesn't
>>>         solve
>>>              the issue:
>>>
>>>              for i in range(1000):
>>>                       data2=data.repartition(50).persist()
>>>                       data2.count() # materialize rdd
>>>                       data.unpersist() # unpersist previous version
>>>                       data=data2
>>>
>>>
>>>              Help and suggestions on this would be greatly appreciated!
>>>         Thanks a lot!
>>>
>>>
>>>
>>>
>>>              --
>>>              View this message in context:
>>>
>>> http://apache-spark-user-list.1001560.n3.nabble.com/Memory-efficient-successive-calls-to-repartition-tp24358.html
>>>              Sent from the Apache Spark User List mailing list archive
>>>         at Nabble.com.
>>>
>>>
>>>
>>> ---------------------------------------------------------------------
>>>              To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
>>>         <ma...@spark.apache.org>
>>>              <mailto:user-unsubscribe@spark.apache.org
>>>         <ma...@spark.apache.org>>
>>>              For additional commands, e-mail: user-help@spark.apache.org
>>>         <ma...@spark.apache.org>
>>>              <mailto:user-help@spark.apache.org
>>>         <ma...@spark.apache.org>>
>>>
>>>
>>>
>>>     ---------------------------------------------------------------------
>>>     To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
>>>     <ma...@spark.apache.org>
>>>     For additional commands, e-mail: user-help@spark.apache.org
>>>     <ma...@spark.apache.org>
>>>
>>>
>>>
>> ---------------------------------------------------------------------
>> To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
>> For additional commands, e-mail: user-help@spark.apache.org
>>
>>
>


-- 
with Regards
Shahid Ashraf

Re: Memory-efficient successive calls to repartition()

Posted by alexis GILLAIN <il...@hotmail.com>.
Just made some tests on my laptop.

Deletion of the files is not immediate, but a System.gc() call does the job
on the shuffle files of a checkpointed RDD.
It should solve your problem (`sc._jvm.System.gc()` in Python, as pointed out
in the databricks link in my previous message).


2015-09-02 20:55 GMT+08:00 Aurélien Bellet <
aurelien.bellet@telecom-paristech.fr>:

> Thanks a lot for the useful link and comments Alexis!
>
> First of all, the problem occurs without doing anything else in the code
> (except of course loading my data from HDFS at the beginning) - so it
> definitely comes from the shuffling. You're right, in the current version,
> checkpoint files are not removed and take up some space in HDFS (this is
> easy to fix). But this is negligible compared to the non hdfs files which
> keeps growing as iterations go. So I agree with you that this must come
> from the shuffling operations: it seems that the shuffle files are not
> removed along the execution (they are only removed if I stop/kill the
> application), despite the use of checkpoint.
>
> The class you mentioned is very interesting but I did not find a way to
> use it from pyspark. I will try to implement my own version, looking at the
> source code. But besides the queueing and removing of checkpoint files, I
> do not really see anything special there that could solve my issue.
>
> I will continue to investigate this. Just found out I can use a command
> line browser to look at the webui (I cannot access the server in graphical
> display mode), this should help me understand what's going on. I will also
> try the workarounds mentioned in the link. Keep you posted.
>
> Again, thanks a lot!
>
> Best,
>
> Aurelien
>
>
> Le 02/09/2015 14:15, alexis GILLAIN a écrit :
>
>> Aurélien,
>>
>>  From what you're saying, I can think of a couple of things considering
>> I don't know what you are doing in the rest of the code :
>>
>> - There is lot of non hdfs writes, it comes from the rest of your code
>> and/or repartittion(). Repartition involve a shuffling and creation of
>> files on disk. I would have said that the problem come from that but I
>> just checked and checkpoint() is supposed to delete shuffle files :
>>
>> https://forums.databricks.com/questions/277/how-do-i-avoid-the-no-space-left-on-device-error.html
>> (looks exactly as your problem so you could maybe try the others
>> workarounds)
>> Still, you may do a lot of shuffle in the rest of the code (you should
>> see the amount of shuffle files written in the webui) and consider
>> increasing the disk space available...if you can do that.
>>
>> - On the hdfs side, the class I pointed to has an update function which
>> "automatically handles persisting and (optionally) checkpointing, as
>> well as unpersisting and removing checkpoint files". Not sure your
>> method for checkpointing remove previous checkpoint file.
>>
>> In the end, does the disk space error come from hdfs growing or local
>> disk growing ?
>>
>> You should check the webui to identify which tasks spill data on disk
>> and verify if the shuffle files are properly deleted when you checkpoint
>> your rdd.
>>
>>
>> Regards,
>>
>>
>> 2015-09-01 22:48 GMT+08:00 Aurélien Bellet
>> <aurelien.bellet@telecom-paristech.fr
>> <ma...@telecom-paristech.fr>>:
>>
>>
>>     Dear Alexis,
>>
>>     Thanks again for your reply. After reading about checkpointing I
>>     have modified my sample code as follows:
>>
>>     for i in range(1000):
>>          print i
>>          data2=data.repartition(50).cache()
>>          if (i+1) % 10 == 0:
>>              data2.checkpoint()
>>          data2.first() # materialize rdd
>>          data.unpersist() # unpersist previous version
>>          data=data2
>>
>>     The data is checkpointed every 10 iterations to a directory that I
>>     specified. While this seems to improve things a little bit, there is
>>     still a lot of writing on disk (appcache directory, shown as "non
>>     HDFS files" in Cloudera Manager) *besides* the checkpoint files
>>     (which are regular HDFS files), and the application eventually runs
>>     out of disk space. The same is true even if I checkpoint at every
>>     iteration.
>>
>>     What am I doing wrong? Maybe some garbage collector setting?
>>
>>     Thanks a lot for the help,
>>
>>     Aurelien
>>
>>     Le 24/08/2015 10:39, alexis GILLAIN a écrit :
>>
>>         Hi Aurelien,
>>
>>         The first code should create a new RDD in memory at each iteration
>>         (check the webui).
>>         The second code will unpersist the RDD but that's not the main
>>         problem.
>>
>>         I think you have trouble due to long lineage as .cache() keep
>>         track of
>>         lineage for recovery.
>>         You should have a look at checkpointing :
>>
>> https://github.com/JerryLead/SparkInternals/blob/master/markdown/english/6-CacheAndCheckpoint.md
>>
>> https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/impl/PeriodicRDDCheckpointer.scala
>>
>>         You can also have a look at the code of others iterative
>>         algorithms in
>>         mlllib for best practices.
>>
>>         2015-08-20 17:26 GMT+08:00 abellet
>>         <aurelien.bellet@telecom-paristech.fr
>>         <ma...@telecom-paristech.fr>
>>         <mailto:aurelien.bellet@telecom-paristech.fr
>>
>>         <ma...@telecom-paristech.fr>>>:
>>
>>              Hello,
>>
>>              For the need of my application, I need to periodically
>>         "shuffle" the
>>              data
>>              across nodes/partitions of a reasonably-large dataset. This
>>         is an
>>              expensive
>>              operation but I only need to do it every now and then.
>>         However it
>>              seems that
>>              I am doing something wrong because as the iterations go the
>>         memory usage
>>              increases, causing the job to spill onto HDFS, which
>>         eventually gets
>>              full. I
>>              am also getting some "Lost executor" errors that I don't
>>         get if I don't
>>              repartition.
>>
>>              Here's a basic piece of code which reproduces the problem:
>>
>>              data =
>>         sc.textFile("ImageNet_gist_train.txt",50).map(parseLine).cache()
>>              data.count()
>>              for i in range(1000):
>>                       data=data.repartition(50).persist()
>>                       # below several operations are done on data
>>
>>
>>              What am I doing wrong? I tried the following but it doesn't
>>         solve
>>              the issue:
>>
>>              for i in range(1000):
>>                       data2=data.repartition(50).persist()
>>                       data2.count() # materialize rdd
>>                       data.unpersist() # unpersist previous version
>>                       data=data2
>>
>>
>>              Help and suggestions on this would be greatly appreciated!
>>         Thanks a lot!
>>
>>
>>
>>
>>              --
>>              View this message in context:
>>
>> http://apache-spark-user-list.1001560.n3.nabble.com/Memory-efficient-successive-calls-to-repartition-tp24358.html
>>              Sent from the Apache Spark User List mailing list archive
>>         at Nabble.com.
>>
>>
>>
>> ---------------------------------------------------------------------
>>              To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
>>         <ma...@spark.apache.org>
>>              <mailto:user-unsubscribe@spark.apache.org
>>         <ma...@spark.apache.org>>
>>              For additional commands, e-mail: user-help@spark.apache.org
>>         <ma...@spark.apache.org>
>>              <mailto:user-help@spark.apache.org
>>         <ma...@spark.apache.org>>
>>
>>
>>
>>     ---------------------------------------------------------------------
>>     To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
>>     <ma...@spark.apache.org>
>>     For additional commands, e-mail: user-help@spark.apache.org
>>     <ma...@spark.apache.org>
>>
>>
>>
> ---------------------------------------------------------------------
> To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
> For additional commands, e-mail: user-help@spark.apache.org
>
>

Re: Memory-efficient successive calls to repartition()

Posted by Aurélien Bellet <au...@telecom-paristech.fr>.
What is strange is that if I remove the if condition (i.e., checkpoint
at each iteration), then it basically works: non-HDFS disk usage remains
very small and stable throughout the execution.
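
For clarity, the variant that works is simply the same loop with an
unconditional checkpoint (a sketch of what is described above):

for i in range(1000):
    data2 = data.repartition(50).cache()
    data2.checkpoint()   # checkpoint at every iteration
    data2.first()        # materialize rdd
    data.unpersist()     # unpersist previous version
    sc._jvm.System.gc()
    data = data2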

If instead I checkpoint only every now and then (cf. the code in my previous
email), the disk usage grows steadily throughout the execution until no free
space is left, despite the call to the GC.

Aurelien

On 9/8/15 6:22 PM, Aurélien Bellet wrote:
> Hi,
>
> This is what I tried:
>
> for i in range(1000):
>      print i
>      data2=data.repartition(50).cache()
>      if (i+1) % 10 == 0:
>          data2.checkpoint()
>      data2.first() # materialize rdd
>      data.unpersist() # unpersist previous version
>      sc._jvm.System.gc()
>      data=data2
>
> But unfortunately I do not get any significant improvement from the call
> to sc._jvm.System.gc()...
>
> I checked the WebUI and I have a single RDD in memory, so unpersist()
> works as expected but still no solution to trigger the cleaning of
> shuffle files...
>
> Aurélien
>
> Le 9/2/15 4:11 PM, alexis GILLAIN a écrit :
>> Just made some tests on my laptop.
>>
>> Deletion of the files is not immediate but a System.gc() call makes the
>> job on shuffle files of a checkpointed RDD.
>> It should solve your problem (`sc._jvm.System.gc()` in Python as pointed
>> in the databricks link in my previous message).
>>
>>
>> 2015-09-02 20:55 GMT+08:00 Aurélien Bellet
>> <aurelien.bellet@telecom-paristech.fr
>> <ma...@telecom-paristech.fr>>:
>>
>>     Thanks a lot for the useful link and comments Alexis!
>>
>>     First of all, the problem occurs without doing anything else in the
>>     code (except of course loading my data from HDFS at the beginning) -
>>     so it definitely comes from the shuffling. You're right, in the
>>     current version, checkpoint files are not removed and take up some
>>     space in HDFS (this is easy to fix). But this is negligible compared
>>     to the non hdfs files which keeps growing as iterations go. So I
>>     agree with you that this must come from the shuffling operations: it
>>     seems that the shuffle files are not removed along the execution
>>     (they are only removed if I stop/kill the application), despite the
>>     use of checkpoint.
>>
>>     The class you mentioned is very interesting but I did not find a way
>>     to use it from pyspark. I will try to implement my own version,
>>     looking at the source code. But besides the queueing and removing of
>>     checkpoint files, I do not really see anything special there that
>>     could solve my issue.
>>
>>     I will continue to investigate this. Just found out I can use a
>>     command line browser to look at the webui (I cannot access the
>>     server in graphical display mode), this should help me understand
>>     what's going on. I will also try the workarounds mentioned in the
>>     link. Keep you posted.
>>
>>     Again, thanks a lot!
>>
>>     Best,
>>
>>     Aurelien
>>
>>
>>     Le 02/09/2015 14:15, alexis GILLAIN a écrit :
>>
>>         Aurélien,
>>
>>           From what you're saying, I can think of a couple of things
>>         considering
>>         I don't know what you are doing in the rest of the code :
>>
>>         - There is lot of non hdfs writes, it comes from the rest of
>>         your code
>>         and/or repartittion(). Repartition involve a shuffling and
>>         creation of
>>         files on disk. I would have said that the problem come from that
>>         but I
>>         just checked and checkpoint() is supposed to delete shuffle
>> files :
>>
>> https://forums.databricks.com/questions/277/how-do-i-avoid-the-no-space-left-on-device-error.html
>>
>>         (looks exactly as your problem so you could maybe try the others
>>         workarounds)
>>         Still, you may do a lot of shuffle in the rest of the code (you
>>         should
>>         see the amount of shuffle files written in the webui) and
>> consider
>>         increasing the disk space available...if you can do that.
>>
>>         - On the hdfs side, the class I pointed to has an update
>>         function which
>>         "automatically handles persisting and (optionally)
>> checkpointing, as
>>         well as unpersisting and removing checkpoint files". Not sure
>> your
>>         method for checkpointing remove previous checkpoint file.
>>
>>         In the end, does the disk space error come from hdfs growing or
>>         local
>>         disk growing ?
>>
>>         You should check the webui to identify which tasks spill data on
>>         disk
>>         and verify if the shuffle files are properly deleted when you
>>         checkpoint
>>         your rdd.
>>
>>
>>         Regards,
>>
>>
>>         2015-09-01 22:48 GMT+08:00 Aurélien Bellet
>>         <aurelien.bellet@telecom-paristech.fr
>>         <ma...@telecom-paristech.fr>
>>         <mailto:aurelien.bellet@telecom-paristech.fr
>>         <ma...@telecom-paristech.fr>>>:
>>
>>
>>              Dear Alexis,
>>
>>              Thanks again for your reply. After reading about
>>         checkpointing I
>>              have modified my sample code as follows:
>>
>>              for i in range(1000):
>>                   print i
>>                   data2=data.repartition(50).cache()
>>                   if (i+1) % 10 == 0:
>>                       data2.checkpoint()
>>                   data2.first() # materialize rdd
>>                   data.unpersist() # unpersist previous version
>>                   data=data2
>>
>>              The data is checkpointed every 10 iterations to a directory
>>         that I
>>              specified. While this seems to improve things a little bit,
>>         there is
>>              still a lot of writing on disk (appcache directory, shown
>>         as "non
>>              HDFS files" in Cloudera Manager) *besides* the checkpoint
>> files
>>              (which are regular HDFS files), and the application
>>         eventually runs
>>              out of disk space. The same is true even if I checkpoint at
>>         every
>>              iteration.
>>
>>              What am I doing wrong? Maybe some garbage collector setting?
>>
>>              Thanks a lot for the help,
>>
>>              Aurelien
>>
>>              Le 24/08/2015 10:39, alexis GILLAIN a écrit :
>>
>>                  Hi Aurelien,
>>
>>                  The first code should create a new RDD in memory at
>>         each iteration
>>                  (check the webui).
>>                  The second code will unpersist the RDD but that's not
>>         the main
>>                  problem.
>>
>>                  I think you have trouble due to long lineage as
>>         .cache() keep
>>                  track of
>>                  lineage for recovery.
>>                  You should have a look at checkpointing :
>>
>> https://github.com/JerryLead/SparkInternals/blob/master/markdown/english/6-CacheAndCheckpoint.md
>>
>>
>> https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/impl/PeriodicRDDCheckpointer.scala
>>
>>
>>                  You can also have a look at the code of others iterative
>>                  algorithms in
>>                  mlllib for best practices.
>>
>>                  2015-08-20 17:26 GMT+08:00 abellet
>>                  <aurelien.bellet@telecom-paristech.fr
>>         <ma...@telecom-paristech.fr>
>>                  <mailto:aurelien.bellet@telecom-paristech.fr
>>         <ma...@telecom-paristech.fr>>
>>                  <mailto:aurelien.bellet@telecom-paristech.fr
>>         <ma...@telecom-paristech.fr>
>>
>>                  <mailto:aurelien.bellet@telecom-paristech.fr
>>         <ma...@telecom-paristech.fr>>>>:
>>
>>                       Hello,
>>
>>                       For the need of my application, I need to
>> periodically
>>                  "shuffle" the
>>                       data
>>                       across nodes/partitions of a reasonably-large
>>         dataset. This
>>                  is an
>>                       expensive
>>                       operation but I only need to do it every now and
>> then.
>>                  However it
>>                       seems that
>>                       I am doing something wrong because as the
>>         iterations go the
>>                  memory usage
>>                       increases, causing the job to spill onto HDFS,
>> which
>>                  eventually gets
>>                       full. I
>>                       am also getting some "Lost executor" errors that I
>>         don't
>>                  get if I don't
>>                       repartition.
>>
>>                       Here's a basic piece of code which reproduces the
>>         problem:
>>
>>                       data =
>>
>>         sc.textFile("ImageNet_gist_train.txt",50).map(parseLine).cache()
>>                       data.count()
>>                       for i in range(1000):
>>                                data=data.repartition(50).persist()
>>                                # below several operations are done on
>> data
>>
>>
>>                       What am I doing wrong? I tried the following but
>>         it doesn't
>>                  solve
>>                       the issue:
>>
>>                       for i in range(1000):
>>                                data2=data.repartition(50).persist()
>>                                data2.count() # materialize rdd
>>                                data.unpersist() # unpersist previous
>> version
>>                                data=data2
>>
>>
>>                       Help and suggestions on this would be greatly
>>         appreciated!
>>                  Thanks a lot!
>>
>>
>>
>>
>>                       --
>>                       View this message in context:
>>
>> http://apache-spark-user-list.1001560.n3.nabble.com/Memory-efficient-successive-calls-to-repartition-tp24358.html
>>
>>                       Sent from the Apache Spark User List mailing list
>>         archive
>>                  at Nabble.com.
>>
>>
>>
>>
>> ---------------------------------------------------------------------
>>                       To unsubscribe, e-mail:
>>         user-unsubscribe@spark.apache.org
>>         <ma...@spark.apache.org>
>>                  <mailto:user-unsubscribe@spark.apache.org
>>         <ma...@spark.apache.org>>
>>                       <mailto:user-unsubscribe@spark.apache.org
>>         <ma...@spark.apache.org>
>>                  <mailto:user-unsubscribe@spark.apache.org
>>         <ma...@spark.apache.org>>>
>>                       For additional commands, e-mail:
>>         user-help@spark.apache.org <ma...@spark.apache.org>
>>                  <mailto:user-help@spark.apache.org
>>         <ma...@spark.apache.org>>
>>                       <mailto:user-help@spark.apache.org
>>         <ma...@spark.apache.org>
>>                  <mailto:user-help@spark.apache.org
>>         <ma...@spark.apache.org>>>
>>
>>
>>
>>
>>
>> ---------------------------------------------------------------------
>>              To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
>>         <ma...@spark.apache.org>
>>              <mailto:user-unsubscribe@spark.apache.org
>>         <ma...@spark.apache.org>>
>>              For additional commands, e-mail: user-help@spark.apache.org
>>         <ma...@spark.apache.org>
>>              <mailto:user-help@spark.apache.org
>>         <ma...@spark.apache.org>>
>>
>>
>>
>>     ---------------------------------------------------------------------
>>     To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
>>     <ma...@spark.apache.org>
>>     For additional commands, e-mail: user-help@spark.apache.org
>>     <ma...@spark.apache.org>
>>
>>

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
For additional commands, e-mail: user-help@spark.apache.org


Re: Memory-efficient successive calls to repartition()

Posted by Aurélien Bellet <au...@telecom-paristech.fr>.
Hi,

This is what I tried:

for i in range(1000):
     print i
     data2=data.repartition(50).cache()
     if (i+1) % 10 == 0:
         data2.checkpoint()
     data2.first() # materialize rdd
     data.unpersist() # unpersist previous version
     sc._jvm.System.gc()
     data=data2

But unfortunately I do not get any significant improvement from the call 
to sc._jvm.System.gc()...

I checked the WebUI and I have a single RDD in memory, so unpersist() 
works as expected but still no solution to trigger the cleaning of 
shuffle files...

Aurélien

On 9/2/15 4:11 PM, alexis GILLAIN wrote:
> Just made some tests on my laptop.
>
> Deletion of the files is not immediate but a System.gc() call makes the
> job on shuffle files of a checkpointed RDD.
> It should solve your problem (`sc._jvm.System.gc()` in Python as pointed
> in the databricks link in my previous message).
>
>
> 2015-09-02 20:55 GMT+08:00 Aurélien Bellet
> <aurelien.bellet@telecom-paristech.fr
> <ma...@telecom-paristech.fr>>:
>
>     Thanks a lot for the useful link and comments Alexis!
>
>     First of all, the problem occurs without doing anything else in the
>     code (except of course loading my data from HDFS at the beginning) -
>     so it definitely comes from the shuffling. You're right, in the
>     current version, checkpoint files are not removed and take up some
>     space in HDFS (this is easy to fix). But this is negligible compared
>     to the non hdfs files which keeps growing as iterations go. So I
>     agree with you that this must come from the shuffling operations: it
>     seems that the shuffle files are not removed along the execution
>     (they are only removed if I stop/kill the application), despite the
>     use of checkpoint.
>
>     The class you mentioned is very interesting but I did not find a way
>     to use it from pyspark. I will try to implement my own version,
>     looking at the source code. But besides the queueing and removing of
>     checkpoint files, I do not really see anything special there that
>     could solve my issue.
>
>     I will continue to investigate this. Just found out I can use a
>     command line browser to look at the webui (I cannot access the
>     server in graphical display mode), this should help me understand
>     what's going on. I will also try the workarounds mentioned in the
>     link. Keep you posted.
>
>     Again, thanks a lot!
>
>     Best,
>
>     Aurelien
>
>
>     On 02/09/2015 14:15, alexis GILLAIN wrote:
>
>         Aurélien,
>
>           From what you're saying, I can think of a couple of things
>         considering
>         I don't know what you are doing in the rest of the code :
>
>         - There is lot of non hdfs writes, it comes from the rest of
>         your code
>         and/or repartittion(). Repartition involve a shuffling and
>         creation of
>         files on disk. I would have said that the problem come from that
>         but I
>         just checked and checkpoint() is supposed to delete shuffle files :
>         https://forums.databricks.com/questions/277/how-do-i-avoid-the-no-space-left-on-device-error.html
>         (looks exactly as your problem so you could maybe try the others
>         workarounds)
>         Still, you may do a lot of shuffle in the rest of the code (you
>         should
>         see the amount of shuffle files written in the webui) and consider
>         increasing the disk space available...if you can do that.
>
>         - On the hdfs side, the class I pointed to has an update
>         function which
>         "automatically handles persisting and (optionally) checkpointing, as
>         well as unpersisting and removing checkpoint files". Not sure your
>         method for checkpointing remove previous checkpoint file.
>
>         In the end, does the disk space error come from hdfs growing or
>         local
>         disk growing ?
>
>         You should check the webui to identify which tasks spill data on
>         disk
>         and verify if the shuffle files are properly deleted when you
>         checkpoint
>         your rdd.
>
>
>         Regards,
>
>
>         2015-09-01 22:48 GMT+08:00 Aurélien Bellet
>         <aurelien.bellet@telecom-paristech.fr>:
>
>
>              Dear Alexis,
>
>              Thanks again for your reply. After reading about
>         checkpointing I
>              have modified my sample code as follows:
>
>              for i in range(1000):
>                   print i
>                   data2=data.repartition(50).cache()
>                   if (i+1) % 10 == 0:
>                       data2.checkpoint()
>                   data2.first() # materialize rdd
>                   data.unpersist() # unpersist previous version
>                   data=data2
>
>              The data is checkpointed every 10 iterations to a directory
>         that I
>              specified. While this seems to improve things a little bit,
>         there is
>              still a lot of writing on disk (appcache directory, shown
>         as "non
>              HDFS files" in Cloudera Manager) *besides* the checkpoint files
>              (which are regular HDFS files), and the application
>         eventually runs
>              out of disk space. The same is true even if I checkpoint at
>         every
>              iteration.
>
>              What am I doing wrong? Maybe some garbage collector setting?
>
>              Thanks a lot for the help,
>
>              Aurelien
>
>              On 24/08/2015 10:39, alexis GILLAIN wrote:
>
>                  Hi Aurelien,
>
>                  The first code should create a new RDD in memory at
>         each iteration
>                  (check the webui).
>                  The second code will unpersist the RDD but that's not
>         the main
>                  problem.
>
>                  I think you have trouble due to long lineage as
>         .cache() keep
>                  track of
>                  lineage for recovery.
>                  You should have a look at checkpointing :
>         https://github.com/JerryLead/SparkInternals/blob/master/markdown/english/6-CacheAndCheckpoint.md
>         https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/impl/PeriodicRDDCheckpointer.scala
>
>                  You can also have a look at the code of others iterative
>                  algorithms in
>                  mlllib for best practices.
>
>                  2015-08-20 17:26 GMT+08:00 abellet
>                  <aurelien.bellet@telecom-paristech.fr>:
>
>                       Hello,
>
>                       For the need of my application, I need to periodically
>                  "shuffle" the
>                       data
>                       across nodes/partitions of a reasonably-large
>         dataset. This
>                  is an
>                       expensive
>                       operation but I only need to do it every now and then.
>                  However it
>                       seems that
>                       I am doing something wrong because as the
>         iterations go the
>                  memory usage
>                       increases, causing the job to spill onto HDFS, which
>                  eventually gets
>                       full. I
>                       am also getting some "Lost executor" errors that I
>         don't
>                  get if I don't
>                       repartition.
>
>                       Here's a basic piece of code which reproduces the
>         problem:
>
>                       data =
>
>         sc.textFile("ImageNet_gist_train.txt",50).map(parseLine).cache()
>                       data.count()
>                       for i in range(1000):
>                                data=data.repartition(50).persist()
>                                # below several operations are done on data
>
>
>                       What am I doing wrong? I tried the following but
>         it doesn't
>                  solve
>                       the issue:
>
>                       for i in range(1000):
>                                data2=data.repartition(50).persist()
>                                data2.count() # materialize rdd
>                                data.unpersist() # unpersist previous version
>                                data=data2
>
>
>                       Help and suggestions on this would be greatly
>         appreciated!
>                  Thanks a lot!
>

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
For additional commands, e-mail: user-help@spark.apache.org


Re: Memory-efficient successive calls to repartition()

Posted by Aurélien Bellet <au...@telecom-paristech.fr>.
Thanks a lot for the useful link and comments Alexis!

First of all, the problem occurs without doing anything else in the code 
(except of course loading my data from HDFS at the beginning) - so it 
definitely comes from the shuffling. You're right, in the current 
version, checkpoint files are not removed and take up some space in HDFS 
(this is easy to fix). But this is negligible compared to the non-HDFS 
files, which keep growing as the iterations go on. So I agree with you that 
this must come from the shuffling operations: it seems that the shuffle 
files are not removed during execution (they are only removed when I 
stop/kill the application), despite the use of checkpoint().
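
One of the workarounds mentioned in the page you linked that I have not tried 
yet is the time-based cleaner. As a sketch only, and assuming we are on a 
Spark 1.x version where spark.cleaner.ttl is still supported (the TTL value 
below is just an illustration, and as I understand it must be larger than the 
lifetime of any persisted RDD I still need), it would be set when creating 
the context:

from pyspark import SparkConf, SparkContext

conf = (SparkConf()
        .setAppName("repartition-loop")      # placeholder application name
        .set("spark.cleaner.ttl", "3600"))   # periodic cleanup of old metadata/shuffle data (seconds)
sc = SparkContext(conf=conf)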

The class you mentioned is very interesting but I did not find a way to 
use it from pyspark. I will try to implement my own version, looking at 
the source code. But besides the queueing and removing of checkpoint 
files, I do not really see anything special there that could solve my issue.
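
To make this concrete, the rough PySpark port I have in mind is sketched 
below. It only mirrors the general idea of the Scala class (persist every 
update, checkpoint every N updates, then drop older persisted copies and 
delete older checkpoint files); the HDFS deletion goes through the Hadoop 
FileSystem API via py4j and is untested on my side, and the class name and 
interval are just placeholders:

class PeriodicCheckpointer(object):
    # Keeps lineage short by checkpointing every `interval` updates and
    # cleaning up older persisted RDDs and older checkpoint files.
    def __init__(self, sc, interval=10):
        self.sc = sc
        self.interval = interval
        self.count = 0
        self.persisted = []      # recent RDDs still persisted in memory
        self.checkpointed = []   # recent RDDs whose checkpoint files still exist

    def update(self, rdd):
        self.count += 1
        rdd.persist()
        self.persisted.append(rdd)
        if self.count % self.interval == 0:
            rdd.checkpoint()
            self.checkpointed.append(rdd)
        rdd.count()                           # materialize (and write the checkpoint, if any)
        while len(self.persisted) > 2:        # keep only the two most recent RDDs in memory
            self.persisted.pop(0).unpersist()
        while len(self.checkpointed) > 1:     # once a newer checkpoint is written,
            old = self.checkpointed.pop(0)    # older checkpoint files can be deleted
            path = old.getCheckpointFile()
            if path is not None:
                self._delete(path)

    def _delete(self, path):
        # delete an old checkpoint directory on HDFS through the JVM Hadoop API
        jvm = self.sc._jvm
        conf = self.sc._jsc.hadoopConfiguration()
        fs = jvm.org.apache.hadoop.fs.FileSystem.get(conf)
        fs.delete(jvm.org.apache.hadoop.fs.Path(path), True)

The main loop would then simply become:

checkpointer = PeriodicCheckpointer(sc, interval=10)
for i in range(1000):
    data = data.repartition(50)
    checkpointer.update(data)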

I will continue to investigate this. I just found out that I can use a 
command-line browser to look at the webui (I cannot access the server in 
graphical display mode), which should help me understand what's going on. 
I will also try the other workarounds mentioned in the link, and will keep 
you posted.

Again, thanks a lot!

Best,

Aurelien

On 02/09/2015 14:15, alexis GILLAIN wrote:
> Aurélien,
>
>  From what you're saying, I can think of a couple of things considering
> I don't know what you are doing in the rest of the code :
>
> - There is lot of non hdfs writes, it comes from the rest of your code
> and/or repartittion(). Repartition involve a shuffling and creation of
> files on disk. I would have said that the problem come from that but I
> just checked and checkpoint() is supposed to delete shuffle files :
> https://forums.databricks.com/questions/277/how-do-i-avoid-the-no-space-left-on-device-error.html
> (looks exactly as your problem so you could maybe try the others
> workarounds)
> Still, you may do a lot of shuffle in the rest of the code (you should
> see the amount of shuffle files written in the webui) and consider
> increasing the disk space available...if you can do that.
>
> - On the hdfs side, the class I pointed to has an update function which
> "automatically handles persisting and (optionally) checkpointing, as
> well as unpersisting and removing checkpoint files". Not sure your
> method for checkpointing remove previous checkpoint file.
>
> In the end, does the disk space error come from hdfs growing or local
> disk growing ?
>
> You should check the webui to identify which tasks spill data on disk
> and verify if the shuffle files are properly deleted when you checkpoint
> your rdd.
>
>
> Regards,
>
>
> 2015-09-01 22:48 GMT+08:00 Aurélien Bellet
> <aurelien.bellet@telecom-paristech.fr>:
>
>     Dear Alexis,
>
>     Thanks again for your reply. After reading about checkpointing I
>     have modified my sample code as follows:
>
>     for i in range(1000):
>          print i
>          data2=data.repartition(50).cache()
>          if (i+1) % 10 == 0:
>              data2.checkpoint()
>          data2.first() # materialize rdd
>          data.unpersist() # unpersist previous version
>          data=data2
>
>     The data is checkpointed every 10 iterations to a directory that I
>     specified. While this seems to improve things a little bit, there is
>     still a lot of writing on disk (appcache directory, shown as "non
>     HDFS files" in Cloudera Manager) *besides* the checkpoint files
>     (which are regular HDFS files), and the application eventually runs
>     out of disk space. The same is true even if I checkpoint at every
>     iteration.
>
>     What am I doing wrong? Maybe some garbage collector setting?
>
>     Thanks a lot for the help,
>
>     Aurelien
>
>     On 24/08/2015 10:39, alexis GILLAIN wrote:
>
>         Hi Aurelien,
>
>         The first code should create a new RDD in memory at each iteration
>         (check the webui).
>         The second code will unpersist the RDD but that's not the main
>         problem.
>
>         I think you have trouble due to long lineage as .cache() keep
>         track of
>         lineage for recovery.
>         You should have a look at checkpointing :
>         https://github.com/JerryLead/SparkInternals/blob/master/markdown/english/6-CacheAndCheckpoint.md
>         https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/impl/PeriodicRDDCheckpointer.scala
>
>         You can also have a look at the code of others iterative
>         algorithms in
>         mlllib for best practices.
>
>         2015-08-20 17:26 GMT+08:00 abellet
>         <aurelien.bellet@telecom-paristech.fr>:
>
>              Hello,
>
>              For the need of my application, I need to periodically
>         "shuffle" the
>              data
>              across nodes/partitions of a reasonably-large dataset. This
>         is an
>              expensive
>              operation but I only need to do it every now and then.
>         However it
>              seems that
>              I am doing something wrong because as the iterations go the
>         memory usage
>              increases, causing the job to spill onto HDFS, which
>         eventually gets
>              full. I
>              am also getting some "Lost executor" errors that I don't
>         get if I don't
>              repartition.
>
>              Here's a basic piece of code which reproduces the problem:
>
>              data =
>         sc.textFile("ImageNet_gist_train.txt",50).map(parseLine).cache()
>              data.count()
>              for i in range(1000):
>                       data=data.repartition(50).persist()
>                       # below several operations are done on data
>
>
>              What am I doing wrong? I tried the following but it doesn't
>         solve
>              the issue:
>
>              for i in range(1000):
>                       data2=data.repartition(50).persist()
>                       data2.count() # materialize rdd
>                       data.unpersist() # unpersist previous version
>                       data=data2
>
>
>              Help and suggestions on this would be greatly appreciated!
>         Thanks a lot!
>

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
For additional commands, e-mail: user-help@spark.apache.org


Re: Memory-efficient successive calls to repartition()

Posted by Aurélien Bellet <au...@telecom-paristech.fr>.
Dear Alexis,

Thanks again for your reply. After reading about checkpointing I have 
modified my sample code as follows:

for i in range(1000):
     print i
     data2=data.repartition(50).cache()
     if (i+1) % 10 == 0:
         data2.checkpoint()
     data2.first() # materialize rdd
     data.unpersist() # unpersist previous version
     data=data2

The data is checkpointed every 10 iterations to a directory that I 
specified. While this seems to improve things a little bit, there is 
still a lot of data written to local disk (the appcache directory, shown 
as "non HDFS files" in Cloudera Manager) *besides* the checkpoint files 
(which are regular HDFS files), and the application eventually runs out 
of disk space. The same is true even if I checkpoint at every iteration.
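
For completeness, the loop above relies on the checkpoint directory having 
been registered on the SparkContext beforehand, along these lines (the path 
is only a placeholder for the HDFS directory I actually use):

sc.setCheckpointDir("hdfs:///user/aurelien/checkpoints")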

What am I doing wrong? Maybe some garbage collector setting?

Thanks a lot for the help,

Aurelien

On 24/08/2015 10:39, alexis GILLAIN wrote:
> Hi Aurelien,
>
> The first code should create a new RDD in memory at each iteration
> (check the webui).
> The second code will unpersist the RDD but that's not the main problem.
>
> I think you have trouble due to long lineage as .cache() keep track of
> lineage for recovery.
> You should have a look at checkpointing :
> https://github.com/JerryLead/SparkInternals/blob/master/markdown/english/6-CacheAndCheckpoint.md
> https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/impl/PeriodicRDDCheckpointer.scala
>
> You can also have a look at the code of others iterative algorithms in
> mlllib for best practices.
>
> 2015-08-20 17:26 GMT+08:00 abellet <aurelien.bellet@telecom-paristech.fr>:
>
>     Hello,
>
>     For the need of my application, I need to periodically "shuffle" the
>     data
>     across nodes/partitions of a reasonably-large dataset. This is an
>     expensive
>     operation but I only need to do it every now and then. However it
>     seems that
>     I am doing something wrong because as the iterations go the memory usage
>     increases, causing the job to spill onto HDFS, which eventually gets
>     full. I
>     am also getting some "Lost executor" errors that I don't get if I don't
>     repartition.
>
>     Here's a basic piece of code which reproduces the problem:
>
>     data = sc.textFile("ImageNet_gist_train.txt",50).map(parseLine).cache()
>     data.count()
>     for i in range(1000):
>              data=data.repartition(50).persist()
>              # below several operations are done on data
>
>
>     What am I doing wrong? I tried the following but it doesn't solve
>     the issue:
>
>     for i in range(1000):
>              data2=data.repartition(50).persist()
>              data2.count() # materialize rdd
>              data.unpersist() # unpersist previous version
>              data=data2
>
>
>     Help and suggestions on this would be greatly appreciated! Thanks a lot!
>
>
>
>
>     --
>     View this message in context:
>     http://apache-spark-user-list.1001560.n3.nabble.com/Memory-efficient-successive-calls-to-repartition-tp24358.html
>     Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
>
>

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
For additional commands, e-mail: user-help@spark.apache.org