Posted to user@spark.apache.org by 张健BJ <zh...@datagrand.com> on 2022/10/31 13:37:18 UTC

spark - local question

Dear developers:
 I have a question about the PySpark local mode: can it be used in production, and will it cause unexpected problems? The scenario is as follows:
Our team wants to develop an ETL component in Python that can transfer data between various data sources.
If there is no YARN environment, can we read data from Database A and write it to Database B in local mode? Will this function be guaranteed to be stable and available?
Thanks,
Looking forward to your reply
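A minimal sketch of the scenario described above: a local-mode session that reads from one database over JDBC and appends the rows to another. This is only an illustration; the hostnames, table names, and credentials are placeholders, not values from this thread.

from pyspark.sql import SparkSession

# Local mode: driver and executors run in a single JVM on this machine.
# The JDBC driver jar (e.g. mysql-connector-java) must be on the classpath.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("local_etl_sketch")
    .getOrCreate()
)

# Read from "Database A" over JDBC (placeholder connection details).
src = (
    spark.read.format("jdbc")
    .option("url", "jdbc:mysql://database-a:3306/source_db")
    .option("dbtable", "source_table")
    .option("user", "etl_user")
    .option("password", "etl_password")
    .load()
)

# Append the same rows to "Database B" over JDBC.
(
    src.write.format("jdbc")
    .option("url", "jdbc:mysql://database-b:3306/target_db")
    .option("dbtable", "target_table")
    .option("user", "etl_user")
    .option("password", "etl_password")
    .mode("append")
    .save()
)

spark.stop()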

Re: spark - local question

Posted by 张健BJ <zh...@datagrand.com>.
Hello. My earlier statement that memory does not increase as the data volume grows was wrong; it does increase, but only very slightly. I tested 5 million rows today and was surprised to find that it used only about 2 GB of memory. Reading 1 million rows with pymysql used more memory than that. As far as I can tell, the parameters shown in the Spark UI are exactly the same as those I configured, so I don't know what strategies Spark uses to optimize memory. I used the code from my last message; the screenshots are as follows:
[Screenshot: 64 GB memory machine used for the test]
[Screenshot: reading 5 million rows]
[Screenshot: reading 1 million rows with pymysql]
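A hedged way to cross-check what the Spark UI reports is to read the effective configuration back from the running session. The snippet below assumes a SparkSession named spark, created as in the code quoted further down; the _jvm call goes through PySpark's private py4j gateway, so treat it as a debugging aid only.

# Print the configuration values the session is actually using.
for key in ("spark.master", "spark.driver.memory", "spark.executor.memory",
            "spark.serializer", "spark.sql.adaptive.enabled"):
    print(key, "=", spark.conf.get(key, "<not set>"))

# The heap actually granted to the local driver JVM (debugging aid; uses a
# private handle, so this may change between Spark versions).
runtime = spark.sparkContext._jvm.java.lang.Runtime.getRuntime()
print("driver max heap, GB:", round(runtime.maxMemory() / 1024**3, 2))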
------------------------------------------------------------------
From: Bjørn Jørgensen <bj...@gmail.com>
Sent: Sunday, November 6, 2022 03:01
To: 张健BJ <zh...@datagrand.com>
Cc: Sean Owen <sr...@gmail.com>; user <us...@spark.apache.org>
Subject: Re: spark - local question
I am using the Jupyter Docker Stacks image with Spark.
So I started a new notebook and ran this code:
import multiprocessing
from pyspark.sql import SparkSession
from pyspark import SparkConf, SparkContext

import time

t1 = time.time()
number_cores = int(multiprocessing.cpu_count())
memory_gb = 4


def get_spark_session(app_name: str, conf: SparkConf):
    conf.setMaster("local[{}]".format(number_cores))
    conf.set("spark.driver.memory", "{}g".format(memory_gb)).set(
        "spark.sql.adaptive.enabled", "True"
    ).set(
        "spark.serializer", "org.apache.spark.serializer.KryoSerializer"
    ).set(
        "spark.sql.repl.eagerEval.maxNumRows", "100"
    ).set(
        "sc.setLogLevel", "ERROR"
    ).set(
        "spark.executor.memory", "8g")

    return SparkSession.builder.appName(app_name).config(conf=conf).getOrCreate()


spark = get_spark_session("My_app", SparkConf())
That gives me this in http://HOSTIP:4040/environment/
So it works. 
On Sat, Nov 5, 2022 at 19:21, 张健BJ <zhangjianbj@datagrand.com> wrote:
OK, thank you very much :) I also have two questions: does spark.read.format("jdbc") read all the data from the database at once, and does it require a limit? My test result is that as the amount of data increases, the local memory usage does not change significantly. Why is that?
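On the first question: Spark's JDBC source does not require a limit, and by default it reads the table as a single partition; Spark only materializes rows as tasks consume them, although how much the MySQL driver buffers per round trip depends on the fetch size. A hedged sketch of splitting the read into parallel partitions with an explicit fetch size follows; the partition column and bounds are assumptions for illustration, and the connection details mirror the code below.

df = (
    spark.read.format("jdbc")
    .option("url", "jdbc:mysql://127.0.0.1:63306/recommend?useSSL=false")
    .option("driver", "com.mysql.jdbc.Driver")
    .option("dbtable", "item_info")
    .option("user", "root")
    .option("password", "root")
    # Split the scan into 8 parallel partitions over a numeric column
    # (assumed here to be an integer primary key named `id`).
    .option("partitionColumn", "id")
    .option("lowerBound", "1")
    .option("upperBound", "5000000")
    .option("numPartitions", "8")
    # Rows fetched per round trip by the JDBC driver.
    .option("fetchsize", "10000")
    .load()
)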
In addition, I tried to set spark.driver.memory and spark.executor.memory to 4g in local mode, but I observed that the settings did not seem to take effect; memory usage stayed at about 1 GB. The code is as follows:
import multiprocessing
from pyspark.sql import SparkSession
from pyspark import SparkConf, SparkContext

import time

t1 = time.time()
number_cores = int(multiprocessing.cpu_count())
memory_gb = 4


def get_spark_session(app_name: str, conf: SparkConf):
    # Build a local-mode session using all CPU cores.
    conf.setMaster("local[{}]".format(number_cores))
    conf.set("spark.driver.memory", "{}g".format(memory_gb)).set(
        "spark.sql.adaptive.enabled", "True"
    ).set(
        "spark.serializer", "org.apache.spark.serializer.KryoSerializer"
    ).set(
        "spark.sql.repl.eagerEval.maxNumRows", "100"
    ).set(
        "sc.setLogLevel", "ERROR"
    ).set(
        "spark.executor.memory", "4g")

    return SparkSession.builder.appName(app_name).config(conf=conf).getOrCreate()


spark = get_spark_session("My_app", SparkConf())

# Read the source table over JDBC.
df = spark.read.format("jdbc").options(
    url='jdbc:mysql://127.0.0.1:63306/recommend?useSSL=false',
    driver='com.mysql.jdbc.Driver',
    dbtable="item_info",
    user="root",
    password="root"
).load()

my_url = 'jdbc:mysql://127.0.0.1:63306/etl?useSSL=false'
auth_mysql = {'user': 'root', 'password': 'root'}

# Rename the source columns to the target schema.
df = df.withColumnRenamed("id", "itemid").withColumnRenamed("category", "cateid") \
    .withColumnRenamed('weight', 'score').withColumnRenamed('tag', 'item_tags') \
    .withColumnRenamed('modify_time', 'item_modify_time').withColumnRenamed('start_time', 'dg_start_time') \
    .withColumnRenamed('end_time', 'dg_end_time')
df = df.select(
    ['itemid', 'cateid', 'title', 'score', 'item_tags', 'item_modify_time', 'dg_start_time', 'dg_end_time']).limit(20)

# Append the result to the target table.
df.write.jdbc(my_url, 'item_info', mode='append', properties=auth_mysql)
print(time.time() - t1)
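On the second question: in local mode there is no separate executor JVM, so spark.executor.memory has little effect, and the Spark documentation notes that spark.driver.memory cannot reliably be changed through SparkConf once the driver JVM has already started. A hedged sketch of one common workaround, passing the driver memory to the launcher before the first session is created (the 4g value and the script name are only examples):

import os
from pyspark.sql import SparkSession

# The driver memory must be fixed before the driver JVM is launched, which
# happens when the first SparkSession/SparkContext is created.
os.environ["PYSPARK_SUBMIT_ARGS"] = "--driver-memory 4g pyspark-shell"

spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("My_app")
    .getOrCreate()
)

# Alternatively, from the shell:
#   spark-submit --driver-memory 4g my_etl_script.py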
------------------------------------------------------------------
From: Bjørn Jørgensen <bjornjorgensen@gmail.com>
Sent: Saturday, November 5, 2022 04:51
To: Sean Owen <srowen@gmail.com>
Cc: 张健BJ <zhangjianbj@datagrand.com>; user <user@spark.apache.org>
Subject: Re: spark - local question
Yes, Spark in local mode works :)
One tip:
If you just start it, the default settings are one core and 1 GB.
I'm using this function to start Spark in local mode with all cores and the maximum RAM:
import multiprocessing
import os
from pyspark.sql import SparkSession
from pyspark import SparkConf, SparkContext


number_cores = int(multiprocessing.cpu_count())

mem_bytes = os.sysconf("SC_PAGE_SIZE") * os.sysconf("SC_PHYS_PAGES")  # e.g. 4015976448
memory_gb = int(mem_bytes / (1024.0**3))  # e.g. 3.74


def get_spark_session(app_name: str, conf: SparkConf):
    conf.setMaster("local[{}]".format(number_cores))
    conf.set("spark.driver.memory", "{}g".format(memory_gb)).set(
        "spark.sql.adaptive.enabled", "True"
    ).set(
        "spark.serializer", "org.apache.spark.serializer.KryoSerializer"
    ).set(
        "spark.sql.repl.eagerEval.maxNumRows", "100"
    ).set(
        "sc.setLogLevel", "ERROR"
    )

    return SparkSession.builder.appName(app_name).config(conf=conf).getOrCreate()


spark = get_spark_session("My_app", SparkConf())
Now when you type spark you will see something like this. 
SparkSession - in-memory
SparkContext
Spark UI
Version v3.4.0-SNAPSHOT
Master local[16]
AppName My_app
On Mon, Oct 31, 2022 at 14:50, Sean Owen <srowen@gmail.com> wrote:
Sure, as stable and available as your machine is. If you don't need fault tolerance or scale beyond one machine, sure.
On Mon, Oct 31, 2022 at 8:43 AM 张健BJ <zhangjianbj@datagrand.com> wrote:
Dear developers:
 I have a question about the PySpark local mode: can it be used in production, and will it cause unexpected problems? The scenario is as follows:
Our team wants to develop an ETL component in Python that can transfer data between various data sources.
If there is no YARN environment, can we read data from Database A and write it to Database B in local mode? Will this function be guaranteed to be stable and available?
Thanks,
Looking forward to your reply
-- 
Bjørn Jørgensen 
Vestre Aspehaug 4, 6010 Ålesund 
Norge
+47 480 94 297
