Posted to user@spark.apache.org by 宋源栋 <yu...@greatopensource.com> on 2018/04/11 09:10:05 UTC

Spark is only using one worker machine when more are available


Hi all,
I have a standalone-mode Spark cluster (without HDFS) of 10 machines, each with 40 CPU cores and 128 GB of RAM.
My application is a Spark SQL application that reads data from the database "tpch_100g" in MySQL and runs TPC-H queries. When loading tables from MySQL into Spark, I split the largest table, "lineitem", into 600 partitions.

When my application runs, only 40 executors (spark.executor.memory = 1g, spark.executor.cores = 1) appear on the Executors page of the Spark application web UI, and all of them are on the same machine. The job is too slow because all tasks run in parallel on only one machine.




Re: Spark Kubernetes Volumes

Posted by Anirudh Ramanathan <ra...@google.com.INVALID>.
There's a JIRA SPARK-23529
<https://issues.apache.org/jira/browse/SPARK-23529> that deals with
mounting hostpath volumes.
I propose we extend that PR/JIRA to encompass all the different volume
types and allow mounting them into the driver/executors.

On Thu, Apr 12, 2018 at 10:55 AM Yinan Li <li...@gmail.com> wrote:

> Hi Marius,
>
> Spark on Kubernetes does not yet support mounting user-specified volumes
> natively. However, mounting volumes is supported by
> https://github.com/GoogleCloudPlatform/spark-on-k8s-operator. Please see
> https://github.com/GoogleCloudPlatform/spark-on-k8s-operator/blob/master/docs/user-guide.md#mounting-volumes
> for details.
>
>

-- 
Anirudh Ramanathan

Re: Spark Kubernetes Volumes

Posted by Yinan Li <li...@gmail.com>.
Hi Marius,

Spark on Kubernetes does not yet support mounting user-specified volumes
natively. However, mounting volumes is supported by
https://github.com/GoogleCloudPlatform/spark-on-k8s-operator. Please see
https://github.com/GoogleCloudPlatform/spark-on-k8s-operator/blob/master/docs/user-guide.md#mounting-volumes
for details.

On Thu, Apr 12, 2018 at 7:50 AM, Marius <m....@gmail.com> wrote:

> Hey,
>
> I have a question regarding the Spark on Kubernetes feature. I would like
> to mount a pre-populated Kubernetes volume into the executor pods of
> Spark. One of my tools, which I invoke using Spark's pipe command, requires
> these files to be available on a POSIX-compatible file system, and they are
> too large to justify copying them around using addFile. If this is not
> possible, I would like to know whether the community would be interested in
> such a feature.
>
> Cheers
>
> Marius
>
> ---------------------------------------------------------------------
> To unsubscribe e-mail: user-unsubscribe@spark.apache.org
>
>

Spark Kubernetes Volumes

Posted by Marius <m....@gmail.com>.
Hey,

I have a question regarding the Spark on Kubernetes feature. I would
like to mount a pre-populated Kubernetes volume into the executor pods
of Spark. One of my tools, which I invoke using Spark's pipe command,
requires these files to be available on a POSIX-compatible file system,
and they are too large to justify copying them around using addFile. If
this is not possible, I would like to know whether the community would
be interested in such a feature.

Cheers

Marius

---------------------------------------------------------------------
To unsubscribe e-mail: user-unsubscribe@spark.apache.org


Re: Spark is only using one worker machine when more are available

Posted by Gourav Sengupta <go...@gmail.com>.
Hi,

Just for the sake of clarity, can you please give the full statement for
reading the data from the largest table? I mean not the programmatic
version, but the one that contains the complete statement.


Regards,
Gourav Sengupta




On Thu, Apr 12, 2018 at 7:19 AM, Jhon Anderson Cardenas Diaz <
jhonderson2007@gmail.com> wrote:

> Hi.
>
> On Spark standalone, I think you cannot specify the number of worker
> machines to use, but you can achieve the same effect this way:
> https://stackoverflow.com/questions/39399205/spark-standalone-number-executors-cores-control
>
> For example, if you want your jobs to run on all 10 machines using all
> their cores (10 executors, each on a different machine and with 40
> cores), you can use this configuration:
>
> spark.cores.max        = 400
> spark.executor.cores  = 40
>
> If you want more executors with fewer cores each (let's say 20
> executors, each with 20 cores):
>
> spark.cores.max        = 400
> spark.executor.cores  = 20
>
> Note that in the latter case each worker machine will run two executors.
>
> In summary, use this rule:
>
> number-of-executors = spark.cores.max / spark.executor.cores
>
> Keep in mind that the executors will be divided among the available
> workers.
>
> Regards.
>
>
>

Re: Spark is only using one worker machine when more are available

Posted by Jhon Anderson Cardenas Diaz <jh...@gmail.com>.
Hi.

On Spark standalone, I think you cannot specify the number of worker
machines to use, but you can achieve the same effect this way:
https://stackoverflow.com/questions/39399205/spark-standalone-number-executors-cores-control

For example, if you want your jobs to run on all 10 machines using all
their cores (10 executors, each on a different machine and with 40
cores), you can use this configuration:

spark.cores.max        = 400
spark.executor.cores  = 40

If you want more executors with fewer cores each (let's say 20 executors,
each with 20 cores):

spark.cores.max        = 400
spark.executor.cores  = 20

Note that in the latter case each worker machine will run two executors.

In summary, use this rule:

number-of-executors = spark.cores.max / spark.executor.cores

Keep in mind that the executors will be divided among the available
workers.
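
The rule above can be checked with a few lines of Java. This is only a sketch: the class and method names (ExecutorLayout, executorCount, executorsPerWorker) are made up for illustration and are not part of Spark, and the even spread of executors across workers is an assumption, not something the standalone scheduler guarantees.

```java
/** Illustrative helper for the sizing rule; not part of any Spark API. */
final class ExecutorLayout {

    /** number-of-executors = spark.cores.max / spark.executor.cores (integer division). */
    static int executorCount(int coresMax, int executorCores) {
        if (coresMax <= 0 || executorCores <= 0) {
            throw new IllegalArgumentException("core counts must be positive");
        }
        return coresMax / executorCores;
    }

    /** Executors per worker, ASSUMING they are spread evenly (rounded up). */
    static int executorsPerWorker(int executors, int workers) {
        if (executors < 0 || workers <= 0) {
            throw new IllegalArgumentException("invalid executor or worker count");
        }
        return (executors + workers - 1) / workers;
    }

    public static void main(String[] args) {
        int workers = 10; // the 10-machine cluster described in this thread

        int wide = executorCount(400, 40);   // 10 executors with 40 cores each
        int narrow = executorCount(400, 20); // 20 executors with 20 cores each

        System.out.println(wide + " executors, " + executorsPerWorker(wide, workers) + " per worker");
        System.out.println(narrow + " executors, " + executorsPerWorker(narrow, workers) + " per worker");
    }
}
```

With the thread's numbers this gives 10 executors (one per worker) for 40 cores each, and 20 executors (two per worker) for 20 cores each.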

Regards.


2018-04-11 21:39 GMT-05:00 宋源栋 <yu...@greatopensource.com>:

> Hi
>  1. Spark version : 2.3.0
>  2. jdk: oracle jdk 1.8
>  3. os version: centos 6.8
>  4. spark-env.sh: null
>  5. spark session config:
>
>
> SparkSession.builder().appName("DBScale")
>                 .config("spark.sql.crossJoin.enabled", "true")
>                 .config("spark.sql.adaptive.enabled", "true")
>                 .config("spark.scheduler.mode", "FAIR")
>                 .config("spark.executor.memory", "1g")
>                 .config("spark.executor.cores", 1)
>                 .config("spark.driver.memory", "20")
>                 .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
>                 .config("spark.executor.extraJavaOptions",
>                         "-XX:+UseG1GC -XX:+PrintFlagsFinal -XX:+PrintReferenceGC " +
>                                 "-verbose:gc -XX:+PrintGCDetails " +
>                                 "-XX:+PrintGCTimeStamps -XX:+PrintAdaptiveSizePolicy")
>                 .master(this.spark_master)
>                 .getOrCreate();
>
>   6. core code:
>
>
>          for (SparksqlTableInfo tableInfo: this.dbtable){ // this loop reads data from mysql
>             String dt = "(" + tableInfo.sql + ")" + tableInfo.tmp_table_name;
>             String[] pred = new String[tableInfo.partition_num];
>             if (tableInfo.partition_num > 0) {
>                 for (int j = 0; j < tableInfo.partition_num; j++) {
>                     String str = "some where clause to split mysql table into many partitions";
>                     pred[j] = str;
>                 }
>                 Dataset<Row> jdbcDF = ss.read().jdbc(this.url, dt, pred, connProp); //this.url is mysql-jdbc-url (mysql://XX.XX.XX.XX:XXXX)
>                 jdbcDF.createOrReplaceTempView(tableInfo.tmp_table_name);
>             } else {
>                 logger.warn("[\033[32m" + "partition_num == 0" + "\033[0m]");
>                 Dataset<Row> jdbcDF = ss.read().jdbc(this.url, dt, connProp);
>                 jdbcDF.createOrReplaceTempView(tableInfo.tmp_table_name);
>             }
>         }
>
>
>         // Then run a query and write the result set to mysql
>
>         Dataset<Row> result = ss.sql(this.sql);
>         result.explain(true);
>         connProp.put("rewriteBatchedStatements", "true");
>         connProp.put("sessionVariables", "sql_log_bin=off");
>         result.write().jdbc(this.dst_url, this.dst_table, connProp);
>
>
>

Re: Spark is only using one worker machine when more are available

Posted by 宋源栋 <yu...@greatopensource.com>.
Hi,

 1. Spark version: 2.3.0
 2. JDK: Oracle JDK 1.8
 3. OS version: CentOS 6.8
 4. spark-env.sh: null
 5. Spark session config:

SparkSession.builder().appName("DBScale")
                .config("spark.sql.crossJoin.enabled", "true")
                .config("spark.sql.adaptive.enabled", "true")
                .config("spark.scheduler.mode", "FAIR")
                .config("spark.executor.memory", "1g")
                .config("spark.executor.cores", 1)
                .config("spark.driver.memory", "20")
                .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
                .config("spark.executor.extraJavaOptions",
                        "-XX:+UseG1GC -XX:+PrintFlagsFinal -XX:+PrintReferenceGC " +
                                "-verbose:gc -XX:+PrintGCDetails " +
                                "-XX:+PrintGCTimeStamps -XX:+PrintAdaptiveSizePolicy")
                .master(this.spark_master)
                .getOrCreate();

 6. Core code:

        for (SparksqlTableInfo tableInfo: this.dbtable){ // this loop reads data from mysql
            String dt = "(" + tableInfo.sql + ")" + tableInfo.tmp_table_name;
            String[] pred = new String[tableInfo.partition_num];
            if (tableInfo.partition_num > 0) {
                for (int j = 0; j < tableInfo.partition_num; j++) {
                    String str = "some where clause to split mysql table into many partitions";
                    pred[j] = str;
                }
                Dataset<Row> jdbcDF = ss.read().jdbc(this.url, dt, pred, connProp); //this.url is mysql-jdbc-url (mysql://XX.XX.XX.XX:XXXX)
                jdbcDF.createOrReplaceTempView(tableInfo.tmp_table_name);
            } else {
                logger.warn("[\033[32m" + "partition_num == 0" + "\033[0m]");
                Dataset<Row> jdbcDF = ss.read().jdbc(this.url, dt, connProp);
                jdbcDF.createOrReplaceTempView(tableInfo.tmp_table_name);
            }
        }
        // Then run a query and write the result set to mysql

        Dataset<Row> result = ss.sql(this.sql);
        result.explain(true);
        connProp.put("rewriteBatchedStatements", "true");
        connProp.put("sessionVariables", "sql_log_bin=off");
        result.write().jdbc(this.dst_url, this.dst_table, connProp);
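
For illustration only, the placeholder "some where clause" in the loop above could be filled in along the following lines. This is a sketch under stated assumptions, not the code actually used here: it assumes the table is split on a numeric, non-null column with known bounds (l_orderkey is the TPC-H lineitem key; the bounds in the demo are hypothetical), and the class name RangePredicates is made up. The resulting array has the shape expected by the predicates argument of ss.read().jdbc(url, table, predicates, connProp).

```java
/** Illustrative helper that builds one WHERE-clause predicate per JDBC partition. */
final class RangePredicates {

    /**
     * Splits the inclusive range [min, max] of a numeric column into `parts`
     * contiguous, non-overlapping half-open ranges. Rows where the column is
     * NULL match no predicate, so the column is assumed to be non-null.
     * Assumes (max - min + 1) * parts fits in a long.
     */
    static String[] build(String column, long min, long max, int parts) {
        if (parts <= 0 || max < min) {
            throw new IllegalArgumentException("need parts > 0 and max >= min");
        }
        String[] predicates = new String[parts];
        long span = max - min + 1;
        for (int i = 0; i < parts; i++) {
            long lower = min + span * i / parts;       // inclusive lower bound
            long upper = min + span * (i + 1) / parts; // exclusive upper bound
            predicates[i] = column + " >= " + lower + " AND " + column + " < " + upper;
        }
        return predicates;
    }

    public static void main(String[] args) {
        // Hypothetical bounds; in practice they would come from SELECT MIN/MAX on the column.
        for (String p : build("l_orderkey", 1, 600, 3)) {
            System.out.println(p);
        }
    }
}
```

Each predicate becomes one partition and therefore one task, so 600 predicates give 600 tasks regardless of how many executors are available to run them.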

------------------------------------------------------------------
From: Jhon Anderson Cardenas Diaz <jh...@gmail.com>
Sent: Wednesday, April 11, 2018, 22:42
To: 宋源栋 <yu...@greatopensource.com>
Cc: user <us...@spark.apache.org>
Subject: Re: Spark is only using one worker machine when more are available

Hi, could you please share the environment variable values that you are sending when you run the jobs, the Spark version, and other details?
By the way, you should take a look at SPARK_WORKER_INSTANCES and SPARK_WORKER_CORES if you are using Spark 2.0.0.

Regards.







Re: Spark is only using one worker machine when more are available

Posted by Jhon Anderson Cardenas Diaz <jh...@gmail.com>.
Hi, could you please share the environment variable values that you are
sending when you run the jobs, the Spark version, and other details?
By the way, you should take a look at SPARK_WORKER_INSTANCES and
SPARK_WORKER_CORES if you are using Spark 2.0.0
<https://spark.apache.org/docs/preview/spark-standalone.html>.

Regards.
