Posted to user@spark.apache.org by Kendall Wagner <ke...@gmail.com> on 2021/11/28 02:35:37 UTC
Question on RDD storage
Hello,
Sorry, I am a Spark newbie.
In a pyspark session, I want to store an RDD so that the next time I run
pyspark, the RDD can be reloaded.
I tried this:
>>> fruit.count()
1000
>>> fruit.take(5)
[('peach', 1), ('apricot', 2), ('apple', 3), ('haw', 1), ('persimmon', 9)]
>>> fruit.persist(StorageLevel.DISK_ONLY)
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
NameError: name 'StorageLevel' is not defined
The RDD.persist method doesn't seem to work for me.
How do I store an RDD to disk, and how can I reload it later?
Thank you in advance.
Kendall
Re: Question on RDD storage
Posted by Sean Owen <sr...@gmail.com>.
Please read the docs - there is also saveAsObjectFile, for example, but you
almost surely want to handle this as a DataFrame. You can
.write.format("...") as desired.
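For example, something along these lines (paths and column names here are
just placeholders, untested):

df = fruit.toDF(["name", "n"])          # pair RDD -> DataFrame
df.write.mode("overwrite").parquet("file:///tmp/fruit_parquet")

# in a later pyspark session, the types and structure come back intact
df2 = spark.read.parquet("file:///tmp/fruit_parquet")
fruit2 = df2.rdd.map(tuple)             # back to an RDD of tuples if needed

# or, staying with RDDs, pickle files round-trip Python objects:
fruit.saveAsPickleFile("file:///tmp/fruit_pickle")
fruit3 = sc.pickleFile("file:///tmp/fruit_pickle")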
On Sun, Nov 28, 2021 at 3:58 PM Kendall Wagner <ke...@gmail.com>
wrote:
> Thanks Mich
> As you showed, after reading back from textFile the ints become strs. Do I
> need another map to translate them back?
>
> Regards
> Kendall
Re: Question on RDD storage
Posted by Mich Talebzadeh <mi...@gmail.com>.
Forgot to mention:
You need the import below:
from pyspark.sql.types import *
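(The star import works; explicitly, the schema example would only need
something like:

from pyspark.sql.types import StructType, StructField, IntegerType
)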
Also, you can get the history of commands in Python as below:
import readline
for i in range(readline.get_current_history_length()):
    print(readline.get_history_item(i + 1))
HTH
Re: Question on RDD storage
Posted by Mich Talebzadeh <mi...@gmail.com>.
Hi Kendall,
We had the following before:
# read that saved file
content = sc.textFile(file_path)
>>> content.collect()
['0', '1', '2', '3', '4', '5', '6', '7', '8', '9']
Let us first define a schema for this list:
Schema = StructType([StructField("ID", IntegerType(), False)])
Next, read the saved file back together with that schema. The easiest
solution is to read the data as a DataFrame (as Sean pointed out):
content = spark.read.csv(file_path, schema=Schema)
content.printSchema()
root
|-- ID: integer (nullable = true)
>>> content.show()
+---+
| ID|
+---+
| 0|
| 1|
| 2|
| 3|
| 4|
| 5|
| 6|
| 7|
| 8|
| 9|
+---+
HTH
On Sun, 28 Nov 2021 at 21:58, Kendall Wagner <ke...@gmail.com> wrote:
> Thanks Mich
> As you showed, after reading back from textFile the ints become strs. Do I
> need another map to translate them back?
>
> Regards
> Kendall
Re: Question on RDD storage
Posted by Kendall Wagner <ke...@gmail.com>.
Thanks Mich
As you showed, after reading back from textFile the ints become strs. Do I
need another map to translate them back?
Regards
Kendall
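I mean something like this (untested):

content.map(int).collect()   # hoping to get ints back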
Re: Question on RDD storage
Posted by Mich Talebzadeh <mi...@gmail.com>.
Hi,
In PySpark you can persist a DataFrame (df) to disk with the following
command:
df.persist(pyspark.StorageLevel.DISK_ONLY)
note the pyspark.StorageLevel qualifier above
But that only stores the DataFrame in a temporary work area for Spark,
akin to the swap area on a Linux host. That temporary storage disappears
as soon as your Spark session ends, so it is not persistent.
Spark, like many other tools, uses persistent files (a normal file, an
HDFS file, etc.) for durable storage; you can also write to a database
table. Either way, you can read the data back later.
A simple example:
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('example').getOrCreate()
sc = spark.sparkContext
rdd = sc.parallelize(range(10))
file_path = "file:///tmp/abcd.txt"
>>> rdd.getNumPartitions()
6
# save it as a text file in the /tmp directory on Linux. Use coalesce(1) to
# reduce the number of partitions to one before saving; check the docs
rdd.coalesce(1).saveAsTextFile(file_path)
# read that saved file
content = sc.textFile(file_path)
>>> content.collect()
['0', '1', '2', '3', '4', '5', '6', '7', '8', '9']
The output at file_path is persistent and will stay in the /tmp directory
(note that saveAsTextFile actually writes a directory of part files there).
HTH
Re: Question on RDD storage
Posted by Kendall Wagner <ke...@gmail.com>.
Hello
I tried saveAsTextFile, but that saves the structure as plain text.
After reading the text file back I can't access the structure directly.
So how should I do it?
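Do I have to parse each line back myself, something like this (untested)?

import ast
fruit2 = sc.textFile(file_path).map(ast.literal_eval)  # "('peach', 1)" -> tuple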
Thanks again.
On Sun, Nov 28, 2021 at 1:24 PM Sean Owen <sr...@gmail.com> wrote:
> You didn't import the class.
> persist() does not save across sessions. You need to write with methods
> like saveAsTextFile or whatever is appropriate, or .write methods on a
> DataFrame.
Re: Question on RDD storage
Posted by Sean Owen <sr...@gmail.com>.
You didn't import the class.
persist() does not save across sessions. You need to write with methods
like saveAsTextFile or whatever is appropriate, or .write methods on a
DataFrame.
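That is, something like:

from pyspark import StorageLevel
fruit.persist(StorageLevel.DISK_ONLY)   # fixes the NameError, but the cache
                                        # still lasts only for this session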