You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@spark.apache.org by roni <ro...@gmail.com> on 2017/07/14 21:53:14 UTC

Re: calculate diff of value and median in a group

I was using this function percentile_approx  on 100GB of compressed data
and it just hangs there. Any pointers?

On Wed, Mar 22, 2017 at 6:09 PM, ayan guha <gu...@gmail.com> wrote:

> For median, use percentile_approx with 0.5 (50th percentile is the median)
>
> On Thu, Mar 23, 2017 at 11:01 AM, Yong Zhang <ja...@hotmail.com> wrote:
>
>> He is looking for median, not mean/avg.
>>
>>
>> You have to implement the median logic by yourself, as there is no
>> directly implementation from Spark. You can use RDD API, if you are using
>> 1.6.x, or dataset if 2.x
>>
>>
>> The following example gives you an idea how to calculate the median using
>> dataset API. You can even change the code to add additional logic to
>> calculate the diff of every value with the median.
>>
>>
>> scala> spark.version
>> res31: String = 2.1.0
>>
>> scala> val ds = Seq((100,0.43),(100,0.33),(100,0.73),(101,0.29),(101,0.96),
>> (101,0.42),(101,0.01)).toDF("id","value").as[(Int, Double)]
>> ds: org.apache.spark.sql.Dataset[(Int, Double)] = [id: int, value: double]
>>
>> scala> ds.show
>> +---+-----+
>> | id|value|
>> +---+-----+
>> |100| 0.43|
>> |100| 0.33|
>> |100| 0.73|
>> |101| 0.29|
>> |101| 0.96|
>> |101| 0.42|
>> |101| 0.01|
>> +---+-----+
>>
>> scala> def median(seq: Seq[Double]) = {
>>      |   val size = seq.size
>>      |   val sorted = seq.sorted
>>      |   size match {
>>      |     case even if size % 2 == 0 => (sorted((size-2)/2) + sorted(size/2)) / 2
>>      |     case odd => sorted((size-1)/2)
>>      |   }
>>      | }
>> median: (seq: Seq[Double])Double
>>
>> scala> ds.groupByKey(_._1).mapGroups((id, iter) => (id, median(iter.map(_._2).toSeq))).show
>> +---+-----+
>> | _1|   _2|
>> +---+-----+
>> |101|0.355|
>> |100| 0.43|
>> +---+-----+
>>
>>
>> Yong
>>
>>
>>
>>
>> ------------------------------
>> *From:* ayan guha <gu...@gmail.com>
>> *Sent:* Wednesday, March 22, 2017 7:23 PM
>> *To:* Craig Ching
>> *Cc:* Yong Zhang; user@spark.apache.org
>> *Subject:* Re: calculate diff of value and median in a group
>>
>> I would suggest use window function with partitioning.
>>
>> select group1,group2,name,value, avg(value) over (partition group1,group2
>> order by name) m
>> from t
>>
>> On Thu, Mar 23, 2017 at 9:58 AM, Craig Ching <cr...@gmail.com>
>> wrote:
>>
>>> Are the elements count big per group? If not, you can group them and use
>>> the code to calculate the median and diff.
>>>
>>>
>>> They're not big, no.  Any pointers on how I might do that?  The part I'm
>>> having trouble with is the grouping, I can't seem to see how to do the
>>> median per group.  For mean, we have the agg feature, but not for median
>>> (and I understand the reasons for that).
>>>
>>> Yong
>>>
>>> ------------------------------
>>> *From:* Craig Ching <cr...@gmail.com>
>>> *Sent:* Wednesday, March 22, 2017 3:17 PM
>>> *To:* user@spark.apache.org
>>> *Subject:* calculate diff of value and median in a group
>>>
>>> Hi,
>>>
>>> When using pyspark, I'd like to be able to calculate the difference
>>> between grouped values and their median for the group.  Is this possible?
>>> Here is some code I hacked up that does what I want except that it
>>> calculates the grouped diff from mean.  Also, please feel free to comment
>>> on how I could make this better if you feel like being helpful :)
>>>
>>> from pyspark import SparkContext
>>> from pyspark.sql import SparkSession
>>> from pyspark.sql.types import (
>>>     StringType,
>>>     LongType,
>>>     DoubleType,
>>>     StructField,
>>>     StructType
>>> )
>>> from pyspark.sql import functions as F
>>>
>>>
>>> sc = SparkContext(appName='myapp')
>>> spark = SparkSession(sc)
>>>
>>> file_name = 'data.csv'
>>>
>>> fields = [
>>>     StructField(
>>>         'group2',
>>>         LongType(),
>>>         True),
>>>     StructField(
>>>         'name',
>>>         StringType(),
>>>         True),
>>>     StructField(
>>>         'value',
>>>         DoubleType(),
>>>         True),
>>>     StructField(
>>>         'group1',
>>>         LongType(),
>>>         True)
>>> ]
>>> schema = StructType(fields)
>>>
>>> df = spark.read.csv(
>>>     file_name, header=False, mode="DROPMALFORMED", schema=schema
>>> )
>>> df.show()
>>> means = df.select([
>>>     'group1',
>>>     'group2',
>>>     'name',
>>>     'value']).groupBy([
>>>         'group1',
>>>         'group2'
>>>     ]).agg(
>>>         F.mean('value').alias('mean_value')
>>>     ).orderBy('group1', 'group2')
>>>
>>> cond = [df.group1 == means.group1, df.group2 == means.group2]
>>>
>>> means.show()
>>> df = df.select([
>>>     'group1',
>>>     'group2',
>>>     'name',
>>>     'value']).join(
>>>         means,
>>>         cond
>>>     ).drop(
>>>         df.group1
>>>     ).drop(
>>>         df.group2
>>>     ).select('group1',
>>>              'group2',
>>>              'name',
>>>              'value',
>>>              'mean_value')
>>>
>>> final = df.withColumn(
>>>     'diff',
>>>     F.abs(df.value - df.mean_value))
>>> final.show()
>>>
>>> sc.stop()
>>>
>>> And here is an example dataset I'm playing with:
>>>
>>> 100,name1,0.43,0
>>> 100,name2,0.33,0
>>> 100,name3,0.73,0
>>> 101,name1,0.29,0
>>> 101,name2,0.96,0
>>> 101,name3,0.42,0
>>> 102,name1,0.01,0
>>> 102,name2,0.42,0
>>> 102,name3,0.51,0
>>> 103,name1,0.55,0
>>> 103,name2,0.45,0
>>> 103,name3,0.02,0
>>> 104,name1,0.93,0
>>> 104,name2,0.16,0
>>> 104,name3,0.74,0
>>> 105,name1,0.41,0
>>> 105,name2,0.65,0
>>> 105,name3,0.29,0
>>> 100,name1,0.51,1
>>> 100,name2,0.51,1
>>> 100,name3,0.43,1
>>> 101,name1,0.59,1
>>> 101,name2,0.55,1
>>> 101,name3,0.84,1
>>> 102,name1,0.01,1
>>> 102,name2,0.98,1
>>> 102,name3,0.44,1
>>> 103,name1,0.47,1
>>> 103,name2,0.16,1
>>> 103,name3,0.02,1
>>> 104,name1,0.83,1
>>> 104,name2,0.89,1
>>> 104,name3,0.31,1
>>> 105,name1,0.59,1
>>> 105,name2,0.77,1
>>> 105,name3,0.45,1
>>>
>>> and here is what I'm trying to produce:
>>>
>>> group1,group2,name,value,median,diff
>>> 0,100,name1,0.43,0.43,0.0
>>> 0,100,name2,0.33,0.43,0.10
>>> 0,100,name3,0.73,0.43,0.30
>>> 0,101,name1,0.29,0.42,0.13
>>> 0,101,name2,0.96,0.42,0.54
>>> 0,101,name3,0.42,0.42,0.0
>>> 0,102,name1,0.01,0.42,0.41
>>> 0,102,name2,0.42,0.42,0.0
>>> 0,102,name3,0.51,0.42,0.09
>>> 0,103,name1,0.55,0.45,0.10
>>> 0,103,name2,0.45,0.45,0.0
>>> 0,103,name3,0.02,0.45,0.43
>>> 0,104,name1,0.93,0.74,0.19
>>> 0,104,name2,0.16,0.74,0.58
>>> 0,104,name3,0.74,0.74,0.0
>>> 0,105,name1,0.41,0.41,0.0
>>> 0,105,name2,0.65,0.41,0.24
>>> 0,105,name3,0.29,0.41,0.24
>>> 1,100,name1,0.51,0.51,0.0
>>> 1,100,name2,0.51,0.51,0.0
>>> 1,100,name3,0.43,0.51,0.08
>>> 1,101,name1,0.59,0.59,0.0
>>> 1,101,name2,0.55,0.59,0.04
>>> 1,101,name3,0.84,0.59,0.25
>>> 1,102,name1,0.01,0.44,0.43
>>> 1,102,name2,0.98,0.44,0.54
>>> 1,102,name3,0.44,0.44,0.0
>>> 1,103,name1,0.47,0.16,0.31
>>> 1,103,name2,0.16,0.16,0.0
>>> 1,103,name3,0.02,0.16,0.14
>>> 1,104,name1,0.83,0.83,0.0
>>> 1,104,name2,0.89,0.83,0.06
>>> 1,104,name3,0.31,0.83,0.52
>>> 1,105,name1,0.59,0.59,0.0
>>> 1,105,name2,0.77,0.59,0.18
>>> 1,105,name3,0.45,0.59,0.14
>>>
>>> Thanks for any help!
>>>
>>> Cheers,
>>> Craig
>>>
>>>
>>
>>
>> --
>> Best Regards,
>> Ayan Guha
>>
>
>
>
> --
> Best Regards,
> Ayan Guha
>