You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@spark.apache.org by Craig Ching <cr...@gmail.com> on 2017/03/22 19:17:33 UTC

calculate diff of value and median in a group

Hi,

When using pyspark, I'd like to be able to calculate the difference between
grouped values and their median for the group.  Is this possible?  Here is
some code I hacked up that does what I want except that it calculates the
grouped diff from mean.  Also, please feel free to comment on how I could
make this better if you feel like being helpful :)

from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.types import (
    StringType,
    LongType,
    DoubleType,
    StructField,
    StructType
)
from pyspark.sql import functions as F


sc = SparkContext(appName='myapp')
spark = SparkSession(sc)

file_name = 'data.csv'

fields = [
    StructField(
        'group2',
        LongType(),
        True),
    StructField(
        'name',
        StringType(),
        True),
    StructField(
        'value',
        DoubleType(),
        True),
    StructField(
        'group1',
        LongType(),
        True)
]
schema = StructType(fields)

df = spark.read.csv(
    file_name, header=False, mode="DROPMALFORMED", schema=schema
)
df.show()
means = df.select([
    'group1',
    'group2',
    'name',
    'value']).groupBy([
        'group1',
        'group2'
    ]).agg(
        F.mean('value').alias('mean_value')
    ).orderBy('group1', 'group2')

cond = [df.group1 == means.group1, df.group2 == means.group2]

means.show()
df = df.select([
    'group1',
    'group2',
    'name',
    'value']).join(
        means,
        cond
    ).drop(
        df.group1
    ).drop(
        df.group2
    ).select('group1',
             'group2',
             'name',
             'value',
             'mean_value')

final = df.withColumn(
    'diff',
    F.abs(df.value - df.mean_value))
final.show()

sc.stop()

And here is an example dataset I'm playing with:

100,name1,0.43,0
100,name2,0.33,0
100,name3,0.73,0
101,name1,0.29,0
101,name2,0.96,0
101,name3,0.42,0
102,name1,0.01,0
102,name2,0.42,0
102,name3,0.51,0
103,name1,0.55,0
103,name2,0.45,0
103,name3,0.02,0
104,name1,0.93,0
104,name2,0.16,0
104,name3,0.74,0
105,name1,0.41,0
105,name2,0.65,0
105,name3,0.29,0
100,name1,0.51,1
100,name2,0.51,1
100,name3,0.43,1
101,name1,0.59,1
101,name2,0.55,1
101,name3,0.84,1
102,name1,0.01,1
102,name2,0.98,1
102,name3,0.44,1
103,name1,0.47,1
103,name2,0.16,1
103,name3,0.02,1
104,name1,0.83,1
104,name2,0.89,1
104,name3,0.31,1
105,name1,0.59,1
105,name2,0.77,1
105,name3,0.45,1

and here is what I'm trying to produce:

group1,group2,name,value,median,diff
0,100,name1,0.43,0.43,0.0
0,100,name2,0.33,0.43,0.10
0,100,name3,0.73,0.43,0.30
0,101,name1,0.29,0.42,0.13
0,101,name2,0.96,0.42,0.54
0,101,name3,0.42,0.42,0.0
0,102,name1,0.01,0.42,0.41
0,102,name2,0.42,0.42,0.0
0,102,name3,0.51,0.42,0.09
0,103,name1,0.55,0.45,0.10
0,103,name2,0.45,0.45,0.0
0,103,name3,0.02,0.45,0.43
0,104,name1,0.93,0.74,0.19
0,104,name2,0.16,0.74,0.58
0,104,name3,0.74,0.74,0.0
0,105,name1,0.41,0.41,0.0
0,105,name2,0.65,0.41,0.24
0,105,name3,0.29,0.41,0.24
1,100,name1,0.51,0.51,0.0
1,100,name2,0.51,0.51,0.0
1,100,name3,0.43,0.51,0.08
1,101,name1,0.59,0.59,0.0
1,101,name2,0.55,0.59,0.04
1,101,name3,0.84,0.59,0.25
1,102,name1,0.01,0.44,0.43
1,102,name2,0.98,0.44,0.54
1,102,name3,0.44,0.44,0.0
1,103,name1,0.47,0.16,0.31
1,103,name2,0.16,0.16,0.0
1,103,name3,0.02,0.16,0.14
1,104,name1,0.83,0.83,0.0
1,104,name2,0.89,0.83,0.06
1,104,name3,0.31,0.83,0.52
1,105,name1,0.59,0.59,0.0
1,105,name2,0.77,0.59,0.18
1,105,name3,0.45,0.59,0.14

Thanks for any help!

Cheers,
Craig

Re: calculate diff of value and median in a group

Posted by roni <ro...@gmail.com>.
I was using this function percentile_approx  on 100GB of compressed data
and it just hangs there. Any pointers?

On Wed, Mar 22, 2017 at 6:09 PM, ayan guha <gu...@gmail.com> wrote:

> For median, use percentile_approx with 0.5 (50th percentile is the median)
>
> On Thu, Mar 23, 2017 at 11:01 AM, Yong Zhang <ja...@hotmail.com> wrote:
>
>> He is looking for median, not mean/avg.
>>
>>
>> You have to implement the median logic by yourself, as there is no
>> directly implementation from Spark. You can use RDD API, if you are using
>> 1.6.x, or dataset if 2.x
>>
>>
>> The following example gives you an idea how to calculate the median using
>> dataset API. You can even change the code to add additional logic to
>> calculate the diff of every value with the median.
>>
>>
>> scala> spark.version
>> res31: String = 2.1.0
>>
>> scala> val ds = Seq((100,0.43),(100,0.33),(100,0.73),(101,0.29),(101,0.96),
>> (101,0.42),(101,0.01)).toDF("id","value").as[(Int, Double)]
>> ds: org.apache.spark.sql.Dataset[(Int, Double)] = [id: int, value: double]
>>
>> scala> ds.show
>> +---+-----+
>> | id|value|
>> +---+-----+
>> |100| 0.43|
>> |100| 0.33|
>> |100| 0.73|
>> |101| 0.29|
>> |101| 0.96|
>> |101| 0.42|
>> |101| 0.01|
>> +---+-----+
>>
>> scala> def median(seq: Seq[Double]) = {
>>      |   val size = seq.size
>>      |   val sorted = seq.sorted
>>      |   size match {
>>      |     case even if size % 2 == 0 => (sorted((size-2)/2) + sorted(size/2)) / 2
>>      |     case odd => sorted((size-1)/2)
>>      |   }
>>      | }
>> median: (seq: Seq[Double])Double
>>
>> scala> ds.groupByKey(_._1).mapGroups((id, iter) => (id, median(iter.map(_._2).toSeq))).show
>> +---+-----+
>> | _1|   _2|
>> +---+-----+
>> |101|0.355|
>> |100| 0.43|
>> +---+-----+
>>
>>
>> Yong
>>
>>
>>
>>
>> ------------------------------
>> *From:* ayan guha <gu...@gmail.com>
>> *Sent:* Wednesday, March 22, 2017 7:23 PM
>> *To:* Craig Ching
>> *Cc:* Yong Zhang; user@spark.apache.org
>> *Subject:* Re: calculate diff of value and median in a group
>>
>> I would suggest use window function with partitioning.
>>
>> select group1,group2,name,value, avg(value) over (partition group1,group2
>> order by name) m
>> from t
>>
>> On Thu, Mar 23, 2017 at 9:58 AM, Craig Ching <cr...@gmail.com>
>> wrote:
>>
>>> Are the elements count big per group? If not, you can group them and use
>>> the code to calculate the median and diff.
>>>
>>>
>>> They're not big, no.  Any pointers on how I might do that?  The part I'm
>>> having trouble with is the grouping, I can't seem to see how to do the
>>> median per group.  For mean, we have the agg feature, but not for median
>>> (and I understand the reasons for that).
>>>
>>> Yong
>>>
>>> ------------------------------
>>> *From:* Craig Ching <cr...@gmail.com>
>>> *Sent:* Wednesday, March 22, 2017 3:17 PM
>>> *To:* user@spark.apache.org
>>> *Subject:* calculate diff of value and median in a group
>>>
>>> Hi,
>>>
>>> When using pyspark, I'd like to be able to calculate the difference
>>> between grouped values and their median for the group.  Is this possible?
>>> Here is some code I hacked up that does what I want except that it
>>> calculates the grouped diff from mean.  Also, please feel free to comment
>>> on how I could make this better if you feel like being helpful :)
>>>
>>> from pyspark import SparkContext
>>> from pyspark.sql import SparkSession
>>> from pyspark.sql.types import (
>>>     StringType,
>>>     LongType,
>>>     DoubleType,
>>>     StructField,
>>>     StructType
>>> )
>>> from pyspark.sql import functions as F
>>>
>>>
>>> sc = SparkContext(appName='myapp')
>>> spark = SparkSession(sc)
>>>
>>> file_name = 'data.csv'
>>>
>>> fields = [
>>>     StructField(
>>>         'group2',
>>>         LongType(),
>>>         True),
>>>     StructField(
>>>         'name',
>>>         StringType(),
>>>         True),
>>>     StructField(
>>>         'value',
>>>         DoubleType(),
>>>         True),
>>>     StructField(
>>>         'group1',
>>>         LongType(),
>>>         True)
>>> ]
>>> schema = StructType(fields)
>>>
>>> df = spark.read.csv(
>>>     file_name, header=False, mode="DROPMALFORMED", schema=schema
>>> )
>>> df.show()
>>> means = df.select([
>>>     'group1',
>>>     'group2',
>>>     'name',
>>>     'value']).groupBy([
>>>         'group1',
>>>         'group2'
>>>     ]).agg(
>>>         F.mean('value').alias('mean_value')
>>>     ).orderBy('group1', 'group2')
>>>
>>> cond = [df.group1 == means.group1, df.group2 == means.group2]
>>>
>>> means.show()
>>> df = df.select([
>>>     'group1',
>>>     'group2',
>>>     'name',
>>>     'value']).join(
>>>         means,
>>>         cond
>>>     ).drop(
>>>         df.group1
>>>     ).drop(
>>>         df.group2
>>>     ).select('group1',
>>>              'group2',
>>>              'name',
>>>              'value',
>>>              'mean_value')
>>>
>>> final = df.withColumn(
>>>     'diff',
>>>     F.abs(df.value - df.mean_value))
>>> final.show()
>>>
>>> sc.stop()
>>>
>>> And here is an example dataset I'm playing with:
>>>
>>> 100,name1,0.43,0
>>> 100,name2,0.33,0
>>> 100,name3,0.73,0
>>> 101,name1,0.29,0
>>> 101,name2,0.96,0
>>> 101,name3,0.42,0
>>> 102,name1,0.01,0
>>> 102,name2,0.42,0
>>> 102,name3,0.51,0
>>> 103,name1,0.55,0
>>> 103,name2,0.45,0
>>> 103,name3,0.02,0
>>> 104,name1,0.93,0
>>> 104,name2,0.16,0
>>> 104,name3,0.74,0
>>> 105,name1,0.41,0
>>> 105,name2,0.65,0
>>> 105,name3,0.29,0
>>> 100,name1,0.51,1
>>> 100,name2,0.51,1
>>> 100,name3,0.43,1
>>> 101,name1,0.59,1
>>> 101,name2,0.55,1
>>> 101,name3,0.84,1
>>> 102,name1,0.01,1
>>> 102,name2,0.98,1
>>> 102,name3,0.44,1
>>> 103,name1,0.47,1
>>> 103,name2,0.16,1
>>> 103,name3,0.02,1
>>> 104,name1,0.83,1
>>> 104,name2,0.89,1
>>> 104,name3,0.31,1
>>> 105,name1,0.59,1
>>> 105,name2,0.77,1
>>> 105,name3,0.45,1
>>>
>>> and here is what I'm trying to produce:
>>>
>>> group1,group2,name,value,median,diff
>>> 0,100,name1,0.43,0.43,0.0
>>> 0,100,name2,0.33,0.43,0.10
>>> 0,100,name3,0.73,0.43,0.30
>>> 0,101,name1,0.29,0.42,0.13
>>> 0,101,name2,0.96,0.42,0.54
>>> 0,101,name3,0.42,0.42,0.0
>>> 0,102,name1,0.01,0.42,0.41
>>> 0,102,name2,0.42,0.42,0.0
>>> 0,102,name3,0.51,0.42,0.09
>>> 0,103,name1,0.55,0.45,0.10
>>> 0,103,name2,0.45,0.45,0.0
>>> 0,103,name3,0.02,0.45,0.43
>>> 0,104,name1,0.93,0.74,0.19
>>> 0,104,name2,0.16,0.74,0.58
>>> 0,104,name3,0.74,0.74,0.0
>>> 0,105,name1,0.41,0.41,0.0
>>> 0,105,name2,0.65,0.41,0.24
>>> 0,105,name3,0.29,0.41,0.24
>>> 1,100,name1,0.51,0.51,0.0
>>> 1,100,name2,0.51,0.51,0.0
>>> 1,100,name3,0.43,0.51,0.08
>>> 1,101,name1,0.59,0.59,0.0
>>> 1,101,name2,0.55,0.59,0.04
>>> 1,101,name3,0.84,0.59,0.25
>>> 1,102,name1,0.01,0.44,0.43
>>> 1,102,name2,0.98,0.44,0.54
>>> 1,102,name3,0.44,0.44,0.0
>>> 1,103,name1,0.47,0.16,0.31
>>> 1,103,name2,0.16,0.16,0.0
>>> 1,103,name3,0.02,0.16,0.14
>>> 1,104,name1,0.83,0.83,0.0
>>> 1,104,name2,0.89,0.83,0.06
>>> 1,104,name3,0.31,0.83,0.52
>>> 1,105,name1,0.59,0.59,0.0
>>> 1,105,name2,0.77,0.59,0.18
>>> 1,105,name3,0.45,0.59,0.14
>>>
>>> Thanks for any help!
>>>
>>> Cheers,
>>> Craig
>>>
>>>
>>
>>
>> --
>> Best Regards,
>> Ayan Guha
>>
>
>
>
> --
> Best Regards,
> Ayan Guha
>

Re: calculate diff of value and median in a group

Posted by ayan guha <gu...@gmail.com>.
For median, use percentile_approx with 0.5 (50th percentile is the median)

On Thu, Mar 23, 2017 at 11:01 AM, Yong Zhang <ja...@hotmail.com> wrote:

> He is looking for median, not mean/avg.
>
>
> You have to implement the median logic by yourself, as there is no
> directly implementation from Spark. You can use RDD API, if you are using
> 1.6.x, or dataset if 2.x
>
>
> The following example gives you an idea how to calculate the median using
> dataset API. You can even change the code to add additional logic to
> calculate the diff of every value with the median.
>
>
> scala> spark.version
> res31: String = 2.1.0
>
> scala> val ds = Seq((100,0.43),(100,0.33),(100,0.73),(101,0.29),(101,0.96),
> (101,0.42),(101,0.01)).toDF("id","value").as[(Int, Double)]
> ds: org.apache.spark.sql.Dataset[(Int, Double)] = [id: int, value: double]
>
> scala> ds.show
> +---+-----+
> | id|value|
> +---+-----+
> |100| 0.43|
> |100| 0.33|
> |100| 0.73|
> |101| 0.29|
> |101| 0.96|
> |101| 0.42|
> |101| 0.01|
> +---+-----+
>
> scala> def median(seq: Seq[Double]) = {
>      |   val size = seq.size
>      |   val sorted = seq.sorted
>      |   size match {
>      |     case even if size % 2 == 0 => (sorted((size-2)/2) + sorted(size/2)) / 2
>      |     case odd => sorted((size-1)/2)
>      |   }
>      | }
> median: (seq: Seq[Double])Double
>
> scala> ds.groupByKey(_._1).mapGroups((id, iter) => (id, median(iter.map(_._2).toSeq))).show
> +---+-----+
> | _1|   _2|
> +---+-----+
> |101|0.355|
> |100| 0.43|
> +---+-----+
>
>
> Yong
>
>
>
>
> ------------------------------
> *From:* ayan guha <gu...@gmail.com>
> *Sent:* Wednesday, March 22, 2017 7:23 PM
> *To:* Craig Ching
> *Cc:* Yong Zhang; user@spark.apache.org
> *Subject:* Re: calculate diff of value and median in a group
>
> I would suggest use window function with partitioning.
>
> select group1,group2,name,value, avg(value) over (partition group1,group2
> order by name) m
> from t
>
> On Thu, Mar 23, 2017 at 9:58 AM, Craig Ching <cr...@gmail.com> wrote:
>
>> Are the elements count big per group? If not, you can group them and use
>> the code to calculate the median and diff.
>>
>>
>> They're not big, no.  Any pointers on how I might do that?  The part I'm
>> having trouble with is the grouping, I can't seem to see how to do the
>> median per group.  For mean, we have the agg feature, but not for median
>> (and I understand the reasons for that).
>>
>> Yong
>>
>> ------------------------------
>> *From:* Craig Ching <cr...@gmail.com>
>> *Sent:* Wednesday, March 22, 2017 3:17 PM
>> *To:* user@spark.apache.org
>> *Subject:* calculate diff of value and median in a group
>>
>> Hi,
>>
>> When using pyspark, I'd like to be able to calculate the difference
>> between grouped values and their median for the group.  Is this possible?
>> Here is some code I hacked up that does what I want except that it
>> calculates the grouped diff from mean.  Also, please feel free to comment
>> on how I could make this better if you feel like being helpful :)
>>
>> from pyspark import SparkContext
>> from pyspark.sql import SparkSession
>> from pyspark.sql.types import (
>>     StringType,
>>     LongType,
>>     DoubleType,
>>     StructField,
>>     StructType
>> )
>> from pyspark.sql import functions as F
>>
>>
>> sc = SparkContext(appName='myapp')
>> spark = SparkSession(sc)
>>
>> file_name = 'data.csv'
>>
>> fields = [
>>     StructField(
>>         'group2',
>>         LongType(),
>>         True),
>>     StructField(
>>         'name',
>>         StringType(),
>>         True),
>>     StructField(
>>         'value',
>>         DoubleType(),
>>         True),
>>     StructField(
>>         'group1',
>>         LongType(),
>>         True)
>> ]
>> schema = StructType(fields)
>>
>> df = spark.read.csv(
>>     file_name, header=False, mode="DROPMALFORMED", schema=schema
>> )
>> df.show()
>> means = df.select([
>>     'group1',
>>     'group2',
>>     'name',
>>     'value']).groupBy([
>>         'group1',
>>         'group2'
>>     ]).agg(
>>         F.mean('value').alias('mean_value')
>>     ).orderBy('group1', 'group2')
>>
>> cond = [df.group1 == means.group1, df.group2 == means.group2]
>>
>> means.show()
>> df = df.select([
>>     'group1',
>>     'group2',
>>     'name',
>>     'value']).join(
>>         means,
>>         cond
>>     ).drop(
>>         df.group1
>>     ).drop(
>>         df.group2
>>     ).select('group1',
>>              'group2',
>>              'name',
>>              'value',
>>              'mean_value')
>>
>> final = df.withColumn(
>>     'diff',
>>     F.abs(df.value - df.mean_value))
>> final.show()
>>
>> sc.stop()
>>
>> And here is an example dataset I'm playing with:
>>
>> 100,name1,0.43,0
>> 100,name2,0.33,0
>> 100,name3,0.73,0
>> 101,name1,0.29,0
>> 101,name2,0.96,0
>> 101,name3,0.42,0
>> 102,name1,0.01,0
>> 102,name2,0.42,0
>> 102,name3,0.51,0
>> 103,name1,0.55,0
>> 103,name2,0.45,0
>> 103,name3,0.02,0
>> 104,name1,0.93,0
>> 104,name2,0.16,0
>> 104,name3,0.74,0
>> 105,name1,0.41,0
>> 105,name2,0.65,0
>> 105,name3,0.29,0
>> 100,name1,0.51,1
>> 100,name2,0.51,1
>> 100,name3,0.43,1
>> 101,name1,0.59,1
>> 101,name2,0.55,1
>> 101,name3,0.84,1
>> 102,name1,0.01,1
>> 102,name2,0.98,1
>> 102,name3,0.44,1
>> 103,name1,0.47,1
>> 103,name2,0.16,1
>> 103,name3,0.02,1
>> 104,name1,0.83,1
>> 104,name2,0.89,1
>> 104,name3,0.31,1
>> 105,name1,0.59,1
>> 105,name2,0.77,1
>> 105,name3,0.45,1
>>
>> and here is what I'm trying to produce:
>>
>> group1,group2,name,value,median,diff
>> 0,100,name1,0.43,0.43,0.0
>> 0,100,name2,0.33,0.43,0.10
>> 0,100,name3,0.73,0.43,0.30
>> 0,101,name1,0.29,0.42,0.13
>> 0,101,name2,0.96,0.42,0.54
>> 0,101,name3,0.42,0.42,0.0
>> 0,102,name1,0.01,0.42,0.41
>> 0,102,name2,0.42,0.42,0.0
>> 0,102,name3,0.51,0.42,0.09
>> 0,103,name1,0.55,0.45,0.10
>> 0,103,name2,0.45,0.45,0.0
>> 0,103,name3,0.02,0.45,0.43
>> 0,104,name1,0.93,0.74,0.19
>> 0,104,name2,0.16,0.74,0.58
>> 0,104,name3,0.74,0.74,0.0
>> 0,105,name1,0.41,0.41,0.0
>> 0,105,name2,0.65,0.41,0.24
>> 0,105,name3,0.29,0.41,0.24
>> 1,100,name1,0.51,0.51,0.0
>> 1,100,name2,0.51,0.51,0.0
>> 1,100,name3,0.43,0.51,0.08
>> 1,101,name1,0.59,0.59,0.0
>> 1,101,name2,0.55,0.59,0.04
>> 1,101,name3,0.84,0.59,0.25
>> 1,102,name1,0.01,0.44,0.43
>> 1,102,name2,0.98,0.44,0.54
>> 1,102,name3,0.44,0.44,0.0
>> 1,103,name1,0.47,0.16,0.31
>> 1,103,name2,0.16,0.16,0.0
>> 1,103,name3,0.02,0.16,0.14
>> 1,104,name1,0.83,0.83,0.0
>> 1,104,name2,0.89,0.83,0.06
>> 1,104,name3,0.31,0.83,0.52
>> 1,105,name1,0.59,0.59,0.0
>> 1,105,name2,0.77,0.59,0.18
>> 1,105,name3,0.45,0.59,0.14
>>
>> Thanks for any help!
>>
>> Cheers,
>> Craig
>>
>>
>
>
> --
> Best Regards,
> Ayan Guha
>



-- 
Best Regards,
Ayan Guha

Re: calculate diff of value and median in a group

Posted by Yong Zhang <ja...@hotmail.com>.
He is looking for median, not mean/avg.


You have to implement the median logic by yourself, as there is no directly implementation from Spark. You can use RDD API, if you are using 1.6.x, or dataset if 2.x


The following example gives you an idea how to calculate the median using dataset API. You can even change the code to add additional logic to calculate the diff of every value with the median.


scala> spark.version
res31: String = 2.1.0

scala> val ds = Seq((100,0.43),(100,0.33),(100,0.73),(101,0.29),(101,0.96),
(101,0.42),(101,0.01)).toDF("id","value").as[(Int, Double)]
ds: org.apache.spark.sql.Dataset[(Int, Double)] = [id: int, value: double]

scala> ds.show
+---+-----+
| id|value|
+---+-----+
|100| 0.43|
|100| 0.33|
|100| 0.73|
|101| 0.29|
|101| 0.96|
|101| 0.42|
|101| 0.01|
+---+-----+

scala> def median(seq: Seq[Double]) = {
     |   val size = seq.size
     |   val sorted = seq.sorted
     |   size match {
     |     case even if size % 2 == 0 => (sorted((size-2)/2) + sorted(size/2)) / 2
     |     case odd => sorted((size-1)/2)
     |   }
     | }
median: (seq: Seq[Double])Double

scala> ds.groupByKey(_._1).mapGroups((id, iter) => (id, median(iter.map(_._2).toSeq))).show
+---+-----+
| _1|   _2|
+---+-----+
|101|0.355|
|100| 0.43|
+---+-----+


Yong



________________________________
From: ayan guha <gu...@gmail.com>
Sent: Wednesday, March 22, 2017 7:23 PM
To: Craig Ching
Cc: Yong Zhang; user@spark.apache.org
Subject: Re: calculate diff of value and median in a group

I would suggest use window function with partitioning.

select group1,group2,name,value, avg(value) over (partition group1,group2 order by name) m
from t

On Thu, Mar 23, 2017 at 9:58 AM, Craig Ching <cr...@gmail.com>> wrote:

Are the elements count big per group? If not, you can group them and use the code to calculate the median and diff.


They're not big, no.  Any pointers on how I might do that?  The part I'm having trouble with is the grouping, I can't seem to see how to do the median per group.  For mean, we have the agg feature, but not for median (and I understand the reasons for that).


Yong

________________________________
From: Craig Ching <cr...@gmail.com>>
Sent: Wednesday, March 22, 2017 3:17 PM
To: user@spark.apache.org<ma...@spark.apache.org>
Subject: calculate diff of value and median in a group

Hi,

When using pyspark, I'd like to be able to calculate the difference between grouped values and their median for the group.  Is this possible?  Here is some code I hacked up that does what I want except that it calculates the grouped diff from mean.  Also, please feel free to comment on how I could make this better if you feel like being helpful :)

from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.types import (
    StringType,
    LongType,
    DoubleType,
    StructField,
    StructType
)
from pyspark.sql import functions as F


sc = SparkContext(appName='myapp')
spark = SparkSession(sc)

file_name = 'data.csv'

fields = [
    StructField(
        'group2',
        LongType(),
        True),
    StructField(
        'name',
        StringType(),
        True),
    StructField(
        'value',
        DoubleType(),
        True),
    StructField(
        'group1',
        LongType(),
        True)
]
schema = StructType(fields)

df = spark.read.csv(
    file_name, header=False, mode="DROPMALFORMED", schema=schema
)
df.show()
means = df.select([
    'group1',
    'group2',
    'name',
    'value']).groupBy([
        'group1',
        'group2'
    ]).agg(
        F.mean('value').alias('mean_value')
    ).orderBy('group1', 'group2')

cond = [df.group1 == means.group1, df.group2 == means.group2]

means.show()
df = df.select([
    'group1',
    'group2',
    'name',
    'value']).join(
        means,
        cond
    ).drop(
        df.group1
    ).drop(
        df.group2
    ).select('group1',
             'group2',
             'name',
             'value',
             'mean_value')

final = df.withColumn(
    'diff',
    F.abs(df.value - df.mean_value))
final.show()

sc.stop()

And here is an example dataset I'm playing with:

100,name1,0.43,0
100,name2,0.33,0
100,name3,0.73,0
101,name1,0.29,0
101,name2,0.96,0
101,name3,0.42,0
102,name1,0.01,0
102,name2,0.42,0
102,name3,0.51,0
103,name1,0.55,0
103,name2,0.45,0
103,name3,0.02,0
104,name1,0.93,0
104,name2,0.16,0
104,name3,0.74,0
105,name1,0.41,0
105,name2,0.65,0
105,name3,0.29,0
100,name1,0.51,1
100,name2,0.51,1
100,name3,0.43,1
101,name1,0.59,1
101,name2,0.55,1
101,name3,0.84,1
102,name1,0.01,1
102,name2,0.98,1
102,name3,0.44,1
103,name1,0.47,1
103,name2,0.16,1
103,name3,0.02,1
104,name1,0.83,1
104,name2,0.89,1
104,name3,0.31,1
105,name1,0.59,1
105,name2,0.77,1
105,name3,0.45,1

and here is what I'm trying to produce:

group1,group2,name,value,median,diff
0,100,name1,0.43,0.43,0.0
0,100,name2,0.33,0.43,0.10
0,100,name3,0.73,0.43,0.30
0,101,name1,0.29,0.42,0.13
0,101,name2,0.96,0.42,0.54
0,101,name3,0.42,0.42,0.0
0,102,name1,0.01,0.42,0.41
0,102,name2,0.42,0.42,0.0
0,102,name3,0.51,0.42,0.09
0,103,name1,0.55,0.45,0.10
0,103,name2,0.45,0.45,0.0
0,103,name3,0.02,0.45,0.43
0,104,name1,0.93,0.74,0.19
0,104,name2,0.16,0.74,0.58
0,104,name3,0.74,0.74,0.0
0,105,name1,0.41,0.41,0.0
0,105,name2,0.65,0.41,0.24
0,105,name3,0.29,0.41,0.24
1,100,name1,0.51,0.51,0.0
1,100,name2,0.51,0.51,0.0
1,100,name3,0.43,0.51,0.08
1,101,name1,0.59,0.59,0.0
1,101,name2,0.55,0.59,0.04
1,101,name3,0.84,0.59,0.25
1,102,name1,0.01,0.44,0.43
1,102,name2,0.98,0.44,0.54
1,102,name3,0.44,0.44,0.0
1,103,name1,0.47,0.16,0.31
1,103,name2,0.16,0.16,0.0
1,103,name3,0.02,0.16,0.14
1,104,name1,0.83,0.83,0.0
1,104,name2,0.89,0.83,0.06
1,104,name3,0.31,0.83,0.52
1,105,name1,0.59,0.59,0.0
1,105,name2,0.77,0.59,0.18
1,105,name3,0.45,0.59,0.14

Thanks for any help!

Cheers,
Craig



--
Best Regards,
Ayan Guha

Re: calculate diff of value and median in a group

Posted by ayan guha <gu...@gmail.com>.
I would suggest use window function with partitioning.

select group1,group2,name,value, avg(value) over (partition group1,group2
order by name) m
from t

On Thu, Mar 23, 2017 at 9:58 AM, Craig Ching <cr...@gmail.com> wrote:

> Are the elements count big per group? If not, you can group them and use
> the code to calculate the median and diff.
>
>
> They're not big, no.  Any pointers on how I might do that?  The part I'm
> having trouble with is the grouping, I can't seem to see how to do the
> median per group.  For mean, we have the agg feature, but not for median
> (and I understand the reasons for that).
>
> Yong
>
> ------------------------------
> *From:* Craig Ching <cr...@gmail.com>
> *Sent:* Wednesday, March 22, 2017 3:17 PM
> *To:* user@spark.apache.org
> *Subject:* calculate diff of value and median in a group
>
> Hi,
>
> When using pyspark, I'd like to be able to calculate the difference
> between grouped values and their median for the group.  Is this possible?
> Here is some code I hacked up that does what I want except that it
> calculates the grouped diff from mean.  Also, please feel free to comment
> on how I could make this better if you feel like being helpful :)
>
> from pyspark import SparkContext
> from pyspark.sql import SparkSession
> from pyspark.sql.types import (
>     StringType,
>     LongType,
>     DoubleType,
>     StructField,
>     StructType
> )
> from pyspark.sql import functions as F
>
>
> sc = SparkContext(appName='myapp')
> spark = SparkSession(sc)
>
> file_name = 'data.csv'
>
> fields = [
>     StructField(
>         'group2',
>         LongType(),
>         True),
>     StructField(
>         'name',
>         StringType(),
>         True),
>     StructField(
>         'value',
>         DoubleType(),
>         True),
>     StructField(
>         'group1',
>         LongType(),
>         True)
> ]
> schema = StructType(fields)
>
> df = spark.read.csv(
>     file_name, header=False, mode="DROPMALFORMED", schema=schema
> )
> df.show()
> means = df.select([
>     'group1',
>     'group2',
>     'name',
>     'value']).groupBy([
>         'group1',
>         'group2'
>     ]).agg(
>         F.mean('value').alias('mean_value')
>     ).orderBy('group1', 'group2')
>
> cond = [df.group1 == means.group1, df.group2 == means.group2]
>
> means.show()
> df = df.select([
>     'group1',
>     'group2',
>     'name',
>     'value']).join(
>         means,
>         cond
>     ).drop(
>         df.group1
>     ).drop(
>         df.group2
>     ).select('group1',
>              'group2',
>              'name',
>              'value',
>              'mean_value')
>
> final = df.withColumn(
>     'diff',
>     F.abs(df.value - df.mean_value))
> final.show()
>
> sc.stop()
>
> And here is an example dataset I'm playing with:
>
> 100,name1,0.43,0
> 100,name2,0.33,0
> 100,name3,0.73,0
> 101,name1,0.29,0
> 101,name2,0.96,0
> 101,name3,0.42,0
> 102,name1,0.01,0
> 102,name2,0.42,0
> 102,name3,0.51,0
> 103,name1,0.55,0
> 103,name2,0.45,0
> 103,name3,0.02,0
> 104,name1,0.93,0
> 104,name2,0.16,0
> 104,name3,0.74,0
> 105,name1,0.41,0
> 105,name2,0.65,0
> 105,name3,0.29,0
> 100,name1,0.51,1
> 100,name2,0.51,1
> 100,name3,0.43,1
> 101,name1,0.59,1
> 101,name2,0.55,1
> 101,name3,0.84,1
> 102,name1,0.01,1
> 102,name2,0.98,1
> 102,name3,0.44,1
> 103,name1,0.47,1
> 103,name2,0.16,1
> 103,name3,0.02,1
> 104,name1,0.83,1
> 104,name2,0.89,1
> 104,name3,0.31,1
> 105,name1,0.59,1
> 105,name2,0.77,1
> 105,name3,0.45,1
>
> and here is what I'm trying to produce:
>
> group1,group2,name,value,median,diff
> 0,100,name1,0.43,0.43,0.0
> 0,100,name2,0.33,0.43,0.10
> 0,100,name3,0.73,0.43,0.30
> 0,101,name1,0.29,0.42,0.13
> 0,101,name2,0.96,0.42,0.54
> 0,101,name3,0.42,0.42,0.0
> 0,102,name1,0.01,0.42,0.41
> 0,102,name2,0.42,0.42,0.0
> 0,102,name3,0.51,0.42,0.09
> 0,103,name1,0.55,0.45,0.10
> 0,103,name2,0.45,0.45,0.0
> 0,103,name3,0.02,0.45,0.43
> 0,104,name1,0.93,0.74,0.19
> 0,104,name2,0.16,0.74,0.58
> 0,104,name3,0.74,0.74,0.0
> 0,105,name1,0.41,0.41,0.0
> 0,105,name2,0.65,0.41,0.24
> 0,105,name3,0.29,0.41,0.24
> 1,100,name1,0.51,0.51,0.0
> 1,100,name2,0.51,0.51,0.0
> 1,100,name3,0.43,0.51,0.08
> 1,101,name1,0.59,0.59,0.0
> 1,101,name2,0.55,0.59,0.04
> 1,101,name3,0.84,0.59,0.25
> 1,102,name1,0.01,0.44,0.43
> 1,102,name2,0.98,0.44,0.54
> 1,102,name3,0.44,0.44,0.0
> 1,103,name1,0.47,0.16,0.31
> 1,103,name2,0.16,0.16,0.0
> 1,103,name3,0.02,0.16,0.14
> 1,104,name1,0.83,0.83,0.0
> 1,104,name2,0.89,0.83,0.06
> 1,104,name3,0.31,0.83,0.52
> 1,105,name1,0.59,0.59,0.0
> 1,105,name2,0.77,0.59,0.18
> 1,105,name3,0.45,0.59,0.14
>
> Thanks for any help!
>
> Cheers,
> Craig
>
>


-- 
Best Regards,
Ayan Guha

Re: calculate diff of value and median in a group

Posted by Craig Ching <cr...@gmail.com>.
> Are the elements count big per group? If not, you can group them and use the code to calculate the median and diff.
> 
> 
> 
They're not big, no.  Any pointers on how I might do that?  The part I'm having trouble with is the grouping, I can't seem to see how to do the median per group.  For mean, we have the agg feature, but not for median (and I understand the reasons for that).

> Yong
> 
> 
> From: Craig Ching <cr...@gmail.com>
> Sent: Wednesday, March 22, 2017 3:17 PM
> To: user@spark.apache.org
> Subject: calculate diff of value and median in a group
>  
> Hi,
> 
> When using pyspark, I'd like to be able to calculate the difference between grouped values and their median for the group.  Is this possible?  Here is some code I hacked up that does what I want except that it calculates the grouped diff from mean.  Also, please feel free to comment on how I could make this better if you feel like being helpful :)
> 
> from pyspark import SparkContext
> from pyspark.sql import SparkSession
> from pyspark.sql.types import (
>     StringType,
>     LongType,
>     DoubleType,
>     StructField,
>     StructType
> )
> from pyspark.sql import functions as F
> 
> 
> sc = SparkContext(appName='myapp')
> spark = SparkSession(sc)
> 
> file_name = 'data.csv'
> 
> fields = [
>     StructField(
>         'group2',
>         LongType(),
>         True),
>     StructField(
>         'name',
>         StringType(),
>         True),
>     StructField(
>         'value',
>         DoubleType(),
>         True),
>     StructField(
>         'group1',
>         LongType(),
>         True)
> ]
> schema = StructType(fields)
> 
> df = spark.read.csv(
>     file_name, header=False, mode="DROPMALFORMED", schema=schema
> )
> df.show()
> means = df.select([
>     'group1',
>     'group2',
>     'name',
>     'value']).groupBy([
>         'group1',
>         'group2'
>     ]).agg(
>         F.mean('value').alias('mean_value')
>     ).orderBy('group1', 'group2')
> 
> cond = [df.group1 == means.group1, df.group2 == means.group2]
> 
> means.show()
> df = df.select([
>     'group1',
>     'group2',
>     'name',
>     'value']).join(
>         means,
>         cond
>     ).drop(
>         df.group1
>     ).drop(
>         df.group2
>     ).select('group1',
>              'group2',
>              'name',
>              'value',
>              'mean_value')
> 
> final = df.withColumn(
>     'diff',
>     F.abs(df.value - df.mean_value))
> final.show()
> 
> sc.stop()
> 
> And here is an example dataset I'm playing with:
> 
> 100,name1,0.43,0
> 100,name2,0.33,0
> 100,name3,0.73,0
> 101,name1,0.29,0
> 101,name2,0.96,0
> 101,name3,0.42,0
> 102,name1,0.01,0
> 102,name2,0.42,0
> 102,name3,0.51,0
> 103,name1,0.55,0
> 103,name2,0.45,0
> 103,name3,0.02,0
> 104,name1,0.93,0
> 104,name2,0.16,0
> 104,name3,0.74,0
> 105,name1,0.41,0
> 105,name2,0.65,0
> 105,name3,0.29,0
> 100,name1,0.51,1
> 100,name2,0.51,1
> 100,name3,0.43,1
> 101,name1,0.59,1
> 101,name2,0.55,1
> 101,name3,0.84,1
> 102,name1,0.01,1
> 102,name2,0.98,1
> 102,name3,0.44,1
> 103,name1,0.47,1
> 103,name2,0.16,1
> 103,name3,0.02,1
> 104,name1,0.83,1
> 104,name2,0.89,1
> 104,name3,0.31,1
> 105,name1,0.59,1
> 105,name2,0.77,1
> 105,name3,0.45,1
> 
> and here is what I'm trying to produce:
> 
> group1,group2,name,value,median,diff
> 0,100,name1,0.43,0.43,0.0
> 0,100,name2,0.33,0.43,0.10
> 0,100,name3,0.73,0.43,0.30
> 0,101,name1,0.29,0.42,0.13
> 0,101,name2,0.96,0.42,0.54
> 0,101,name3,0.42,0.42,0.0
> 0,102,name1,0.01,0.42,0.41
> 0,102,name2,0.42,0.42,0.0
> 0,102,name3,0.51,0.42,0.09
> 0,103,name1,0.55,0.45,0.10
> 0,103,name2,0.45,0.45,0.0
> 0,103,name3,0.02,0.45,0.43
> 0,104,name1,0.93,0.74,0.19
> 0,104,name2,0.16,0.74,0.58
> 0,104,name3,0.74,0.74,0.0
> 0,105,name1,0.41,0.41,0.0
> 0,105,name2,0.65,0.41,0.24
> 0,105,name3,0.29,0.41,0.24
> 1,100,name1,0.51,0.51,0.0
> 1,100,name2,0.51,0.51,0.0
> 1,100,name3,0.43,0.51,0.08
> 1,101,name1,0.59,0.59,0.0
> 1,101,name2,0.55,0.59,0.04
> 1,101,name3,0.84,0.59,0.25
> 1,102,name1,0.01,0.44,0.43
> 1,102,name2,0.98,0.44,0.54
> 1,102,name3,0.44,0.44,0.0
> 1,103,name1,0.47,0.16,0.31
> 1,103,name2,0.16,0.16,0.0
> 1,103,name3,0.02,0.16,0.14
> 1,104,name1,0.83,0.83,0.0
> 1,104,name2,0.89,0.83,0.06
> 1,104,name3,0.31,0.83,0.52
> 1,105,name1,0.59,0.59,0.0
> 1,105,name2,0.77,0.59,0.18
> 1,105,name3,0.45,0.59,0.14
> 
> Thanks for any help!
> 
> Cheers,
> Craig

Re: calculate diff of value and median in a group

Posted by Yong Zhang <ja...@hotmail.com>.
Are the elements count big per group? If not, you can group them and use the code to calculate the median and diff.


Yong

________________________________
From: Craig Ching <cr...@gmail.com>
Sent: Wednesday, March 22, 2017 3:17 PM
To: user@spark.apache.org
Subject: calculate diff of value and median in a group

Hi,

When using pyspark, I'd like to be able to calculate the difference between grouped values and their median for the group.  Is this possible?  Here is some code I hacked up that does what I want except that it calculates the grouped diff from mean.  Also, please feel free to comment on how I could make this better if you feel like being helpful :)

from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.types import (
    StringType,
    LongType,
    DoubleType,
    StructField,
    StructType
)
from pyspark.sql import functions as F


sc = SparkContext(appName='myapp')
spark = SparkSession(sc)

file_name = 'data.csv'

fields = [
    StructField(
        'group2',
        LongType(),
        True),
    StructField(
        'name',
        StringType(),
        True),
    StructField(
        'value',
        DoubleType(),
        True),
    StructField(
        'group1',
        LongType(),
        True)
]
schema = StructType(fields)

df = spark.read.csv(
    file_name, header=False, mode="DROPMALFORMED", schema=schema
)
df.show()
means = df.select([
    'group1',
    'group2',
    'name',
    'value']).groupBy([
        'group1',
        'group2'
    ]).agg(
        F.mean('value').alias('mean_value')
    ).orderBy('group1', 'group2')

cond = [df.group1 == means.group1, df.group2 == means.group2]

means.show()
df = df.select([
    'group1',
    'group2',
    'name',
    'value']).join(
        means,
        cond
    ).drop(
        df.group1
    ).drop(
        df.group2
    ).select('group1',
             'group2',
             'name',
             'value',
             'mean_value')

final = df.withColumn(
    'diff',
    F.abs(df.value - df.mean_value))
final.show()

sc.stop()

And here is an example dataset I'm playing with:

100,name1,0.43,0
100,name2,0.33,0
100,name3,0.73,0
101,name1,0.29,0
101,name2,0.96,0
101,name3,0.42,0
102,name1,0.01,0
102,name2,0.42,0
102,name3,0.51,0
103,name1,0.55,0
103,name2,0.45,0
103,name3,0.02,0
104,name1,0.93,0
104,name2,0.16,0
104,name3,0.74,0
105,name1,0.41,0
105,name2,0.65,0
105,name3,0.29,0
100,name1,0.51,1
100,name2,0.51,1
100,name3,0.43,1
101,name1,0.59,1
101,name2,0.55,1
101,name3,0.84,1
102,name1,0.01,1
102,name2,0.98,1
102,name3,0.44,1
103,name1,0.47,1
103,name2,0.16,1
103,name3,0.02,1
104,name1,0.83,1
104,name2,0.89,1
104,name3,0.31,1
105,name1,0.59,1
105,name2,0.77,1
105,name3,0.45,1

and here is what I'm trying to produce:

group1,group2,name,value,median,diff
0,100,name1,0.43,0.43,0.0
0,100,name2,0.33,0.43,0.10
0,100,name3,0.73,0.43,0.30
0,101,name1,0.29,0.42,0.13
0,101,name2,0.96,0.42,0.54
0,101,name3,0.42,0.42,0.0
0,102,name1,0.01,0.42,0.41
0,102,name2,0.42,0.42,0.0
0,102,name3,0.51,0.42,0.09
0,103,name1,0.55,0.45,0.10
0,103,name2,0.45,0.45,0.0
0,103,name3,0.02,0.45,0.43
0,104,name1,0.93,0.74,0.19
0,104,name2,0.16,0.74,0.58
0,104,name3,0.74,0.74,0.0
0,105,name1,0.41,0.41,0.0
0,105,name2,0.65,0.41,0.24
0,105,name3,0.29,0.41,0.24
1,100,name1,0.51,0.51,0.0
1,100,name2,0.51,0.51,0.0
1,100,name3,0.43,0.51,0.08
1,101,name1,0.59,0.59,0.0
1,101,name2,0.55,0.59,0.04
1,101,name3,0.84,0.59,0.25
1,102,name1,0.01,0.44,0.43
1,102,name2,0.98,0.44,0.54
1,102,name3,0.44,0.44,0.0
1,103,name1,0.47,0.16,0.31
1,103,name2,0.16,0.16,0.0
1,103,name3,0.02,0.16,0.14
1,104,name1,0.83,0.83,0.0
1,104,name2,0.89,0.83,0.06
1,104,name3,0.31,0.83,0.52
1,105,name1,0.59,0.59,0.0
1,105,name2,0.77,0.59,0.18
1,105,name3,0.45,0.59,0.14

Thanks for any help!

Cheers,
Craig