Posted to user@spark.apache.org by Jakub Stransky <st...@gmail.com> on 2016/07/14 16:18:49 UTC

Standalone cluster node utilization

Hello,

I have a spark cluster running in a single mode, master + 6 executors.

My application reads data from a database via DataFrame.read, then filters
some rows. After that I re-partition the data, and I wonder why on the
Executors page of the driver UI I still see all RDD blocks allocated on a
single executor machine.

[image: Inline images 1]
As highlighted in the picture above, I expected that after the re-partition
the data would be shuffled across the cluster, but that is obviously not
happening here.

I can understand that the database read happens in a non-parallel fashion,
but the re-partition should fix that, as far as I understand.
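
Roughly, the flow looks like this (a simplified sketch; the filter condition
is a placeholder, and Configuration/prop are just my own config object and
JDBC connection properties):

    val df_init = sqlContext.read.jdbc(Configuration.dbUrl, Configuration.dbTable, prop)
    val filtered = df_init.filter(df_init("someColumn") > 0)      // placeholder filter
    val df = filtered.repartition(Configuration.appPartitioning)
    df.persist()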

Could someone experienced clarify that?

Thanks

Re: Standalone cluster node utilization

Posted by "Zhou (Joe) Xing" <jo...@nextev.com>.
I have seen similar behavior in my standalone cluster. I tried to increase
the number of partitions, and at some point it seems all the executors or
worker nodes start to make parallel connections to the remote data store. But
it would be nice if someone could point us to some references on how to make
proper use of repartitioning data read from a remote data store by Spark SQL.
Thanks a lot.
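
For what it is worth, the pattern I have been experimenting with is the
partitioned JDBC read (a sketch, assuming a numeric and reasonably uniform id
column; the connection settings and table/column names are placeholders):

val df = sqlContext.read.format("jdbc").options(Map(
  "url" -> jdbcUrl,                        // placeholder connection settings
  "dbtable" -> "my_table",
  "driver" -> "com.mysql.jdbc.Driver",
  "partitionColumn" -> "id",               // must be a numeric column
  "lowerBound" -> "1",
  "upperBound" -> "1000000",
  "numPartitions" -> "24"                  // one JDBC connection per partition
)).load()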

zhou




> On Jul 14, 2016, at 9:18 AM, Jakub Stransky <st...@gmail.com> wrote:
> 
> <image.png>


---------------------------------------------------------------------
To unsubscribe e-mail: user-unsubscribe@spark.apache.org


Re: Standalone cluster node utilization

Posted by Jakub Stransky <st...@gmail.com>.
I am seeing really weird behavior when loading the data from the RDBMS.

I tried a different approach for loading the data - I provided a partitioning
column to make the read parallel:

    val df_init = sqlContext.read.format("jdbc").options(
      Map("url" -> Configuration.dbUrl,
        "dbtable" -> Configuration.dbTable,
        "driver" -> Configuration.dbDriver,
        "partitionColumn" -> "Target",
        "lowerBound" -> "30000",
        "upperBound" -> "90000",
        "numPartitions" -> Configuration.appPartitioning.toString
      )).load()


But what I get when I check the Storage tab in the UI is the following distribution:

Data Distribution on 7 Executors

Host                  Memory Usage                   Disk Usage
spark1.clust:56209    145.3 MB (16.1 GB Remaining)   0.0 B
10.2.0.4:50305        0.0 B (37.2 GB Remaining)      0.0 B
spark5.clust:41822    112.0 B (16.9 GB Remaining)    0.0 B
spark4.clust:56687    112.0 B (16.9 GB Remaining)    0.0 B
spark3.clust:34263    0.0 B (16.9 GB Remaining)      0.0 B
spark2.clust:43663    112.0 B (16.9 GB Remaining)    0.0 B
spark0.clust:57445    112.0 B (16.9 GB Remaining)    0.0 B

Almost all the data resides on one node; the rest is negligible. Any idea
what might be wrong with this setting? I admit that the partitioning column is
not uniformly distributed, but I would not expect that alone to put virtually
everything on a single executor.
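
One thing I am considering trying (a sketch, not tested yet; Configuration.dbUser
and Configuration.dbPassword are hypothetical fields of my config object) is
the predicates overload of read.jdbc, which takes one WHERE clause per
partition, so even a skewed column can be split into comparable slices:

    import java.util.Properties

    val props = new Properties()
    props.setProperty("user", Configuration.dbUser)           // hypothetical config fields
    props.setProperty("password", Configuration.dbPassword)
    props.setProperty("driver", Configuration.dbDriver)

    // Each predicate becomes one partition (one JDBC connection); the ranges below
    // are only illustrative and would have to follow the real value distribution.
    val predicates = Array(
      "Target <  45000",
      "Target >= 45000 AND Target < 55000",
      "Target >= 55000 AND Target < 65000",
      "Target >= 65000")

    val df_byPredicates = sqlContext.read.jdbc(
      Configuration.dbUrl, Configuration.dbTable, predicates, props)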

Later on during the computation I try to repartition the data frames, but the
effect is that the data stay collected on one node.

    val df_init = sqlContext.read.format("jdbc").options(
      Map("url" -> Configuration.dbUrl,
        "dbtable" -> Configuration.dbTable,
        "driver" -> Configuration.dbDriver,
        "partitionColumn" -> "Target",
        "lowerBound" -> "30000",
        "upperBound" -> "90000",
//        "numPartitions" -> Configuration.appPartitioning.toString
        "numPartitions" -> "35"
      )).load()

    df_init.cache()

    df_init.registerTempTable("df_init")

    val df = (if (Configuration.dataSubset) {
      val (loadingCondition, TypeId) = if (args.length > 1) {
        (args(1), args(2))
      }
      else
        (Configuration.dataCondition, Configuration.dataType)

      sqlContext.sql(
        s"""SELECT statement ... Condition = '$Condition'""".stripMargin)
    } else {
      df_init
    }).repartition(Configuration.appPartitioning)

df.persist()

It seems that none of those approaches actually works as expected - I cannot
get the data distributed across the cluster. Could someone more experienced
provide some hints on what might be wrong?
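
For completeness, this is the shape I am aiming for (a minimal sketch using
the same names as above) - persist the repartitioned frame rather than the raw
JDBC frame, and force an action before checking the Storage tab:

    val repartitioned = df_init.repartition(Configuration.appPartitioning)
    repartitioned.persist()
    repartitioned.count()                           // action: triggers the shuffle and the caching
    println(repartitioned.rdd.partitions.length)    // should equal appPartitioning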

Thanks


On 14 July 2016 at 19:31, Jakub Stransky <st...@gmail.com> wrote:

> HI Talebzadeh,
>
> sorry I forget to answer last part of your question:
>
> At O/S level you should see many CoarseGrainedExecutorBackend through jps
> each corresponding to one executor. Are they doing anything?
>
> There is one worker with one executor bussy and the rest is almost idle:
>
>   PID USER      PR  NI    VIRT    RES    SHR S  %CPU %MEM     TIME+ COMMAND
>  9305 spark     20   0 30.489g 5.075g  22256 S  * 0.3 18.5*   0:36.25 java
>
> The only one - bussy one is running all 8cores machine
>
>   PID USER      PR  NI    VIRT    RES    SHR S  %CPU %MEM     TIME+ COMMAND
>  9580 zdata     20   0 29.664g 0.021t   6836 S* 676.7 79.4*  40:08.61 java
>
>
> Thanks
> Jakub
>
> On 14 July 2016 at 19:22, Jakub Stransky <st...@gmail.com> wrote:
>
>> HI Talebzadeh,
>>
>> we are using 6 worker machines - running.
>>
>> We are reading the data through sqlContext (data frame) as it is
>> suggested in the documentation over the JdbcRdd
>>
>> prop just specifies name, password, and driver class.
>>
>> Right after this data load we register it as a temp table
>>
>>     val df_init = sqlContext.read
>>       .jdbc(
>>         url = Configuration.dbUrl,
>>         table = Configuration.dbTable,
>>         prop
>>       )
>>
>>     df_init.registerTempTable("df_init")
>>
>> Afterwords we do some data filtering, column selection and filtering some
>> rows with sqlContext.sql ("select statement here")
>>
>> and after this selection we try to repartition the data in order to get
>> them distributed across the cluster and that seems it is not working. And
>> then we persist that filtered and selected dataFrame.
>>
>> And the desired state should be filtered dataframe should be distributed
>> accross the nodes in the cluster.
>>
>> Jakub
>>
>>
>>
>> On 14 July 2016 at 19:03, Mich Talebzadeh <mi...@gmail.com>
>> wrote:
>>
>>> Hi Jakub,
>>>
>>> Sounds like one executor. Can you point out:
>>>
>>>
>>>    1. The number of slaves/workers you are running
>>>    2. Are you using JDBC to read data in?
>>>    3. Do you register DF as temp table and if so have you cached temp
>>>    table
>>>
>>> Sounds like only one executor is active and the rest are sitting idele.
>>>
>>> At O/S level you should see many CoarseGrainedExecutorBackend through
>>> jps each corresponding to one executor. Are they doing anything?
>>>
>>> HTH
>>>
>>> Dr Mich Talebzadeh
>>>
>>>
>>>
>>> LinkedIn * https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>> <https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
>>>
>>>
>>>
>>> http://talebzadehmich.wordpress.com
>>>
>>>
>>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>>> any loss, damage or destruction of data or any other property which may
>>> arise from relying on this email's technical content is explicitly
>>> disclaimed. The author will in no case be liable for any monetary damages
>>> arising from such loss, damage or destruction.
>>>
>>>
>>>
>>> On 14 July 2016 at 17:18, Jakub Stransky <st...@gmail.com> wrote:
>>>
>>>> Hello,
>>>>
>>>> I have a spark  cluster running in a single mode, master + 6 executors.
>>>>
>>>> My application is reading a data from database via DataFrame.read then
>>>> there is a filtering of rows. After that I re-partition data and I wonder
>>>> why on the executors page of the driver UI I see RDD blocks all allocated
>>>> still on single executor machine
>>>>
>>>> [image: Inline images 1]
>>>> As highlighted on the picture above. I did expect that after
>>>> re-partition the data will be shuffled across cluster but that is obviously
>>>> not happening here.
>>>>
>>>> I can understand that database read is happening in non-parallel
>>>> fashion but re-partition  should fix it as far as I understand.
>>>>
>>>> Could someone experienced clarify that?
>>>>
>>>> Thanks
>>>>
>>>
>>>
>>
>>
>> --
>> Jakub Stransky
>> cz.linkedin.com/in/jakubstransky
>>
>>
>
>
> --
> Jakub Stransky
> cz.linkedin.com/in/jakubstransky
>
>


-- 
Jakub Stransky
cz.linkedin.com/in/jakubstransky

Re: Standalone cluster node utilization

Posted by Mich Talebzadeh <mi...@gmail.com>.
With a standalone cluster you have one driver on the edge node (where you are
running spark-submit) and multiple executors, each on a different node,
assuming you have started your slaves with enough workers.

One test would be to run another piece of code and see whether the process is
parallelised, with each executor running the same code on a different subset
of data.

Or try to do away with the temp table and do it through functional
programming?

val s = HiveContext.table("sales").select("AMOUNT_SOLD", "TIME_ID", "CHANNEL_ID")
val c = HiveContext.table("channels").select("CHANNEL_ID", "CHANNEL_DESC")
val t = HiveContext.table("times").select("TIME_ID", "CALENDAR_MONTH_DESC")
val rs = s.join(t, s("time_id") === t("time_id"), "fullouter")
  .join(c, "channel_id")
  .groupBy("calendar_month_desc", "channel_desc")
  .agg(sum("amount_sold").as("TotalSales"))

See whether that uses all executors?
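
Applied to your case, the column selection and filtering without the temp
table might look roughly like this (a sketch; the column names and condition
are placeholders, Configuration.appPartitioning is taken from your code):

import org.apache.spark.sql.functions.col

val df = df_init
  .select("colA", "colB", "Target")                           // placeholder column names
  .filter(col("Target") >= 30000 && col("Target") < 90000)    // placeholder condition
  .repartition(Configuration.appPartitioning)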

HTH

Dr Mich Talebzadeh



LinkedIn * https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
<https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*



http://talebzadehmich.wordpress.com


*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.



On 15 July 2016 at 06:28, Jakub Stransky <st...@gmail.com> wrote:

> Hi Mich,
>
> sorry that was probably a typo or I messed it up.  I am running spark
> cluster in standalone mode with master + 6 worker machines. Each worker
> machine has one executor on it.
> Hope that now clarifies, so it is not one machine with six executors.
>
> The problem is that when I try to read dataset it is get loaded just by
> one executor holding all the blocks which effectively means that data are
> not distributed.
>
> The way how we read the data is
>     val df_init = sqlContext.read
>       .jdbc(
>         url = Configuration.dbUrl,
>         table = Configuration.dbTable,
>         prop
>       )
>
>     df_init.registerTempTable("df_init")
>
> That is essentially one issue - not this distribution of the data.
>
> Next strange behavior comes after. Lets state it as a fact that all blocks
> are located on one node so after the column selection and filtering we try
> repartition data in hope that this will cause to redistribute the data. But
> that doesn't happen either - all the RDD blocks still reside on the source
> server where the data get loaded. Our code is following:
>
>     val df_init = sqlContext.read
>       .jdbc(
>         url = Configuration.dbUrl,
>         table = Configuration.dbTable,
>         prop
>       )
>
>     df_init.registerTempTable("df_init")
>
>     val df = (if (Configuration.dataSubset) {
>       val (loadingCondition, TypeId) = if (args.length > 1) {
>         (args(1), args(2))
>       }
>       else
>         (Configuration.dataCondition, Configuration.dataType)
>
>       sqlContext.sql(
>         s"""SELECT  statmement ... Condition = '$Condition'""".stripMargin)
>     } else {
>       df_init
>     }).*repartition*(Configuration.appPartitioning)
>
> df.persist()
>
>  We would expect that df should be spread across the nodes but that is not
> the case and all data are still located on one server. I am not sure if
> that is somehow related to temTable usage or ... I was convinced that
> repartition will cause reshuffle accross the nodes but that is not true in
> this case. We are using number of partition 96 as we have 6 machines  with
> 8 cores.
>
> Anybody see any issue here?
>
> Thanks
> Jakub
>
>
> On 14 July 2016 at 23:07, Mich Talebzadeh <mi...@gmail.com>
> wrote:
>
>> Thanks for details
>>
>> You mentioned
>>
>> "I have a spark  cluster running in a single mode, master + 6 executors."
>>
>> Do you mean running in a single NODE?
>>
>> JDBC read
>>
>> This one reads from Oracle table
>>
>> val c = HiveContext.load("jdbc",
>> Map("url" -> _ORACLEserver,
>> "dbtable" -> "(SELECT to_char(CHANNEL_ID) AS CHANNEL_ID, CHANNEL_DESC
>> FROM sh.channels)",
>> "user" -> _username_ORACLE,
>> "password" -> _password_ORACLE))
>>
>> c.registerTempTable("t_c")
>>
>> Then basically you run some SQL on the temp table df_ini. BTW personally
>> to avoid ambiguity I would call temp table different from its df something
>> like t_fd_init (just a suggestion)
>>
>> If you are running a single node then what is the evidence that all those
>> executors are needed. Have you tried caching the temp table before running
>> SQL?
>>
>> //Cache the table
>> HiveContext.cacheTable("df_ini")
>>
>> If you are running sql on temp table then you are just creating multiple
>> smaller result sets and logically depending on the size of the result set,
>> there may not be a need for much parallel processing. A serial scan may be
>> sufficient.
>>
>> Although with my tables cached I see all workers are doing something
>>
>> [image: Inline images 2]
>>
>> So really difficult to answer
>>
>> HTH
>>
>>
>>
>> Dr Mich Talebzadeh
>>
>>
>>
>> LinkedIn * https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>> <https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
>>
>>
>>
>> http://talebzadehmich.wordpress.com
>>
>>
>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>> any loss, damage or destruction of data or any other property which may
>> arise from relying on this email's technical content is explicitly
>> disclaimed. The author will in no case be liable for any monetary damages
>> arising from such loss, damage or destruction.
>>
>>
>>
>> On 14 July 2016 at 18:31, Jakub Stransky <st...@gmail.com> wrote:
>>
>>> HI Talebzadeh,
>>>
>>> sorry I forget to answer last part of your question:
>>>
>>> At O/S level you should see many CoarseGrainedExecutorBackend through
>>> jps each corresponding to one executor. Are they doing anything?
>>>
>>> There is one worker with one executor bussy and the rest is almost idle:
>>>
>>>   PID USER      PR  NI    VIRT    RES    SHR S  %CPU %MEM     TIME+
>>> COMMAND
>>>  9305 spark     20   0 30.489g 5.075g  22256 S  * 0.3 18.5*   0:36.25
>>> java
>>>
>>> The only one - bussy one is running all 8cores machine
>>>
>>>   PID USER      PR  NI    VIRT    RES    SHR S  %CPU %MEM     TIME+
>>> COMMAND
>>>  9580 zdata     20   0 29.664g 0.021t   6836 S* 676.7 79.4*  40:08.61
>>> java
>>>
>>>
>>> Thanks
>>> Jakub
>>>
>>> On 14 July 2016 at 19:22, Jakub Stransky <st...@gmail.com> wrote:
>>>
>>>> HI Talebzadeh,
>>>>
>>>> we are using 6 worker machines - running.
>>>>
>>>> We are reading the data through sqlContext (data frame) as it is
>>>> suggested in the documentation over the JdbcRdd
>>>>
>>>> prop just specifies name, password, and driver class.
>>>>
>>>> Right after this data load we register it as a temp table
>>>>
>>>>     val df_init = sqlContext.read
>>>>       .jdbc(
>>>>         url = Configuration.dbUrl,
>>>>         table = Configuration.dbTable,
>>>>         prop
>>>>       )
>>>>
>>>>     df_init.registerTempTable("df_init")
>>>>
>>>> Afterwords we do some data filtering, column selection and filtering
>>>> some rows with sqlContext.sql ("select statement here")
>>>>
>>>> and after this selection we try to repartition the data in order to get
>>>> them distributed across the cluster and that seems it is not working. And
>>>> then we persist that filtered and selected dataFrame.
>>>>
>>>> And the desired state should be filtered dataframe should be
>>>> distributed accross the nodes in the cluster.
>>>>
>>>> Jakub
>>>>
>>>>
>>>>
>>>> On 14 July 2016 at 19:03, Mich Talebzadeh <mi...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi Jakub,
>>>>>
>>>>> Sounds like one executor. Can you point out:
>>>>>
>>>>>
>>>>>    1. The number of slaves/workers you are running
>>>>>    2. Are you using JDBC to read data in?
>>>>>    3. Do you register DF as temp table and if so have you cached temp
>>>>>    table
>>>>>
>>>>> Sounds like only one executor is active and the rest are sitting idele.
>>>>>
>>>>> At O/S level you should see many CoarseGrainedExecutorBackend through
>>>>> jps each corresponding to one executor. Are they doing anything?
>>>>>
>>>>> HTH
>>>>>
>>>>> Dr Mich Talebzadeh
>>>>>
>>>>>
>>>>>
>>>>> LinkedIn * https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>>>> <https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
>>>>>
>>>>>
>>>>>
>>>>> http://talebzadehmich.wordpress.com
>>>>>
>>>>>
>>>>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>>>>> any loss, damage or destruction of data or any other property which may
>>>>> arise from relying on this email's technical content is explicitly
>>>>> disclaimed. The author will in no case be liable for any monetary damages
>>>>> arising from such loss, damage or destruction.
>>>>>
>>>>>
>>>>>
>>>>> On 14 July 2016 at 17:18, Jakub Stransky <st...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hello,
>>>>>>
>>>>>> I have a spark  cluster running in a single mode, master + 6
>>>>>> executors.
>>>>>>
>>>>>> My application is reading a data from database via DataFrame.read
>>>>>> then there is a filtering of rows. After that I re-partition data and I
>>>>>> wonder why on the executors page of the driver UI I see RDD blocks all
>>>>>> allocated still on single executor machine
>>>>>>
>>>>>> [image: Inline images 1]
>>>>>> As highlighted on the picture above. I did expect that after
>>>>>> re-partition the data will be shuffled across cluster but that is obviously
>>>>>> not happening here.
>>>>>>
>>>>>> I can understand that database read is happening in non-parallel
>>>>>> fashion but re-partition  should fix it as far as I understand.
>>>>>>
>>>>>> Could someone experienced clarify that?
>>>>>>
>>>>>> Thanks
>>>>>>
>>>>>
>>>>>
>>>>
>>>>
>>>> --
>>>> Jakub Stransky
>>>> cz.linkedin.com/in/jakubstransky
>>>>
>>>>
>>>
>>>
>>> --
>>> Jakub Stransky
>>> cz.linkedin.com/in/jakubstransky
>>>
>>>
>>
>
>
> --
> Jakub Stransky
> cz.linkedin.com/in/jakubstransky
>
>

Re: Standalone cluster node utilization

Posted by Jakub Stransky <st...@gmail.com>.
Hi Mich,

sorry, that was probably a typo or I messed it up. I am running a Spark
cluster in standalone mode with a master + 6 worker machines. Each worker
machine has one executor on it.
Hope that clarifies it: it is not one machine with six executors.

The problem is that when I try to read the dataset, it gets loaded by just one
executor holding all the blocks, which effectively means that the data are not
distributed.

The way we read the data is:
    val df_init = sqlContext.read
      .jdbc(
        url = Configuration.dbUrl,
        table = Configuration.dbTable,
        prop
      )

    df_init.registerTempTable("df_init")

That is essentially the first issue - the data do not get distributed.

The next strange behavior comes after that. Given that all blocks are located
on one node, after the column selection and filtering we try to repartition
the data in the hope that this will redistribute it. But that doesn't happen
either - all the RDD blocks still reside on the node where the data were
loaded. Our code is the following:

    val df_init = sqlContext.read
      .jdbc(
        url = Configuration.dbUrl,
        table = Configuration.dbTable,
        prop
      )

    df_init.registerTempTable("df_init")

    val df = (if (Configuration.dataSubset) {
      val (loadingCondition, TypeId) = if (args.length > 1) {
        (args(1), args(2))
      }
      else
        (Configuration.dataCondition, Configuration.dataType)

      sqlContext.sql(
        s"""SELECT statement ... Condition = '$Condition'""".stripMargin)
    } else {
      df_init
    }).repartition(Configuration.appPartitioning)

df.persist()

We would expect df to be spread across the nodes, but that is not the case
and all the data are still located on one server. I am not sure whether that
is somehow related to the temp table usage or ... I was convinced that
repartition would cause a reshuffle across the nodes, but that is not
happening in this case. We are using 96 partitions as we have 6 machines with
8 cores each.
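
One thing I still need to double-check (a quick diagnostic sketch using the
same names as above): how many partitions each DataFrame actually has, since
as far as I understand read.jdbc(url, table, prop) with no partitioning
options produces a single partition, so everything is fetched - and cached -
by one executor.

    println(s"partitions after load:        ${df_init.rdd.partitions.length}")   // likely 1
    println(s"partitions after repartition: ${df.rdd.partitions.length}")        // should be appPartitioning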

Anybody see any issue here?

Thanks
Jakub


On 14 July 2016 at 23:07, Mich Talebzadeh <mi...@gmail.com> wrote:

> Thanks for details
>
> You mentioned
>
> "I have a spark  cluster running in a single mode, master + 6 executors."
>
> Do you mean running in a single NODE?
>
> JDBC read
>
> This one reads from Oracle table
>
> val c = HiveContext.load("jdbc",
> Map("url" -> _ORACLEserver,
> "dbtable" -> "(SELECT to_char(CHANNEL_ID) AS CHANNEL_ID, CHANNEL_DESC FROM
> sh.channels)",
> "user" -> _username_ORACLE,
> "password" -> _password_ORACLE))
>
> c.registerTempTable("t_c")
>
> Then basically you run some SQL on the temp table df_ini. BTW personally
> to avoid ambiguity I would call temp table different from its df something
> like t_fd_init (just a suggestion)
>
> If you are running a single node then what is the evidence that all those
> executors are needed. Have you tried caching the temp table before running
> SQL?
>
> //Cache the table
> HiveContext.cacheTable("df_ini")
>
> If you are running sql on temp table then you are just creating multiple
> smaller result sets and logically depending on the size of the result set,
> there may not be a need for much parallel processing. A serial scan may be
> sufficient.
>
> Although with my tables cached I see all workers are doing something
>
> [image: Inline images 2]
>
> So really difficult to answer
>
> HTH
>
>
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> <https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
>
>
>
> http://talebzadehmich.wordpress.com
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
> On 14 July 2016 at 18:31, Jakub Stransky <st...@gmail.com> wrote:
>
>> HI Talebzadeh,
>>
>> sorry I forget to answer last part of your question:
>>
>> At O/S level you should see many CoarseGrainedExecutorBackend through jps
>> each corresponding to one executor. Are they doing anything?
>>
>> There is one worker with one executor bussy and the rest is almost idle:
>>
>>   PID USER      PR  NI    VIRT    RES    SHR S  %CPU %MEM     TIME+
>> COMMAND
>>  9305 spark     20   0 30.489g 5.075g  22256 S  * 0.3 18.5*   0:36.25
>> java
>>
>> The only one - bussy one is running all 8cores machine
>>
>>   PID USER      PR  NI    VIRT    RES    SHR S  %CPU %MEM     TIME+
>> COMMAND
>>  9580 zdata     20   0 29.664g 0.021t   6836 S* 676.7 79.4*  40:08.61
>> java
>>
>>
>> Thanks
>> Jakub
>>
>> On 14 July 2016 at 19:22, Jakub Stransky <st...@gmail.com> wrote:
>>
>>> HI Talebzadeh,
>>>
>>> we are using 6 worker machines - running.
>>>
>>> We are reading the data through sqlContext (data frame) as it is
>>> suggested in the documentation over the JdbcRdd
>>>
>>> prop just specifies name, password, and driver class.
>>>
>>> Right after this data load we register it as a temp table
>>>
>>>     val df_init = sqlContext.read
>>>       .jdbc(
>>>         url = Configuration.dbUrl,
>>>         table = Configuration.dbTable,
>>>         prop
>>>       )
>>>
>>>     df_init.registerTempTable("df_init")
>>>
>>> Afterwords we do some data filtering, column selection and filtering
>>> some rows with sqlContext.sql ("select statement here")
>>>
>>> and after this selection we try to repartition the data in order to get
>>> them distributed across the cluster and that seems it is not working. And
>>> then we persist that filtered and selected dataFrame.
>>>
>>> And the desired state should be filtered dataframe should be distributed
>>> accross the nodes in the cluster.
>>>
>>> Jakub
>>>
>>>
>>>
>>> On 14 July 2016 at 19:03, Mich Talebzadeh <mi...@gmail.com>
>>> wrote:
>>>
>>>> Hi Jakub,
>>>>
>>>> Sounds like one executor. Can you point out:
>>>>
>>>>
>>>>    1. The number of slaves/workers you are running
>>>>    2. Are you using JDBC to read data in?
>>>>    3. Do you register DF as temp table and if so have you cached temp
>>>>    table
>>>>
>>>> Sounds like only one executor is active and the rest are sitting idele.
>>>>
>>>> At O/S level you should see many CoarseGrainedExecutorBackend through
>>>> jps each corresponding to one executor. Are they doing anything?
>>>>
>>>> HTH
>>>>
>>>> Dr Mich Talebzadeh
>>>>
>>>>
>>>>
>>>> LinkedIn * https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>>> <https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
>>>>
>>>>
>>>>
>>>> http://talebzadehmich.wordpress.com
>>>>
>>>>
>>>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>>>> any loss, damage or destruction of data or any other property which may
>>>> arise from relying on this email's technical content is explicitly
>>>> disclaimed. The author will in no case be liable for any monetary damages
>>>> arising from such loss, damage or destruction.
>>>>
>>>>
>>>>
>>>> On 14 July 2016 at 17:18, Jakub Stransky <st...@gmail.com> wrote:
>>>>
>>>>> Hello,
>>>>>
>>>>> I have a spark  cluster running in a single mode, master + 6 executors.
>>>>>
>>>>> My application is reading a data from database via DataFrame.read then
>>>>> there is a filtering of rows. After that I re-partition data and I wonder
>>>>> why on the executors page of the driver UI I see RDD blocks all allocated
>>>>> still on single executor machine
>>>>>
>>>>> [image: Inline images 1]
>>>>> As highlighted on the picture above. I did expect that after
>>>>> re-partition the data will be shuffled across cluster but that is obviously
>>>>> not happening here.
>>>>>
>>>>> I can understand that database read is happening in non-parallel
>>>>> fashion but re-partition  should fix it as far as I understand.
>>>>>
>>>>> Could someone experienced clarify that?
>>>>>
>>>>> Thanks
>>>>>
>>>>
>>>>
>>>
>>>
>>> --
>>> Jakub Stransky
>>> cz.linkedin.com/in/jakubstransky
>>>
>>>
>>
>>
>> --
>> Jakub Stransky
>> cz.linkedin.com/in/jakubstransky
>>
>>
>


-- 
Jakub Stransky
cz.linkedin.com/in/jakubstransky

Re: Standalone cluster node utilization

Posted by Mich Talebzadeh <mi...@gmail.com>.
Thanks for details

You mentioned

"I have a spark  cluster running in a single mode, master + 6 executors."

Do you mean running in a single NODE?

JDBC read

This one reads from an Oracle table:

val c = HiveContext.load("jdbc",
  Map("url" -> _ORACLEserver,
    "dbtable" -> "(SELECT to_char(CHANNEL_ID) AS CHANNEL_ID, CHANNEL_DESC FROM sh.channels)",
    "user" -> _username_ORACLE,
    "password" -> _password_ORACLE))

c.registerTempTable("t_c")

Then basically you run some SQL on the temp table df_init. BTW, personally,
to avoid ambiguity I would name the temp table differently from its DataFrame,
something like t_df_init (just a suggestion).

If you are running on a single node, then what is the evidence that all those
executors are needed? Have you tried caching the temp table before running the
SQL?

// Cache the table
HiveContext.cacheTable("df_init")
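
Note that caching is lazy, so something like the following (a sketch) would
force it before you look at the Storage tab:

HiveContext.cacheTable("df_init")
HiveContext.sql("SELECT COUNT(*) FROM df_init").show()   // action: materializes the cache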

If you are running SQL on the temp table then you are just creating multiple
smaller result sets, and logically, depending on the size of the result set,
there may not be a need for much parallel processing. A serial scan may be
sufficient.

Although with my tables cached I see all workers are doing something

[image: Inline images 2]

So it is really difficult to answer.

HTH



Dr Mich Talebzadeh



LinkedIn * https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
<https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*



http://talebzadehmich.wordpress.com


*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.



On 14 July 2016 at 18:31, Jakub Stransky <st...@gmail.com> wrote:

> HI Talebzadeh,
>
> sorry I forget to answer last part of your question:
>
> At O/S level you should see many CoarseGrainedExecutorBackend through jps
> each corresponding to one executor. Are they doing anything?
>
> There is one worker with one executor bussy and the rest is almost idle:
>
>   PID USER      PR  NI    VIRT    RES    SHR S  %CPU %MEM     TIME+ COMMAND
>  9305 spark     20   0 30.489g 5.075g  22256 S  * 0.3 18.5*   0:36.25 java
>
> The only one - bussy one is running all 8cores machine
>
>   PID USER      PR  NI    VIRT    RES    SHR S  %CPU %MEM     TIME+ COMMAND
>  9580 zdata     20   0 29.664g 0.021t   6836 S* 676.7 79.4*  40:08.61 java
>
>
> Thanks
> Jakub
>
> On 14 July 2016 at 19:22, Jakub Stransky <st...@gmail.com> wrote:
>
>> HI Talebzadeh,
>>
>> we are using 6 worker machines - running.
>>
>> We are reading the data through sqlContext (data frame) as it is
>> suggested in the documentation over the JdbcRdd
>>
>> prop just specifies name, password, and driver class.
>>
>> Right after this data load we register it as a temp table
>>
>>     val df_init = sqlContext.read
>>       .jdbc(
>>         url = Configuration.dbUrl,
>>         table = Configuration.dbTable,
>>         prop
>>       )
>>
>>     df_init.registerTempTable("df_init")
>>
>> Afterwords we do some data filtering, column selection and filtering some
>> rows with sqlContext.sql ("select statement here")
>>
>> and after this selection we try to repartition the data in order to get
>> them distributed across the cluster and that seems it is not working. And
>> then we persist that filtered and selected dataFrame.
>>
>> And the desired state should be filtered dataframe should be distributed
>> accross the nodes in the cluster.
>>
>> Jakub
>>
>>
>>
>> On 14 July 2016 at 19:03, Mich Talebzadeh <mi...@gmail.com>
>> wrote:
>>
>>> Hi Jakub,
>>>
>>> Sounds like one executor. Can you point out:
>>>
>>>
>>>    1. The number of slaves/workers you are running
>>>    2. Are you using JDBC to read data in?
>>>    3. Do you register DF as temp table and if so have you cached temp
>>>    table
>>>
>>> Sounds like only one executor is active and the rest are sitting idele.
>>>
>>> At O/S level you should see many CoarseGrainedExecutorBackend through
>>> jps each corresponding to one executor. Are they doing anything?
>>>
>>> HTH
>>>
>>> Dr Mich Talebzadeh
>>>
>>>
>>>
>>> LinkedIn * https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>> <https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
>>>
>>>
>>>
>>> http://talebzadehmich.wordpress.com
>>>
>>>
>>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>>> any loss, damage or destruction of data or any other property which may
>>> arise from relying on this email's technical content is explicitly
>>> disclaimed. The author will in no case be liable for any monetary damages
>>> arising from such loss, damage or destruction.
>>>
>>>
>>>
>>> On 14 July 2016 at 17:18, Jakub Stransky <st...@gmail.com> wrote:
>>>
>>>> Hello,
>>>>
>>>> I have a spark  cluster running in a single mode, master + 6 executors.
>>>>
>>>> My application is reading a data from database via DataFrame.read then
>>>> there is a filtering of rows. After that I re-partition data and I wonder
>>>> why on the executors page of the driver UI I see RDD blocks all allocated
>>>> still on single executor machine
>>>>
>>>> [image: Inline images 1]
>>>> As highlighted on the picture above. I did expect that after
>>>> re-partition the data will be shuffled across cluster but that is obviously
>>>> not happening here.
>>>>
>>>> I can understand that database read is happening in non-parallel
>>>> fashion but re-partition  should fix it as far as I understand.
>>>>
>>>> Could someone experienced clarify that?
>>>>
>>>> Thanks
>>>>
>>>
>>>
>>
>>
>> --
>> Jakub Stransky
>> cz.linkedin.com/in/jakubstransky
>>
>>
>
>
> --
> Jakub Stransky
> cz.linkedin.com/in/jakubstransky
>
>

Re: Standalone cluster node utilization

Posted by Jakub Stransky <st...@gmail.com>.
Hi Talebzadeh,

sorry, I forgot to answer the last part of your question:

At O/S level you should see many CoarseGrainedExecutorBackend through jps
each corresponding to one executor. Are they doing anything?

There is one worker with one executor busy and the rest are almost idle:

  PID USER      PR  NI    VIRT    RES    SHR S  %CPU %MEM     TIME+ COMMAND
 9305 spark     20   0 30.489g 5.075g  22256 S   0.3 18.5   0:36.25 java

The only busy one is using all 8 cores of its machine:

  PID USER      PR  NI    VIRT    RES    SHR S  %CPU %MEM     TIME+ COMMAND
 9580 zdata     20   0 29.664g 0.021t   6836 S 676.7 79.4  40:08.61 java


Thanks
Jakub

On 14 July 2016 at 19:22, Jakub Stransky <st...@gmail.com> wrote:

> HI Talebzadeh,
>
> we are using 6 worker machines - running.
>
> We are reading the data through sqlContext (data frame) as it is suggested
> in the documentation over the JdbcRdd
>
> prop just specifies name, password, and driver class.
>
> Right after this data load we register it as a temp table
>
>     val df_init = sqlContext.read
>       .jdbc(
>         url = Configuration.dbUrl,
>         table = Configuration.dbTable,
>         prop
>       )
>
>     df_init.registerTempTable("df_init")
>
> Afterwords we do some data filtering, column selection and filtering some
> rows with sqlContext.sql ("select statement here")
>
> and after this selection we try to repartition the data in order to get
> them distributed across the cluster and that seems it is not working. And
> then we persist that filtered and selected dataFrame.
>
> And the desired state should be filtered dataframe should be distributed
> accross the nodes in the cluster.
>
> Jakub
>
>
>
> On 14 July 2016 at 19:03, Mich Talebzadeh <mi...@gmail.com>
> wrote:
>
>> Hi Jakub,
>>
>> Sounds like one executor. Can you point out:
>>
>>
>>    1. The number of slaves/workers you are running
>>    2. Are you using JDBC to read data in?
>>    3. Do you register DF as temp table and if so have you cached temp
>>    table
>>
>> Sounds like only one executor is active and the rest are sitting idele.
>>
>> At O/S level you should see many CoarseGrainedExecutorBackend through jps
>> each corresponding to one executor. Are they doing anything?
>>
>> HTH
>>
>> Dr Mich Talebzadeh
>>
>>
>>
>> LinkedIn * https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>> <https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
>>
>>
>>
>> http://talebzadehmich.wordpress.com
>>
>>
>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>> any loss, damage or destruction of data or any other property which may
>> arise from relying on this email's technical content is explicitly
>> disclaimed. The author will in no case be liable for any monetary damages
>> arising from such loss, damage or destruction.
>>
>>
>>
>> On 14 July 2016 at 17:18, Jakub Stransky <st...@gmail.com> wrote:
>>
>>> Hello,
>>>
>>> I have a spark  cluster running in a single mode, master + 6 executors.
>>>
>>> My application is reading a data from database via DataFrame.read then
>>> there is a filtering of rows. After that I re-partition data and I wonder
>>> why on the executors page of the driver UI I see RDD blocks all allocated
>>> still on single executor machine
>>>
>>> [image: Inline images 1]
>>> As highlighted on the picture above. I did expect that after
>>> re-partition the data will be shuffled across cluster but that is obviously
>>> not happening here.
>>>
>>> I can understand that database read is happening in non-parallel fashion
>>> but re-partition  should fix it as far as I understand.
>>>
>>> Could someone experienced clarify that?
>>>
>>> Thanks
>>>
>>
>>
>
>
> --
> Jakub Stransky
> cz.linkedin.com/in/jakubstransky
>
>


-- 
Jakub Stransky
cz.linkedin.com/in/jakubstransky

Re: Standalone cluster node utilization

Posted by Jakub Stransky <st...@gmail.com>.
Hi Talebzadeh,

we are using 6 worker machines, all running.

We are reading the data through sqlContext (DataFrame), as suggested in the
documentation, in preference to JdbcRDD.

prop just specifies the user name, password, and driver class.

Right after this data load we register it as a temp table

    val df_init = sqlContext.read
      .jdbc(
        url = Configuration.dbUrl,
        table = Configuration.dbTable,
        prop
      )

    df_init.registerTempTable("df_init")

Afterwards we do some column selection and row filtering with
sqlContext.sql("select statement here"),

and after this selection we try to repartition the data in order to get it
distributed across the cluster, but that does not seem to be working. Then we
persist that filtered and selected DataFrame.

The desired state is that the filtered DataFrame ends up distributed across
the nodes in the cluster.

Jakub



On 14 July 2016 at 19:03, Mich Talebzadeh <mi...@gmail.com> wrote:

> Hi Jakub,
>
> Sounds like one executor. Can you point out:
>
>
>    1. The number of slaves/workers you are running
>    2. Are you using JDBC to read data in?
>    3. Do you register DF as temp table and if so have you cached temp
>    table
>
> Sounds like only one executor is active and the rest are sitting idele.
>
> At O/S level you should see many CoarseGrainedExecutorBackend through jps
> each corresponding to one executor. Are they doing anything?
>
> HTH
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> <https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
>
>
>
> http://talebzadehmich.wordpress.com
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
> On 14 July 2016 at 17:18, Jakub Stransky <st...@gmail.com> wrote:
>
>> Hello,
>>
>> I have a spark  cluster running in a single mode, master + 6 executors.
>>
>> My application is reading a data from database via DataFrame.read then
>> there is a filtering of rows. After that I re-partition data and I wonder
>> why on the executors page of the driver UI I see RDD blocks all allocated
>> still on single executor machine
>>
>> [image: Inline images 1]
>> As highlighted on the picture above. I did expect that after re-partition
>> the data will be shuffled across cluster but that is obviously not
>> happening here.
>>
>> I can understand that database read is happening in non-parallel fashion
>> but re-partition  should fix it as far as I understand.
>>
>> Could someone experienced clarify that?
>>
>> Thanks
>>
>
>


-- 
Jakub Stransky
cz.linkedin.com/in/jakubstransky

Re: Standalone cluster node utilization

Posted by Mich Talebzadeh <mi...@gmail.com>.
Hi Jakub,

Sounds like one executor. Can you point out:


   1. The number of slaves/workers you are running
   2. Are you using JDBC to read data in?
   3. Do you register the DF as a temp table, and if so, have you cached the temp table?

Sounds like only one executor is active and the rest are sitting idle.

At the O/S level you should see several CoarseGrainedExecutorBackend
processes through jps, each corresponding to one executor. Are they doing
anything?

HTH

Dr Mich Talebzadeh



LinkedIn * https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
<https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*



http://talebzadehmich.wordpress.com


*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.



On 14 July 2016 at 17:18, Jakub Stransky <st...@gmail.com> wrote:

> Hello,
>
> I have a spark  cluster running in a single mode, master + 6 executors.
>
> My application is reading a data from database via DataFrame.read then
> there is a filtering of rows. After that I re-partition data and I wonder
> why on the executors page of the driver UI I see RDD blocks all allocated
> still on single executor machine
>
> [image: Inline images 1]
> As highlighted on the picture above. I did expect that after re-partition
> the data will be shuffled across cluster but that is obviously not
> happening here.
>
> I can understand that database read is happening in non-parallel fashion
> but re-partition  should fix it as far as I understand.
>
> Could someone experienced clarify that?
>
> Thanks
>