Posted to user@spark.apache.org by Ajay Chander <it...@gmail.com> on 2016/10/04 23:44:22 UTC

UseCase_Design_Help

Hi Everyone,

I have a use-case where I have two DataFrames like below,

1) First DataFrame (DF1) contains,

*    ANIMALS    *
Mammals
Birds
Fish
Reptiles
Amphibians

2) Second DataFrame (DF2) contains,

*    ID, Mammals, Birds, Fish, Reptiles, Amphibians    *
1, Dogs,     Eagle,      Goldfish, NULL,   Frog
2, Cats,     Peacock,    Guppy,    Turtle, Salamander
3, Dolphins, Eagle,      Zander,   NULL,   Frog
4, Whales,   Parrot,     Guppy,    Snake,  Frog
5, Horses,   Owl,        Guppy,    Snake,  Frog
6, Dolphins, Kingfisher, Zander,   Turtle, Frog
7, Dogs,     Sparrow,    Goldfish, NULL,   Salamander

Now I want to take each row from DF1 and find its distinct count in
DF2. For example, pick Mammals from DF1, then compute
count(distinct(Mammals)) over DF2, which here is 5.

DF1 has 70 distinct rows/animal types.
DF2 has a few million rows.

What's the best way to achieve this efficiently using parallelism?

Any inputs are helpful. Thank you.

Regards,
Ajay

Re: UseCase_Design_Help

Posted by Ajay Chander <it...@gmail.com>.
+ user@spark.apache.org

Hi Daniel, I will try this one out and let you know. Thank you.


Re: UseCase_Design_Help

Posted by Ajay Chander <it...@gmail.com>.
Ayan, thanks for the help. In my scenario I currently have the business
rule, i.e. the animal types, in a file (later it will be in a Hive table),
and I want to go after only those elements from the list. Once I identify
the distinct counts, I have to apply two different functionalities,
depending on whether count(distinct(element)) <= 10 or
count(distinct(element)) > 10.
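
Something like the below is what I have in mind, building on a single-pass
aggregation (untested sketch; assumes DF2 is available as a DataFrame df2
and animalTypes holds the 70 animal-type column names):

import org.apache.spark.sql.functions.countDistinct

// one job computes all the distinct counts; the result is a single row
val counts = df2.agg(
  countDistinct(animalTypes.head).alias(animalTypes.head),
  animalTypes.tail.map(c => countDistinct(c).alias(c)): _*
).head()

// branch on the driver per column, without launching one job per column
animalTypes.foreach { anmtyp =>
  if (counts.getAs[Long](anmtyp) <= 10) {
    println("Animal Type: " + anmtyp + " has <= 10 distinct values")
  } else {
    println("Animal Type: " + anmtyp + " has > 10 distinct values")
  }
}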

Thanks,
Ajay




Re: UseCase_Design_Help

Posted by ayan guha <gu...@gmail.com>.
Hi

You can "generate" a sql through program. Python Example:

>>> schema
['id', 'Mammals', 'Birds', 'Fish', 'Reptiles', 'Amphibians']
>>>

>>> count_stmt = ["count(distinct <tag>) as <tag>".replace("<tag>", x) for x in schema]
>>> count_stmt
['count(distinct id) as id', 'count(distinct Mammals) as Mammals',
'count(distinct Birds) as Birds', 'count(distinct Fish) as Fish',
'count(distinct Reptiles) as Reptiles', 'count(distinct Amphibians) as
Amphibians']

>>> sql = "select "
>>> sql = sql +  ", ".join(count_stmt) + " from t "
>>> sql
'select count(distinct id) as id, count(distinct Mammals) as Mammals,
count(distinct Birds) as Birds, count(distinct Fish) as Fish,
count(distinct Reptiles) as Reptiles, count(distinct Amphibians) as
Amphibians from t '
>>>

>>> df.show()
+---+--------+----------+--------+--------+----------+
| id| Mammals|     Birds|    Fish|Reptiles|Amphibians|
+---+--------+----------+--------+--------+----------+
|  1|    Dogs|     Eagle|Goldfish|    NULL|      Frog|
|  2|    Cats|   Peacock|   Guppy|  Turtle|Salamander|
|  3|Dolphins|     Eagle|  Zander|    NULL|      Frog|
|  4|  Whales|    Parrot|   Guppy|   Snake|      Frog|
|  5|  Horses|       Owl|   Guppy|   Snake|      Frog|
|  6|Dolphins|Kingfisher|  Zander|  Turtle|      Frog|
|  7|    Dogs|   Sparrow|Goldfish|    NULL|Salamander|
+---+--------+----------+--------+--------+----------+

>>> cnr = sqlContext.sql(sql)
>>> cnr.show()
+---+-------+-----+----+--------+----------+
| id|Mammals|Birds|Fish|Reptiles|Amphibians|
+---+-------+-----+----+--------+----------+
|  7|      5|    6|   3|       3|         2|
+---+-------+-----+----+--------+----------+
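
The same trick works from Scala too, roughly like this (untested sketch;
assumes the animal-type column names are in animalTypes and DF2 is
registered as table t):

val countStmt = animalTypes.map(c => "count(distinct " + c + ") as " + c).mkString(", ")
val cnr = sqlContext.sql("select " + countStmt + " from t")
cnr.show()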

Does that help?



-- 
Best Regards,
Ayan Guha

Re: UseCase_Design_Help

Posted by Ajay Chander <it...@gmail.com>.
Hi Ayan,

My schema for DF2 is fixed, but it has around 420 columns (70 animal-type
columns and 350 other columns).

Thanks,
Ajay


Re: UseCase_Design_Help

Posted by ayan guha <gu...@gmail.com>.
Is your schema for df2 fixed? i.e. do you have 70 category columns?



-- 
Best Regards,
Ayan Guha

Re: UseCase_Design_Help

Posted by Daniel Siegmann <ds...@securityscorecard.io>.
I think it's fine to read animal types locally because there are only 70 of
them. It's just that you want to execute the Spark actions in parallel. The
easiest way to do that is to have only a single action.

Instead of grabbing the result right away, I would just add a column for
the animal type and union the datasets for the animal types. Something like
this (not sure if the syntax is correct):

import org.apache.spark.sql.DataFrame

val animalCounts: DataFrame = animalTypes.map { anmtyp =>
  sqlContext.sql(
    "select '" + anmtyp + "' as animal_type, " +
    "count(distinct(" + anmtyp + ")) as distinct_count from TEST1")
}.reduce(_.union(_))

animalCounts.show() // print the output
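
To feed this into the <= 10 / > 10 check from your loop, collecting the
small result to the driver might look like this (also untested; assumes
the count column is aliased distinct_count as in the query above):

animalCounts.collect().foreach { row =>
  val anmtyp = row.getString(0)
  if (row.getLong(1) <= 10) {
    println("Animal Type: " + anmtyp + " has <= 10 distinct values")
  } else {
    println("Animal Type: " + anmtyp + " has > 10 distinct values")
  }
}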


Re: UseCase_Design_Help

Posted by Daniel <da...@gmail.com>.
First of all, if you want to read a text file in Spark, you should use
sc.textFile. You are currently using "Source.fromFile", i.e. the Scala
standard API, so the file is read sequentially on the driver.

Furthermore, you are going to need to create a schema if you want to use
DataFrames.
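
For example (sketch; assumes the file lives at a path visible to the whole
cluster, e.g. on HDFS):

// distributed read through Spark instead of the local Scala API
val animalTypes = sc.textFile("hdfs:///path/animal_types.txt").collect()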


Re: UseCase_Design_Help

Posted by Ajay Chander <it...@gmail.com>.
Right now, I am doing it like below,

import scala.io.Source

val animalsFile = "/home/ajay/dataset/animal_types.txt"
val animalTypes = Source.fromFile(animalsFile).getLines.toArray

for (anmtyp <- animalTypes) {
  // head() is an action, so each iteration launches its own Spark job,
  // one after another
  val distinctAnmTypCount =
    sqlContext.sql("select count(distinct(" + anmtyp + ")) from TEST1")
  println("Calculating Metrics for Animal Type: " + anmtyp)
  if (distinctAnmTypCount.head().getAs[Long](0) <= 10) {
    println("Animal Type: " + anmtyp + " has <= 10 distinct values")
  } else {
    println("Animal Type: " + anmtyp + " has > 10 distinct values")
  }
}

But the problem is it is running sequentially.

Any inputs are appreciated. Thank you.


Regards,
Ajay

