Posted to user@spark.apache.org by Andres M Jimenez T <ad...@hotmail.com> on 2016/06/02 16:29:15 UTC

how to increase threads per executor

Hi,


I am working with Spark 1.6.1, using the Kafka direct stream (createDirectStream) for streaming data.

I am using the Spark scheduler with 3 slaves.

The Kafka topic has 10 partitions.


The problem I have is that only one thread per executor is running my function (the logic implementation).


Can anybody tell me how I can increase the number of threads per executor to make better use of the CPUs?


Thanks


Here is the code I have implemented:


Driver:


JavaStreamingContext ssc = new JavaStreamingContext(conf, new Duration(10000));

// prepare streaming from Kafka
Set<String> topicsSet = new HashSet<>(Arrays.asList("stage1-in,stage1-retry".split(",")));
Map<String, String> kafkaParams = new HashMap<>();
kafkaParams.put("metadata.broker.list", kafkaBrokers);
kafkaParams.put("group.id", SparkStreamingImpl.class.getName());

JavaPairInputDStream<String, String> inputMessages = KafkaUtils.createDirectStream(
    ssc,
    String.class,
    String.class,
    StringDecoder.class,
    StringDecoder.class,
    kafkaParams,
    topicsSet
);

inputMessages.foreachRDD(new ForeachRDDFunction());


ForeachFunction:


class ForeachFunction implements VoidFunction<Tuple2<String, String>> {

    private static final Counter foreachConcurrent = ProcessingMetrics.metrics.counter("foreach-concurrency");

    public ForeachFunction() {
        LOG.info("Creating a new ForeachFunction");
    }

    public void call(Tuple2<String, String> t) throws Exception {
        foreachConcurrent.inc();
        LOG.info("processing message [" + t._1() + "]");
        try {
            Thread.sleep(1000);
        } catch (Exception e) { }
        foreachConcurrent.dec();
    }
}


ForeachRDDFunction:


class ForeachRDDFunction implements VoidFunction<JavaPairRDD<String, String>> {

    private static final Counter foreachRDDConcurrent = ProcessingMetrics.metrics.counter("foreachRDD-concurrency");

    private ForeachFunction foreachFunction = new ForeachFunction();

    public ForeachRDDFunction() {
        LOG.info("Creating a new ForeachRDDFunction");
    }

    public void call(JavaPairRDD<String, String> t) throws Exception {
        foreachRDDConcurrent.inc();
        LOG.info("call from inputMessages.foreachRDD with [" + t.partitions().size() + "] partitions");
        for (Partition p : t.partitions()) {
            if (p instanceof KafkaRDDPartition) {
                LOG.info("partition [" + p.index() + "] with count [" + ((KafkaRDDPartition) p).count() + "]");
            }
        }
        t.foreachAsync(foreachFunction);
        foreachRDDConcurrent.dec();
    }
}
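
A side note on the ForeachRDDFunction code: foreachAsync returns a future as soon as the job is submitted, so foreachRDDConcurrent.dec() runs before the records have been processed and the counter does not track how long a batch is actually running. The following is a minimal plain-Java sketch of that pattern, not Spark code. A CompletableFuture stands in for the Spark job, and the class and method names (AsyncCounterDemo, slowJob, and so on) are hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

public class AsyncCounterDemo {

    // Hypothetical stand-in for an asynchronous job that takes `millis` to finish.
    static CompletableFuture<Void> slowJob(long millis) {
        return CompletableFuture.runAsync(() -> {
            try {
                Thread.sleep(millis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
    }

    // Pattern from the post: decrement right after submitting the job.
    // Returns the counter value observed while the job is still running.
    public static int decrementImmediately(long millis) {
        AtomicInteger inFlight = new AtomicInteger();
        inFlight.incrementAndGet();
        CompletableFuture<Void> job = slowJob(millis);
        inFlight.decrementAndGet();
        int seenWhileRunning = inFlight.get();
        job.join(); // wait so the demo finishes cleanly
        return seenWhileRunning;
    }

    // Alternative: decrement only when the job has completed.
    public static int decrementOnCompletion(long millis) {
        AtomicInteger inFlight = new AtomicInteger();
        inFlight.incrementAndGet();
        CompletableFuture<Void> job =
                slowJob(millis).whenComplete((result, error) -> inFlight.decrementAndGet());
        int seenWhileRunning = inFlight.get();
        job.join();
        return seenWhileRunning;
    }

    public static void main(String[] args) {
        System.out.println("immediate decrement sees:     " + decrementImmediately(200));
        System.out.println("decrement on completion sees: " + decrementOnCompletion(200));
    }
}
```

On the Spark side, the rough equivalents would be calling the blocking foreach instead of foreachAsync, or waiting on the future that foreachAsync returns before decrementing.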


The driver log shows that my RDD is partitioned so it can be processed in parallel:


[Stage 70:>  (3 + 3) / 20][Stage 71:>  (0 + 0) / 20][Stage 72:>  (0 + 0) / 20]
16/06/02 08:32:10 INFO SparkStreamingImpl: call from inputMessages.foreachRDD with [20] partitions
16/06/02 08:32:10 INFO SparkStreamingImpl: partition [0] with count [24]
16/06/02 08:32:10 INFO SparkStreamingImpl: partition [1] with count [0]
16/06/02 08:32:10 INFO SparkStreamingImpl: partition [2] with count [0]
16/06/02 08:32:10 INFO SparkStreamingImpl: partition [3] with count [19]
16/06/02 08:32:10 INFO SparkStreamingImpl: partition [4] with count [19]
16/06/02 08:32:10 INFO SparkStreamingImpl: partition [5] with count [20]
16/06/02 08:32:10 INFO SparkStreamingImpl: partition [6] with count [0]
16/06/02 08:32:10 INFO SparkStreamingImpl: partition [7] with count [23]
16/06/02 08:32:10 INFO SparkStreamingImpl: partition [8] with count [21]
16/06/02 08:32:10 INFO SparkStreamingImpl: partition [9] with count [0]
16/06/02 08:32:10 INFO SparkStreamingImpl: partition [10] with count [0]
16/06/02 08:32:10 INFO SparkStreamingImpl: partition [11] with count [0]
16/06/02 08:32:10 INFO SparkStreamingImpl: partition [12] with count [0]
16/06/02 08:32:10 INFO SparkStreamingImpl: partition [13] with count [26]
16/06/02 08:32:10 INFO SparkStreamingImpl: partition [14] with count [0]
16/06/02 08:32:10 INFO SparkStreamingImpl: partition [15] with count [27]
16/06/02 08:32:10 INFO SparkStreamingImpl: partition [16] with count [0]
16/06/02 08:32:10 INFO SparkStreamingImpl: partition [17] with count [16]
16/06/02 08:32:10 INFO SparkStreamingImpl: partition [18] with count [15]
16/06/02 08:32:10 INFO SparkStreamingImpl: partition [19] with count [0]


The log from one of the executors shows that exactly one message per second was processed (by only one thread):


16/06/02 08:32:46 INFO SparkStreamingImpl: processing message [f2b22bb9-3bd8-4e5b-b9fb-afa7e8c4deb8]
16/06/02 08:32:47 INFO SparkStreamingImpl: processing message [e267cde2-ffea-4f7a-9934-f32a3b7218cc]
16/06/02 08:32:48 INFO SparkStreamingImpl: processing message [f055fe3c-0f72-4f41-9a31-df544f1e1cd3]
16/06/02 08:32:49 INFO SparkStreamingImpl: processing message [854faaa5-0abe-49a2-b13a-c290a3720b0e]
16/06/02 08:32:50 INFO SparkStreamingImpl: processing message [1bc0a141-b910-45fe-9881-e2066928fbc6]
16/06/02 08:32:51 INFO SparkStreamingImpl: processing message [67fb99c6-1ca1-4dfb-bffe-43b927fdec07]
16/06/02 08:32:52 INFO SparkStreamingImpl: processing message [de7d5934-bab2-4019-917e-c339d864ba18]
16/06/02 08:32:53 INFO SparkStreamingImpl: processing message [e63d7a7e-de32-4527-b8f1-641cfcc8869c]
16/06/02 08:32:54 INFO SparkStreamingImpl: processing message [1ce931ee-b8b1-4645-8a51-2c697bf1513b]
16/06/02 08:32:55 INFO SparkStreamingImpl: processing message [5367f3c1-d66c-4647-bb44-f5eab719031d]
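
The executor log above is consistent with a single task slot: a Spark task runs on one thread and walks the records of its partition one after another, so with a one-second sleep per record a single slot yields one message per second. The following is a minimal plain-Java sketch of that effect, not Spark code. A fixed thread pool stands in for the executor's task slots, and the class name TaskSlotDemo and its parameters are hypothetical.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class TaskSlotDemo {

    // Runs `tasks` sleeping tasks on a pool of `slots` threads and returns the
    // highest number of tasks that were ever running at the same time.
    public static int peakConcurrency(int slots, int tasks, long sleepMillis)
            throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(slots);
        AtomicInteger running = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        for (int i = 0; i < tasks; i++) {
            pool.submit(() -> {
                int now = running.incrementAndGet();
                peak.accumulateAndGet(now, Math::max); // remember the maximum seen
                try {
                    Thread.sleep(sleepMillis); // stands in for the blocking work
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    running.decrementAndGet();
                }
            });
        }
        pool.shutdown();
        pool.awaitTermination(1, TimeUnit.MINUTES);
        return peak.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("peak with 1 slot:  " + peakConcurrency(1, 8, 50));
        System.out.println("peak with 4 slots: " + peakConcurrency(4, 8, 50));
    }
}
```

With one slot the peak can never exceed 1, however many partitions are waiting; more concurrent work per executor needs more slots, which in Spark means more cores assigned to the executor.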


Re: how to increase threads per executor

Posted by Mich Talebzadeh <mi...@gmail.com>.
The general way of passing parameters to spark-submit is as follows (note
that I use a generic shell script to submit jobs). Replace ${JAR_FILE} and the
other variables with appropriate values. In general, you can pass settings such
as driver-memory and executor-memory to the shell script as variables, if you
wish, instead of hard-coding them:

${SPARK_HOME}/bin/spark-submit \
                --packages com.databricks:spark-csv_2.11:1.3.0 \
                --master spark://50.140.197.217:7077 \
                --driver-memory 4G \
                --num-executors 5 \
                --executor-memory 4G \
                --executor-cores 2 \
                --conf "spark.scheduler.mode=FAIR" \
                --conf "spark.executor.extraJavaOptions=-XX:+PrintGCDetails -XX:+PrintGCTimeStamps" \
                --jars /home/hduser/jars/spark-streaming-kafka-assembly_2.10-1.6.1.jar \
                --class ${FILE_NAME} \
                --conf "spark.ui.port=55556" \
                --conf "spark.driver.port=54631" \
                --conf "spark.fileserver.port=54731" \
                --conf "spark.blockManager.port=54832" \
                --conf "spark.kryoserializer.buffer.max=512" \
                ${JAR_FILE} \
                >> ${LOG_FILE}

For the Twitter example, pass the Twitter-specific parameters after ${JAR_FILE}:

${SPARK_HOME}/bin/spark-submit \
                --packages com.databricks:spark-csv_2.11:1.3.0 \
                --driver-memory 2G \
                --num-executors 1 \
                --executor-memory 2G \
                --master local[2] \
                --executor-cores 2 \
                --conf "spark.scheduler.mode=FAIR" \
                --conf "spark.executor.extraJavaOptions=-XX:+PrintGCDetails -XX:+PrintGCTimeStamps" \
                --jars /home/hduser/jars/spark-streaming-kafka-assembly_2.10-1.6.1.jar \
                --class "com.databricks.apps.twitter_classifier.${FILE_NAME}" \
                --conf "spark.ui.port=55555" \
                --conf "spark.driver.port=54631" \
                --conf "spark.fileserver.port=54731" \
                --conf "spark.blockManager.port=54832" \
                --conf "spark.kryoserializer.buffer.max=512" \
                ${JAR_FILE} \
                ${OUTPUT_DIRECTORY:-/tmp/tweets} \
                ${NUM_TWEETS_TO_COLLECT:-10000} \
                ${OUTPUT_FILE_INTERVAL_IN_SECS:-10} \
                ${OUTPUT_FILE_PARTITIONS_EACH_INTERVAL:-1} \
                >> ${LOG_FILE}
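
The same idea, passing the settings in as variables rather than hard-coding them, can be sketched in Java by assembling the spark-submit argument list from maps. This is only an illustration under stated assumptions: the class SparkSubmitCommand, the /opt/spark home, the com.example.Main class and the /path/to/app.jar path are all hypothetical placeholders. The flags themselves (--master, --class, --driver-memory, --executor-memory, --executor-cores, --conf) are the ones used in the commands above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class SparkSubmitCommand {

    // Builds the spark-submit argument list: options and --conf pairs first,
    // then the application JAR, then the application's own arguments.
    public static List<String> build(String sparkHome, String master, String mainClass,
                                     Map<String, String> options, Map<String, String> conf,
                                     String jarFile, List<String> appArgs) {
        List<String> cmd = new ArrayList<>();
        cmd.add(sparkHome + "/bin/spark-submit");
        cmd.add("--master");
        cmd.add(master);
        cmd.add("--class");
        cmd.add(mainClass);
        for (Map.Entry<String, String> e : options.entrySet()) {
            cmd.add("--" + e.getKey());
            cmd.add(e.getValue());
        }
        for (Map.Entry<String, String> e : conf.entrySet()) {
            cmd.add("--conf");
            cmd.add(e.getKey() + "=" + e.getValue());
        }
        cmd.add(jarFile);     // everything after the JAR goes to the application
        cmd.addAll(appArgs);
        return cmd;
    }

    public static void main(String[] args) {
        Map<String, String> options = new LinkedHashMap<>();
        options.put("driver-memory", "4G");
        options.put("executor-memory", "4G");
        options.put("executor-cores", "2");

        Map<String, String> conf = new LinkedHashMap<>();
        conf.put("spark.scheduler.mode", "FAIR");
        conf.put("spark.ui.port", "55556");

        // Placeholder paths and class name; replace with real values.
        List<String> cmd = build("/opt/spark", "spark://50.140.197.217:7077",
                "com.example.Main", options, conf, "/path/to/app.jar",
                Arrays.asList("/tmp/tweets", "10000"));
        System.out.println(String.join(" ", cmd));
    }
}
```

The resulting list could be handed to a ProcessBuilder to launch the job; the sketch only prints it.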

Dr Mich Talebzadeh



LinkedIn: https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw



http://talebzadehmich.wordpress.com



On 3 June 2016 at 09:10, Jacek Laskowski <ja...@japila.pl> wrote:

> --executor-cores 1 to be exact.
>
>
> Pozdrawiam,
> Jacek Laskowski
> ----
> https://medium.com/@jaceklaskowski/
> Mastering Apache Spark http://bit.ly/mastering-apache-spark
> Follow me at https://twitter.com/jaceklaskowski
>
> On Fri, Jun 3, 2016 at 12:28 AM, Mich Talebzadeh <
> mich.talebzadeh@gmail.com> wrote:
>
>> interesting. a vm with one core!
>>
>> one simple test
>>
>> can you try running with
>>
>> --executor-cores=1
>>
>> and see it works ok please
>>
>>
>>
>> Dr Mich Talebzadeh
>>
>>
>>
>> LinkedIn * https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>> <https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
>>
>>
>>
>> http://talebzadehmich.wordpress.com
>>
>>
>>
>> On 2 June 2016 at 23:15, Andres M Jimenez T <ad...@hotmail.com> wrote:
>>
>>> Mich, thanks for your time,
>>>
>>>
>>> I am launching spark-submit as follows:
>>>
>>>
>>> bin/spark-submit --class com.example.SparkStreamingImpl --master
>>> spark://dev1.dev:7077 --verbose --driver-memory 1g --executor-memory 1g
>>> --conf "spark.driver.extraJavaOptions=-Dcom.sun.management.jmxremote
>>> -Dcom.sun.management.jmxremote.port=8090
>>> -Dcom.sun.management.jmxremote.rmi.port=8091
>>> -Dcom.sun.management.jmxremote.authenticate=false
>>> -Dcom.sun.management.jmxremote.ssl=false" --conf
>>> "spark.executor.extraJavaOptions=-Dcom.sun.management.jmxremote
>>> -Dcom.sun.management.jmxremote.port=8092
>>> -Dcom.sun.management.jmxremote.rmi.port=8093
>>> -Dcom.sun.management.jmxremote.authenticate=false
>>> -Dcom.sun.management.jmxremote.ssl=false" --conf
>>> "spark.scheduler.mode=FAIR" --conf /home/Processing.jar
>>>
>>>
>>> When I use --executor-cores=12 I get "Initial job has not accepted any
>>> resources; check your cluster UI to ensure that workers are registered and
>>> have sufficient resources".
>>>
>>>
>>> This is because my nodes are single-core, but I want to use more than one
>>> thread per core. Is this possible?
>>>
>>>
>>> root@dev1:/home/spark-1.6.1-bin-hadoop2.6# lscpu
>>> Architecture:          x86_64
>>> CPU op-mode(s):        32-bit, 64-bit
>>> Byte Order:            Little Endian
>>> CPU(s):                1
>>> On-line CPU(s) list:   0
>>> Thread(s) per core:    1
>>> Core(s) per socket:    1
>>> Socket(s):             1
>>> NUMA node(s):          1
>>> Vendor ID:             GenuineIntel
>>> CPU family:            6
>>> Model:                 58
>>> Model name:            Intel(R) Xeon(R) CPU E5-2690 v2 @ 3.00GHz
>>> Stepping:              0
>>> CPU MHz:               2999.999
>>> BogoMIPS:              5999.99
>>> Hypervisor vendor:     VMware
>>> Virtualization type:   full
>>> L1d cache:             32K
>>> L1i cache:             32K
>>> L2 cache:              256K
>>> L3 cache:              25600K
>>> NUMA node0 CPU(s):     0
>>>
>>>
>>> Thanks
>>>
>>>
>>>
>>> ------------------------------
>>> *From:* Mich Talebzadeh <mi...@gmail.com>
>>> *Sent:* Thursday, June 2, 2016 5:00 PM
>>> *To:* Andres M Jimenez T
>>> *Cc:* user@spark.apache.org
>>> *Subject:* Re: how to increase threads per executor
>>>
>>> What are you passing as parameters to spark-submit?
>>>
>>>
>>> ${SPARK_HOME}/bin/spark-submit \
>>>                 --executor-cores=12 \
>>>
>>> Also check
>>>
>>> http://spark.apache.org/docs/latest/configuration.html
>>>
>>>
>>> Execution Behavior/spark.executor.cores
>>>
>>>
>>> HTH
>>>
>>>
>>> Dr Mich Talebzadeh
>>>
>>>
>>>
>>> LinkedIn * https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>> <https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
>>>
>>>
>>>
>>> http://talebzadehmich.wordpress.com
>>>
>>>
>>>

Re: how to increase threads per executor

Posted by Jacek Laskowski <ja...@japila.pl>.
--executor-cores 1 to be exact.


Pozdrawiam,
Jacek Laskowski
----
https://medium.com/@jaceklaskowski/
Mastering Apache Spark http://bit.ly/mastering-apache-spark
Follow me at https://twitter.com/jaceklaskowski


Re: how to increase threads per executor

Posted by Mich Talebzadeh <mi...@gmail.com>.
Interesting, a VM with one core!

One simple test: can you try running with

--executor-cores=1

and see whether it works OK, please?
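
A rough way to see why --executor-cores=12 stalls while --executor-cores=1 should run: as far as I understand the standalone master, it only places an executor on a worker that has at least that many free cores, and each worker here advertises a single core. The sketch below is a simplified model of that check in plain Java, not Spark's actual scheduler code. It ignores memory limits and spark.cores.max, and the class and method names are hypothetical. The taskSlots helper assumes the usual rule that an executor runs (executor cores / spark.task.cpus) tasks at once.

```java
public class ExecutorFitModel {

    // Simplified model: how many executors of `coresPerExecutor` cores fit
    // on workers with the given numbers of free cores.
    public static int launchableExecutors(int[] freeCoresPerWorker, int coresPerExecutor) {
        if (coresPerExecutor <= 0) {
            throw new IllegalArgumentException("coresPerExecutor must be positive");
        }
        int executors = 0;
        for (int free : freeCoresPerWorker) {
            executors += free / coresPerExecutor; // 0 when the worker is too small
        }
        return executors;
    }

    // Number of tasks one executor can run concurrently.
    public static int taskSlots(int executorCores, int taskCpus) {
        if (taskCpus <= 0) {
            throw new IllegalArgumentException("taskCpus must be positive");
        }
        return executorCores / taskCpus;
    }

    public static void main(String[] args) {
        int[] singleCoreWorkers = {1, 1, 1}; // the three one-core VMs from the thread
        System.out.println("executor-cores=12: " + launchableExecutors(singleCoreWorkers, 12));
        System.out.println("executor-cores=1:  " + launchableExecutors(singleCoreWorkers, 1));

        // Assumption: each worker is configured to advertise 4 cores.
        int[] oversubscribed = {4, 4, 4};
        System.out.println("executors with 4 advertised cores: "
                + launchableExecutors(oversubscribed, 4)
                + ", task slots each: " + taskSlots(4, 1));
    }
}
```

In this model, three single-core workers cannot host any 12-core executor, which matches the "Initial job has not accepted any resources" message, while a request for 1 core fits on each of them. The last case assumes the workers were told to advertise more cores than the VM has, for example through SPARK_WORKER_CORES in conf/spark-env.sh. That would give more task threads per machine, which mainly helps when tasks block (as with the Thread.sleep in the test code) rather than compute.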



Dr Mich Talebzadeh



LinkedIn: https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw



http://talebzadehmich.wordpress.com



On 2 June 2016 at 23:15, Andres M Jimenez T <ad...@hotmail.com> wrote:

> Mich, thanks for your time,
>
>
> i am launching spark-submit as follows:
>
>
> bin/spark-submit --class com.example.SparkStreamingImpl --master
> spark://dev1.dev:7077 --verbose --driver-memory 1g --executor-memory 1g
> --conf "spark.driver.extraJavaOptions=-Dcom.sun.management.jmxremote
> -Dcom.sun.management.jmxremote.port=8090
> -Dcom.sun.management.jmxremote.rmi.port=8091
> -Dcom.sun.management.jmxremote.authenticate=false
> -Dcom.sun.management.jmxremote.ssl=false" --conf
> "spark.executor.extraJavaOptions=-Dcom.sun.management.jmxremote
> -Dcom.sun.management.jmxremote.port=8092
> -Dcom.sun.management.jmxremote.rmi.port=8093
> -Dcom.sun.management.jmxremote.authenticate=false
> -Dcom.sun.management.jmxremote.ssl=false" --conf
> "spark.scheduler.mode=FAIR" --conf /home/Processing.jar
>
>
> When i use --executor-cores=12 i get "Initial job has not accepted any
> resources; check your cluster UI to ensure that workers are registered and
> have sufficient resources".
>
>
> This, because my nodes are single core, but i want to use more than one
> thread per core, is this possible?
>
>
> root@dev1:/home/spark-1.6.1-bin-hadoop2.6# lscpu
> Architecture:          x86_64
> CPU op-mode(s):        32-bit, 64-bit
> Byte Order:            Little Endian
> CPU(s):                1
> On-line CPU(s) list:   0
> Thread(s) per core:    1
> Core(s) per socket:    1
> Socket(s):             1
> NUMA node(s):          1
> Vendor ID:             GenuineIntel
> CPU family:            6
> Model:                 58
> Model name:            Intel(R) Xeon(R) CPU E5-2690 v2 @ 3.00GHz
> Stepping:              0
> CPU MHz:               2999.999
> BogoMIPS:              5999.99
> Hypervisor vendor:     VMware
> Virtualization type:   full
> L1d cache:             32K
> L1i cache:             32K
> L2 cache:              256K
> L3 cache:              25600K
> NUMA node0 CPU(s):     0
>
>
> Thanks
>

Re: how to increase threads per executor

Posted by Andres M Jimenez T <ad...@hotmail.com>.
Mich, thanks for your time.


I am launching spark-submit as follows:


bin/spark-submit \
  --class com.example.SparkStreamingImpl \
  --master spark://dev1.dev:7077 \
  --verbose \
  --driver-memory 1g \
  --executor-memory 1g \
  --conf "spark.driver.extraJavaOptions=-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.port=8090 -Dcom.sun.management.jmxremote.rmi.port=8091 -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false" \
  --conf "spark.executor.extraJavaOptions=-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.port=8092 -Dcom.sun.management.jmxremote.rmi.port=8093 -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false" \
  --conf "spark.scheduler.mode=FAIR" \
  /home/Processing.jar


When I use --executor-cores=12 I get "Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources".


This is because my nodes are single core, but I want to use more than one thread per core. Is this possible?


root@dev1:/home/spark-1.6.1-bin-hadoop2.6# lscpu
Architecture:          x86_64
CPU op-mode(s):        32-bit, 64-bit
Byte Order:            Little Endian
CPU(s):                1
On-line CPU(s) list:   0
Thread(s) per core:    1
Core(s) per socket:    1
Socket(s):             1
NUMA node(s):          1
Vendor ID:             GenuineIntel
CPU family:            6
Model:                 58
Model name:            Intel(R) Xeon(R) CPU E5-2690 v2 @ 3.00GHz
Stepping:              0
CPU MHz:               2999.999
BogoMIPS:              5999.99
Hypervisor vendor:     VMware
Virtualization type:   full
L1d cache:             32K
L1i cache:             32K
L2 cache:              256K
L3 cache:              25600K
NUMA node0 CPU(s):     0



Thanks
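
The symptom in this thread, one message per second per executor, is what you would expect if each executor has a single task slot. Spark runs at most spark.executor.cores / spark.task.cpus tasks at a time in an executor (spark.task.cpus defaults to 1), and ForeachFunction in the original post sleeps for one second per message. The sketch below is not Spark code. It is a plain java.util.concurrent illustration, using the hypothetical names TaskSlotSketch and runWithSlots, of why sleep-bound work like this can gain from more slots than physical cores. In standalone mode the number of cores a worker offers comes from SPARK_WORKER_CORES in conf/spark-env.sh (default: all detected cores). Setting it above the physical count should let --executor-cores exceed 1 on single-core nodes, but treat that as an assumption to verify on 1.6.1.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration only: a fixed thread pool stands in for the
// task slots of one executor. No Spark classes are involved.
public class TaskSlotSketch {

    // Runs `tasks` sleep-bound tasks on `slots` threads and returns the
    // elapsed wall-clock time in milliseconds.
    static long runWithSlots(int slots, int tasks, long sleepMillis) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(slots);
        long start = System.nanoTime();
        try {
            List<Future<?>> pending = new ArrayList<>();
            for (int i = 0; i < tasks; i++) {
                pending.add(pool.submit(() -> {
                    try {
                        // Stands in for Thread.sleep(1000) in ForeachFunction.
                        Thread.sleep(sleepMillis);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                }));
            }
            // Wait for every task to finish before stopping the clock.
            for (Future<?> f : pending) {
                f.get();
            }
        } finally {
            pool.shutdown();
        }
        return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
    }

    public static void main(String[] args) throws Exception {
        long oneSlot = runWithSlots(1, 8, 100);
        long fourSlots = runWithSlots(4, 8, 100);
        System.out.println("1 slot:  " + oneSlot + " ms");
        System.out.println("4 slots: " + fourSlots + " ms");
    }
}
```

Even on a single-core machine the four-slot run should finish well ahead of the one-slot run, because sleeping (or I/O-blocked) threads do not hold the CPU. CPU-bound work would not see the same gain from oversubscribing cores.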




Re: how to increase threads per executor

Posted by Mich Talebzadeh <mi...@gmail.com>.
What are you passing as parameters to spark-submit?


${SPARK_HOME}/bin/spark-submit \
                --executor-cores=12 \

Also check spark.executor.cores under "Execution Behavior" in the configuration docs:

http://spark.apache.org/docs/latest/configuration.html


HTH


Dr Mich Talebzadeh



LinkedIn: https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw



http://talebzadehmich.wordpress.com


