You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@spark.apache.org by Vinti Maheshwari <vi...@gmail.com> on 2016/02/21 18:05:10 UTC

Stream group by

Hello,

I have input lines like below

*Input*
t1, file1, 1, 1, 1
t1, file1, 1, 2, 3
t1, file2, 2, 2, 2, 2
t2, file1, 5, 5, 5
t2, file2, 1, 1, 2, 2

and i want to achieve the output like below rows which is a vertical
addition of the corresponding numbers.

*Output*
“file1” : [ 1+1+5, 1+2+5, 1+3+5 ]
“file2” : [ 2+1, 2+1, 2+2, 2+2 ]

I am in a spark streaming context and i am having a hard time trying to
figure out the way to group by file name.

It seems like i will need to use something like below, i am not sure how to
get to the correct syntax. Any inputs will be helpful.

myDStream.foreachRDD(rdd => rdd.groupBy())

I know how to do the vertical sum of array of given numbers, but i am not
sure how to feed that function to the group by.

  def compute_counters(counts : ArrayBuffer[List[Int]]) = {
      counts.toList.transpose.map(_.sum)
  }

~Thanks,
Vinti

Re: Stream group by

Posted by Vinti Maheshwari <vi...@gmail.com>.
>
> Yeah, i tried with reduceByKey, was able to do it. Thanks Ayan and Jatin.
> For reference, final solution:
>
> def main(args: Array[String]): Unit = {
>     val conf = new SparkConf().setAppName("HBaseStream")
>     val sc = new SparkContext(conf)
>     // create a StreamingContext, the main entry point for all streaming functionality
>     val ssc = new StreamingContext(sc, Seconds(2))
>     val inputStream = ssc.socketTextStream("hostname", 9999)
>     val parsedDstream = inputStream
>       .map(line => {
>         val splitLines = line.split(",")
>         (splitLines(1), splitLines.slice(2, splitLines.length).map(_.trim.toInt))
>       })
>       .reduceByKey((first, second) => {
>         val listOfArrays = ArrayBuffer(first, second)
>         listOfArrays.toList.transpose.map(_.sum).toArray
>       })
>       .foreachRDD(rdd => rdd.foreach(Blaher.blah))
>
> }
>
>
> Regards,
> Vinti
>
> On Sun, Feb 21, 2016 at 2:22 PM, ayan guha <gu...@gmail.com> wrote:
>
>> I believe the best way would be to use reduceByKey operation.
>>
>> On Mon, Feb 22, 2016 at 5:10 AM, Jatin Kumar <
>> jkumar@rocketfuelinc.com.invalid> wrote:
>>
>>> You will need to do a collect and update a global map if you want to.
>>>
>>> myDStream.map(record => (record._2, (record._3, record_4, record._5))
>>>          .groupByKey((r1, r2) => (r1._1 + r2._1, r1._2 + r2._2, r1._3 +
>>> r2._3))
>>>          .foreachRDD(rdd => {
>>>            rdd.collect().foreach((fileName, valueTuple) => <update
>>> global map here>)
>>>          })
>>>
>>> --
>>> Thanks
>>> Jatin Kumar | Rocket Scientist
>>> +91-7696741743 m
>>>
>>> On Sun, Feb 21, 2016 at 11:30 PM, Vinti Maheshwari <vinti.uiet@gmail.com
>>> > wrote:
>>>
>>>> Nevermind, seems like an executor level mutable map is not recommended
>>>> as stated in
>>>> http://blog.guillaume-pitel.fr/2015/06/spark-trick-shot-node-centered-aggregator/
>>>>
>>>> On Sun, Feb 21, 2016 at 9:54 AM, Vinti Maheshwari <vinti.uiet@gmail.com
>>>> > wrote:
>>>>
>>>>> Thanks for your reply Jatin. I changed my parsing logic to what you
>>>>> suggested:
>>>>>
>>>>>     def parseCoverageLine(str: String) = {
>>>>>       val arr = str.split(",")
>>>>>       ...
>>>>>       ...
>>>>>       (arr(0), arr(1) :: count.toList)  // (test, [file, 1, 1, 2])
>>>>>     }
>>>>>
>>>>> Then in the grouping, can i use a global hash map per executor /
>>>>> partition to aggregate the results?
>>>>>
>>>>> val globalMap:[String: List[Int]] = Map()
>>>>> val coverageDStream = inputStream.map(parseCoverageLine)
>>>>>     coverageDStream.reduceByKey((fileAndCountsA, fileAndCountsB) => {
>>>>>     // if exists in global map, append result else add new key
>>>>>
>>>>>     // globalMap
>>>>>     // { "file1": [1+1+5, 1+2+5, 1+3+5], "file2": [ 2, 2, 2, 2 ] }
>>>>> })
>>>>>
>>>>> Thanks,
>>>>> Vinti
>>>>>
>>>>> On Sun, Feb 21, 2016 at 9:31 AM, Jatin Kumar <jkumar@rocketfuelinc.com
>>>>> > wrote:
>>>>>
>>>>>> Hello Vinti,
>>>>>>
>>>>>> One way to get this done is you split your input line into key and
>>>>>> value tuple and then you can simply use groupByKey and handle the values
>>>>>> the way you want. For example:
>>>>>>
>>>>>> Assuming you have already split the values into a 5 tuple:
>>>>>> myDStream.map(record => (record._2, (record._3, record_4, record._5))
>>>>>>          .groupByKey((r1, r2) => (r1._1 + r2._1, r1._2 + r2._2, r1._3
>>>>>> + r2._3))
>>>>>>
>>>>>> I hope that helps.
>>>>>>
>>>>>> --
>>>>>> Thanks
>>>>>> Jatin Kumar | Rocket Scientist
>>>>>> +91-7696741743 m
>>>>>>
>>>>>> On Sun, Feb 21, 2016 at 10:35 PM, Vinti Maheshwari <
>>>>>> vinti.uiet@gmail.com> wrote:
>>>>>>
>>>>>>> Hello,
>>>>>>>
>>>>>>> I have input lines like below
>>>>>>>
>>>>>>> *Input*
>>>>>>> t1, file1, 1, 1, 1
>>>>>>> t1, file1, 1, 2, 3
>>>>>>> t1, file2, 2, 2, 2, 2
>>>>>>> t2, file1, 5, 5, 5
>>>>>>> t2, file2, 1, 1, 2, 2
>>>>>>>
>>>>>>> and i want to achieve the output like below rows which is a vertical
>>>>>>> addition of the corresponding numbers.
>>>>>>>
>>>>>>> *Output*
>>>>>>> “file1” : [ 1+1+5, 1+2+5, 1+3+5 ]
>>>>>>> “file2” : [ 2+1, 2+1, 2+2, 2+2 ]
>>>>>>>
>>>>>>> I am in a spark streaming context and i am having a hard time trying
>>>>>>> to figure out the way to group by file name.
>>>>>>>
>>>>>>> It seems like i will need to use something like below, i am not sure
>>>>>>> how to get to the correct syntax. Any inputs will be helpful.
>>>>>>>
>>>>>>> myDStream.foreachRDD(rdd => rdd.groupBy())
>>>>>>>
>>>>>>> I know how to do the vertical sum of array of given numbers, but i
>>>>>>> am not sure how to feed that function to the group by.
>>>>>>>
>>>>>>>   def compute_counters(counts : ArrayBuffer[List[Int]]) = {
>>>>>>>       counts.toList.transpose.map(_.sum)
>>>>>>>   }
>>>>>>>
>>>>>>> ~Thanks,
>>>>>>> Vinti
>>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>>
>> --
>> Best Regards,
>> Ayan Guha
>>
>
>

Re: Stream group by

Posted by Vinti Maheshwari <vi...@gmail.com>.
Yeah, i tried with reduceByKey, was able to do it. Thanks Ayan and Jatin.
For reference, final solution:

def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("HBaseStream")
    val sc = new SparkContext(conf)
    // create a StreamingContext, the main entry point for all
streaming functionality
    val ssc = new StreamingContext(sc, Seconds(2))
    val inputStream = ssc.socketTextStream("hostname", 9999)
    val parsedDstream = inputStream
      .map(line => {
        val splitLines = line.split(",")
        (splitLines(1), splitLines.slice(2,
splitLines.length).map(_.trim.toInt))
      })
      .reduceByKey((first, second) => {
        val listOfArrays = ArrayBuffer(first, second)
        listOfArrays.toList.transpose.map(_.sum).toArray
      })
      .foreachRDD(rdd => rdd.foreach(Blaher.blah))

}


Regards,
Vinti

On Sun, Feb 21, 2016 at 2:22 PM, ayan guha <gu...@gmail.com> wrote:

> I believe the best way would be to use reduceByKey operation.
>
> On Mon, Feb 22, 2016 at 5:10 AM, Jatin Kumar <
> jkumar@rocketfuelinc.com.invalid> wrote:
>
>> You will need to do a collect and update a global map if you want to.
>>
>> myDStream.map(record => (record._2, (record._3, record_4, record._5))
>>          .groupByKey((r1, r2) => (r1._1 + r2._1, r1._2 + r2._2, r1._3 +
>> r2._3))
>>          .foreachRDD(rdd => {
>>            rdd.collect().foreach((fileName, valueTuple) => <update global
>> map here>)
>>          })
>>
>> --
>> Thanks
>> Jatin Kumar | Rocket Scientist
>> +91-7696741743 m
>>
>> On Sun, Feb 21, 2016 at 11:30 PM, Vinti Maheshwari <vi...@gmail.com>
>> wrote:
>>
>>> Nevermind, seems like an executor level mutable map is not recommended
>>> as stated in
>>> http://blog.guillaume-pitel.fr/2015/06/spark-trick-shot-node-centered-aggregator/
>>>
>>> On Sun, Feb 21, 2016 at 9:54 AM, Vinti Maheshwari <vi...@gmail.com>
>>> wrote:
>>>
>>>> Thanks for your reply Jatin. I changed my parsing logic to what you
>>>> suggested:
>>>>
>>>>     def parseCoverageLine(str: String) = {
>>>>       val arr = str.split(",")
>>>>       ...
>>>>       ...
>>>>       (arr(0), arr(1) :: count.toList)  // (test, [file, 1, 1, 2])
>>>>     }
>>>>
>>>> Then in the grouping, can i use a global hash map per executor /
>>>> partition to aggregate the results?
>>>>
>>>> val globalMap:[String: List[Int]] = Map()
>>>> val coverageDStream = inputStream.map(parseCoverageLine)
>>>>     coverageDStream.reduceByKey((fileAndCountsA, fileAndCountsB) => {
>>>>     // if exists in global map, append result else add new key
>>>>
>>>>     // globalMap
>>>>     // { "file1": [1+1+5, 1+2+5, 1+3+5], "file2": [ 2, 2, 2, 2 ] }
>>>> })
>>>>
>>>> Thanks,
>>>> Vinti
>>>>
>>>> On Sun, Feb 21, 2016 at 9:31 AM, Jatin Kumar <jk...@rocketfuelinc.com>
>>>> wrote:
>>>>
>>>>> Hello Vinti,
>>>>>
>>>>> One way to get this done is you split your input line into key and
>>>>> value tuple and then you can simply use groupByKey and handle the values
>>>>> the way you want. For example:
>>>>>
>>>>> Assuming you have already split the values into a 5 tuple:
>>>>> myDStream.map(record => (record._2, (record._3, record_4, record._5))
>>>>>          .groupByKey((r1, r2) => (r1._1 + r2._1, r1._2 + r2._2, r1._3
>>>>> + r2._3))
>>>>>
>>>>> I hope that helps.
>>>>>
>>>>> --
>>>>> Thanks
>>>>> Jatin Kumar | Rocket Scientist
>>>>> +91-7696741743 m
>>>>>
>>>>> On Sun, Feb 21, 2016 at 10:35 PM, Vinti Maheshwari <
>>>>> vinti.uiet@gmail.com> wrote:
>>>>>
>>>>>> Hello,
>>>>>>
>>>>>> I have input lines like below
>>>>>>
>>>>>> *Input*
>>>>>> t1, file1, 1, 1, 1
>>>>>> t1, file1, 1, 2, 3
>>>>>> t1, file2, 2, 2, 2, 2
>>>>>> t2, file1, 5, 5, 5
>>>>>> t2, file2, 1, 1, 2, 2
>>>>>>
>>>>>> and i want to achieve the output like below rows which is a vertical
>>>>>> addition of the corresponding numbers.
>>>>>>
>>>>>> *Output*
>>>>>> “file1” : [ 1+1+5, 1+2+5, 1+3+5 ]
>>>>>> “file2” : [ 2+1, 2+1, 2+2, 2+2 ]
>>>>>>
>>>>>> I am in a spark streaming context and i am having a hard time trying
>>>>>> to figure out the way to group by file name.
>>>>>>
>>>>>> It seems like i will need to use something like below, i am not sure
>>>>>> how to get to the correct syntax. Any inputs will be helpful.
>>>>>>
>>>>>> myDStream.foreachRDD(rdd => rdd.groupBy())
>>>>>>
>>>>>> I know how to do the vertical sum of array of given numbers, but i am
>>>>>> not sure how to feed that function to the group by.
>>>>>>
>>>>>>   def compute_counters(counts : ArrayBuffer[List[Int]]) = {
>>>>>>       counts.toList.transpose.map(_.sum)
>>>>>>   }
>>>>>>
>>>>>> ~Thanks,
>>>>>> Vinti
>>>>>>
>>>>>
>>>>>
>>>>
>>>
>>
>
>
> --
> Best Regards,
> Ayan Guha
>

Re: Stream group by

Posted by ayan guha <gu...@gmail.com>.
I believe the best way would be to use reduceByKey operation.

On Mon, Feb 22, 2016 at 5:10 AM, Jatin Kumar <
jkumar@rocketfuelinc.com.invalid> wrote:

> You will need to do a collect and update a global map if you want to.
>
> myDStream.map(record => (record._2, (record._3, record_4, record._5))
>          .groupByKey((r1, r2) => (r1._1 + r2._1, r1._2 + r2._2, r1._3 +
> r2._3))
>          .foreachRDD(rdd => {
>            rdd.collect().foreach((fileName, valueTuple) => <update global
> map here>)
>          })
>
> --
> Thanks
> Jatin Kumar | Rocket Scientist
> +91-7696741743 m
>
> On Sun, Feb 21, 2016 at 11:30 PM, Vinti Maheshwari <vi...@gmail.com>
> wrote:
>
>> Nevermind, seems like an executor level mutable map is not recommended as
>> stated in
>> http://blog.guillaume-pitel.fr/2015/06/spark-trick-shot-node-centered-aggregator/
>>
>> On Sun, Feb 21, 2016 at 9:54 AM, Vinti Maheshwari <vi...@gmail.com>
>> wrote:
>>
>>> Thanks for your reply Jatin. I changed my parsing logic to what you
>>> suggested:
>>>
>>>     def parseCoverageLine(str: String) = {
>>>       val arr = str.split(",")
>>>       ...
>>>       ...
>>>       (arr(0), arr(1) :: count.toList)  // (test, [file, 1, 1, 2])
>>>     }
>>>
>>> Then in the grouping, can i use a global hash map per executor /
>>> partition to aggregate the results?
>>>
>>> val globalMap:[String: List[Int]] = Map()
>>> val coverageDStream = inputStream.map(parseCoverageLine)
>>>     coverageDStream.reduceByKey((fileAndCountsA, fileAndCountsB) => {
>>>     // if exists in global map, append result else add new key
>>>
>>>     // globalMap
>>>     // { "file1": [1+1+5, 1+2+5, 1+3+5], "file2": [ 2, 2, 2, 2 ] }
>>> })
>>>
>>> Thanks,
>>> Vinti
>>>
>>> On Sun, Feb 21, 2016 at 9:31 AM, Jatin Kumar <jk...@rocketfuelinc.com>
>>> wrote:
>>>
>>>> Hello Vinti,
>>>>
>>>> One way to get this done is you split your input line into key and
>>>> value tuple and then you can simply use groupByKey and handle the values
>>>> the way you want. For example:
>>>>
>>>> Assuming you have already split the values into a 5 tuple:
>>>> myDStream.map(record => (record._2, (record._3, record_4, record._5))
>>>>          .groupByKey((r1, r2) => (r1._1 + r2._1, r1._2 + r2._2, r1._3 +
>>>> r2._3))
>>>>
>>>> I hope that helps.
>>>>
>>>> --
>>>> Thanks
>>>> Jatin Kumar | Rocket Scientist
>>>> +91-7696741743 m
>>>>
>>>> On Sun, Feb 21, 2016 at 10:35 PM, Vinti Maheshwari <
>>>> vinti.uiet@gmail.com> wrote:
>>>>
>>>>> Hello,
>>>>>
>>>>> I have input lines like below
>>>>>
>>>>> *Input*
>>>>> t1, file1, 1, 1, 1
>>>>> t1, file1, 1, 2, 3
>>>>> t1, file2, 2, 2, 2, 2
>>>>> t2, file1, 5, 5, 5
>>>>> t2, file2, 1, 1, 2, 2
>>>>>
>>>>> and i want to achieve the output like below rows which is a vertical
>>>>> addition of the corresponding numbers.
>>>>>
>>>>> *Output*
>>>>> “file1” : [ 1+1+5, 1+2+5, 1+3+5 ]
>>>>> “file2” : [ 2+1, 2+1, 2+2, 2+2 ]
>>>>>
>>>>> I am in a spark streaming context and i am having a hard time trying
>>>>> to figure out the way to group by file name.
>>>>>
>>>>> It seems like i will need to use something like below, i am not sure
>>>>> how to get to the correct syntax. Any inputs will be helpful.
>>>>>
>>>>> myDStream.foreachRDD(rdd => rdd.groupBy())
>>>>>
>>>>> I know how to do the vertical sum of array of given numbers, but i am
>>>>> not sure how to feed that function to the group by.
>>>>>
>>>>>   def compute_counters(counts : ArrayBuffer[List[Int]]) = {
>>>>>       counts.toList.transpose.map(_.sum)
>>>>>   }
>>>>>
>>>>> ~Thanks,
>>>>> Vinti
>>>>>
>>>>
>>>>
>>>
>>
>


-- 
Best Regards,
Ayan Guha

Re: Stream group by

Posted by Jatin Kumar <jk...@rocketfuelinc.com.INVALID>.
You will need to do a collect and update a global map if you want to.

myDStream.map(record => (record._2, (record._3, record_4, record._5))
         .groupByKey((r1, r2) => (r1._1 + r2._1, r1._2 + r2._2, r1._3 +
r2._3))
         .foreachRDD(rdd => {
           rdd.collect().foreach((fileName, valueTuple) => <update global
map here>)
         })

--
Thanks
Jatin Kumar | Rocket Scientist
+91-7696741743 m

On Sun, Feb 21, 2016 at 11:30 PM, Vinti Maheshwari <vi...@gmail.com>
wrote:

> Nevermind, seems like an executor level mutable map is not recommended as
> stated in
> http://blog.guillaume-pitel.fr/2015/06/spark-trick-shot-node-centered-aggregator/
>
> On Sun, Feb 21, 2016 at 9:54 AM, Vinti Maheshwari <vi...@gmail.com>
> wrote:
>
>> Thanks for your reply Jatin. I changed my parsing logic to what you
>> suggested:
>>
>>     def parseCoverageLine(str: String) = {
>>       val arr = str.split(",")
>>       ...
>>       ...
>>       (arr(0), arr(1) :: count.toList)  // (test, [file, 1, 1, 2])
>>     }
>>
>> Then in the grouping, can i use a global hash map per executor /
>> partition to aggregate the results?
>>
>> val globalMap:[String: List[Int]] = Map()
>> val coverageDStream = inputStream.map(parseCoverageLine)
>>     coverageDStream.reduceByKey((fileAndCountsA, fileAndCountsB) => {
>>     // if exists in global map, append result else add new key
>>
>>     // globalMap
>>     // { "file1": [1+1+5, 1+2+5, 1+3+5], "file2": [ 2, 2, 2, 2 ] }
>> })
>>
>> Thanks,
>> Vinti
>>
>> On Sun, Feb 21, 2016 at 9:31 AM, Jatin Kumar <jk...@rocketfuelinc.com>
>> wrote:
>>
>>> Hello Vinti,
>>>
>>> One way to get this done is you split your input line into key and value
>>> tuple and then you can simply use groupByKey and handle the values the way
>>> you want. For example:
>>>
>>> Assuming you have already split the values into a 5 tuple:
>>> myDStream.map(record => (record._2, (record._3, record_4, record._5))
>>>          .groupByKey((r1, r2) => (r1._1 + r2._1, r1._2 + r2._2, r1._3 +
>>> r2._3))
>>>
>>> I hope that helps.
>>>
>>> --
>>> Thanks
>>> Jatin Kumar | Rocket Scientist
>>> +91-7696741743 m
>>>
>>> On Sun, Feb 21, 2016 at 10:35 PM, Vinti Maheshwari <vinti.uiet@gmail.com
>>> > wrote:
>>>
>>>> Hello,
>>>>
>>>> I have input lines like below
>>>>
>>>> *Input*
>>>> t1, file1, 1, 1, 1
>>>> t1, file1, 1, 2, 3
>>>> t1, file2, 2, 2, 2, 2
>>>> t2, file1, 5, 5, 5
>>>> t2, file2, 1, 1, 2, 2
>>>>
>>>> and i want to achieve the output like below rows which is a vertical
>>>> addition of the corresponding numbers.
>>>>
>>>> *Output*
>>>> “file1” : [ 1+1+5, 1+2+5, 1+3+5 ]
>>>> “file2” : [ 2+1, 2+1, 2+2, 2+2 ]
>>>>
>>>> I am in a spark streaming context and i am having a hard time trying to
>>>> figure out the way to group by file name.
>>>>
>>>> It seems like i will need to use something like below, i am not sure
>>>> how to get to the correct syntax. Any inputs will be helpful.
>>>>
>>>> myDStream.foreachRDD(rdd => rdd.groupBy())
>>>>
>>>> I know how to do the vertical sum of array of given numbers, but i am
>>>> not sure how to feed that function to the group by.
>>>>
>>>>   def compute_counters(counts : ArrayBuffer[List[Int]]) = {
>>>>       counts.toList.transpose.map(_.sum)
>>>>   }
>>>>
>>>> ~Thanks,
>>>> Vinti
>>>>
>>>
>>>
>>
>

Re: Stream group by

Posted by Vinti Maheshwari <vi...@gmail.com>.
Nevermind, seems like an executor level mutable map is not recommended as
stated in
http://blog.guillaume-pitel.fr/2015/06/spark-trick-shot-node-centered-aggregator/

On Sun, Feb 21, 2016 at 9:54 AM, Vinti Maheshwari <vi...@gmail.com>
wrote:

> Thanks for your reply Jatin. I changed my parsing logic to what you
> suggested:
>
>     def parseCoverageLine(str: String) = {
>       val arr = str.split(",")
>       ...
>       ...
>       (arr(0), arr(1) :: count.toList)  // (test, [file, 1, 1, 2])
>     }
>
> Then in the grouping, can i use a global hash map per executor / partition
> to aggregate the results?
>
> val globalMap:[String: List[Int]] = Map()
> val coverageDStream = inputStream.map(parseCoverageLine)
>     coverageDStream.reduceByKey((fileAndCountsA, fileAndCountsB) => {
>     // if exists in global map, append result else add new key
>
>     // globalMap
>     // { "file1": [1+1+5, 1+2+5, 1+3+5], "file2": [ 2, 2, 2, 2 ] }
> })
>
> Thanks,
> Vinti
>
> On Sun, Feb 21, 2016 at 9:31 AM, Jatin Kumar <jk...@rocketfuelinc.com>
> wrote:
>
>> Hello Vinti,
>>
>> One way to get this done is you split your input line into key and value
>> tuple and then you can simply use groupByKey and handle the values the way
>> you want. For example:
>>
>> Assuming you have already split the values into a 5 tuple:
>> myDStream.map(record => (record._2, (record._3, record_4, record._5))
>>          .groupByKey((r1, r2) => (r1._1 + r2._1, r1._2 + r2._2, r1._3 +
>> r2._3))
>>
>> I hope that helps.
>>
>> --
>> Thanks
>> Jatin Kumar | Rocket Scientist
>> +91-7696741743 m
>>
>> On Sun, Feb 21, 2016 at 10:35 PM, Vinti Maheshwari <vi...@gmail.com>
>> wrote:
>>
>>> Hello,
>>>
>>> I have input lines like below
>>>
>>> *Input*
>>> t1, file1, 1, 1, 1
>>> t1, file1, 1, 2, 3
>>> t1, file2, 2, 2, 2, 2
>>> t2, file1, 5, 5, 5
>>> t2, file2, 1, 1, 2, 2
>>>
>>> and i want to achieve the output like below rows which is a vertical
>>> addition of the corresponding numbers.
>>>
>>> *Output*
>>> “file1” : [ 1+1+5, 1+2+5, 1+3+5 ]
>>> “file2” : [ 2+1, 2+1, 2+2, 2+2 ]
>>>
>>> I am in a spark streaming context and i am having a hard time trying to
>>> figure out the way to group by file name.
>>>
>>> It seems like i will need to use something like below, i am not sure how
>>> to get to the correct syntax. Any inputs will be helpful.
>>>
>>> myDStream.foreachRDD(rdd => rdd.groupBy())
>>>
>>> I know how to do the vertical sum of array of given numbers, but i am
>>> not sure how to feed that function to the group by.
>>>
>>>   def compute_counters(counts : ArrayBuffer[List[Int]]) = {
>>>       counts.toList.transpose.map(_.sum)
>>>   }
>>>
>>> ~Thanks,
>>> Vinti
>>>
>>
>>
>

Re: Stream group by

Posted by Vinti Maheshwari <vi...@gmail.com>.
Thanks for your reply Jatin. I changed my parsing logic to what you
suggested:

    def parseCoverageLine(str: String) = {
      val arr = str.split(",")
      ...
      ...
      (arr(0), arr(1) :: count.toList)  // (test, [file, 1, 1, 2])
    }

Then in the grouping, can i use a global hash map per executor / partition
to aggregate the results?

val globalMap:[String: List[Int]] = Map()
val coverageDStream = inputStream.map(parseCoverageLine)
    coverageDStream.reduceByKey((fileAndCountsA, fileAndCountsB) => {
    // if exists in global map, append result else add new key

    // globalMap
    // { "file1": [1+1+5, 1+2+5, 1+3+5], "file2": [ 2, 2, 2, 2 ] }
})

Thanks,
Vinti

On Sun, Feb 21, 2016 at 9:31 AM, Jatin Kumar <jk...@rocketfuelinc.com>
wrote:

> Hello Vinti,
>
> One way to get this done is you split your input line into key and value
> tuple and then you can simply use groupByKey and handle the values the way
> you want. For example:
>
> Assuming you have already split the values into a 5 tuple:
> myDStream.map(record => (record._2, (record._3, record_4, record._5))
>          .groupByKey((r1, r2) => (r1._1 + r2._1, r1._2 + r2._2, r1._3 +
> r2._3))
>
> I hope that helps.
>
> --
> Thanks
> Jatin Kumar | Rocket Scientist
> +91-7696741743 m
>
> On Sun, Feb 21, 2016 at 10:35 PM, Vinti Maheshwari <vi...@gmail.com>
> wrote:
>
>> Hello,
>>
>> I have input lines like below
>>
>> *Input*
>> t1, file1, 1, 1, 1
>> t1, file1, 1, 2, 3
>> t1, file2, 2, 2, 2, 2
>> t2, file1, 5, 5, 5
>> t2, file2, 1, 1, 2, 2
>>
>> and i want to achieve the output like below rows which is a vertical
>> addition of the corresponding numbers.
>>
>> *Output*
>> “file1” : [ 1+1+5, 1+2+5, 1+3+5 ]
>> “file2” : [ 2+1, 2+1, 2+2, 2+2 ]
>>
>> I am in a spark streaming context and i am having a hard time trying to
>> figure out the way to group by file name.
>>
>> It seems like i will need to use something like below, i am not sure how
>> to get to the correct syntax. Any inputs will be helpful.
>>
>> myDStream.foreachRDD(rdd => rdd.groupBy())
>>
>> I know how to do the vertical sum of array of given numbers, but i am not
>> sure how to feed that function to the group by.
>>
>>   def compute_counters(counts : ArrayBuffer[List[Int]]) = {
>>       counts.toList.transpose.map(_.sum)
>>   }
>>
>> ~Thanks,
>> Vinti
>>
>
>

Re: Stream group by

Posted by Jatin Kumar <jk...@rocketfuelinc.com.INVALID>.
Hello Vinti,

One way to get this done is you split your input line into key and value
tuple and then you can simply use groupByKey and handle the values the way
you want. For example:

Assuming you have already split the values into a 5 tuple:
myDStream.map(record => (record._2, (record._3, record_4, record._5))
         .groupByKey((r1, r2) => (r1._1 + r2._1, r1._2 + r2._2, r1._3 +
r2._3))

I hope that helps.

--
Thanks
Jatin Kumar | Rocket Scientist
+91-7696741743 m

On Sun, Feb 21, 2016 at 10:35 PM, Vinti Maheshwari <vi...@gmail.com>
wrote:

> Hello,
>
> I have input lines like below
>
> *Input*
> t1, file1, 1, 1, 1
> t1, file1, 1, 2, 3
> t1, file2, 2, 2, 2, 2
> t2, file1, 5, 5, 5
> t2, file2, 1, 1, 2, 2
>
> and i want to achieve the output like below rows which is a vertical
> addition of the corresponding numbers.
>
> *Output*
> “file1” : [ 1+1+5, 1+2+5, 1+3+5 ]
> “file2” : [ 2+1, 2+1, 2+2, 2+2 ]
>
> I am in a spark streaming context and i am having a hard time trying to
> figure out the way to group by file name.
>
> It seems like i will need to use something like below, i am not sure how
> to get to the correct syntax. Any inputs will be helpful.
>
> myDStream.foreachRDD(rdd => rdd.groupBy())
>
> I know how to do the vertical sum of array of given numbers, but i am not
> sure how to feed that function to the group by.
>
>   def compute_counters(counts : ArrayBuffer[List[Int]]) = {
>       counts.toList.transpose.map(_.sum)
>   }
>
> ~Thanks,
> Vinti
>