Posted to user@spark.apache.org by Punit Naik <na...@gmail.com> on 2016/06/28 17:31:04 UTC

Modify the functioning of zipWithIndex function for RDDs

Hi

I want to change how the "zipWithIndex" function works for Spark
RDDs so that its output is, just as an example, "(data,
prev_index + data.length)" instead of "(data, prev_index + 1)".

How can I do this?
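
To illustrate the indexing scheme I mean on a plain Scala collection
(just an illustration, not Spark code):

    // Each element's index is the running sum of the lengths of all
    // elements up to and including itself, i.e. prev_index + data.length.
    val data = Seq("ab", "cde", "f")
    val indexed = data.zip(data.scanLeft(0L)(_ + _.length).tail)
    // indexed: List((ab,2), (cde,5), (f,6))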

-- 
Thank You

Regards

Punit Naik

Re: Modify the functioning of zipWithIndex function for RDDs

Posted by Punit Naik <na...@gmail.com>.
Actually, I am writing code for the Connected Components algorithm. In
it I have to keep track of a variable called the vertex number, which
keeps getting incremented based on the number of triples encountered in
each line. This variable should be available on all the nodes and in all
the partitions. The way I want to keep track of it is by incorporating
it into the index of every line. By default there are two triples in a
line, but in some cases there may be three. So based on the number of
triples a line has, I want to increment its index by that number, and
the next line should take the index of the previous line and increment
it by the number of triples it has.

For example:

<asdasd> <asddsa> asdas asdas,0

<asdasd> <dsadsd> <asdasd> asdasd,1

In this case the final aggregated vertex number should be 5 as there are 2
triples in the first line and 3 triples in the second.

In the default case (two triples per line), the index numbers of the
first and second lines would be 2 and 4 respectively. But because the
second line has an extra triple in its third field, its index should be
5 and not 4. There is no pattern to where the extra triple occurs, which
makes the vertex number hard to keep track of. So the modified
zipWithIndex function that I want to write should give me the following
output:

<asdasd> <asddsa> asdas asdas,2

<asdasd> <dsadsd> <asdasd> asdasd,5

I hope I have explained myself clearly. I am not sure this is the proper
approach; maybe you could suggest a better one if there is any.
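
Roughly, this is the kind of thing I have in mind (a sketch only;
countTriples is a made-up helper that would return 2 or 3 for a line,
since that part depends on my data format):

    import org.apache.spark.rdd.RDD

    // Two passes: first compute each partition's total triple count so
    // that every partition knows the offset at which it starts, then
    // keep a running sum of triple counts within each partition.
    def zipWithTripleIndex(
        rdd: RDD[String],
        countTriples: String => Long): RDD[(String, Long)] = {
      val perPartition =
        rdd.mapPartitions(it => Iterator(it.map(countTriples).sum)).collect()
      val offsets = perPartition.scanLeft(0L)(_ + _)
      rdd.mapPartitionsWithIndex { (pi, it) =>
        var index = offsets(pi)
        it.map { line => index += countTriples(line); (line, index) }
      }
    }

For the two example lines above this would produce indices 2 and 5,
which is the output I am after.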
On 29-Jun-2016 6:31 AM, "Ted Yu" <yu...@gmail.com> wrote:

> Since data.length is variable, I am not sure whether mixing data.length
> and the index makes sense.
>
> Can you describe your use case in a bit more detail?
>
> Thanks
>
> On Tue, Jun 28, 2016 at 11:34 AM, Punit Naik <na...@gmail.com>
> wrote:
>
>> Hi Ted
>>
>> So would the tuple look like: (x._1, split.startIndex + x._2 +
>> x._1.length)?
>>
>> On Tue, Jun 28, 2016 at 11:09 PM, Ted Yu <yu...@gmail.com> wrote:
>>
>>> Please take a look at:
>>> core/src/main/scala/org/apache/spark/rdd/ZippedWithIndexRDD.scala
>>>
>>> In compute() method:
>>>     val split = splitIn.asInstanceOf[ZippedWithIndexRDDPartition]
>>>     firstParent[T].iterator(split.prev, context).zipWithIndex.map { x =>
>>>       (x._1, split.startIndex + x._2)
>>>     }
>>>
>>> You can modify the second component of the tuple to take data.length
>>> into account.
>>>
>>> On Tue, Jun 28, 2016 at 10:31 AM, Punit Naik <na...@gmail.com>
>>> wrote:
>>>
>>>> Hi
>>>>
>>>> I want to change how the "zipWithIndex" function works for
>>>> Spark RDDs so that its output is, just as an example,
>>>> "(data, prev_index + data.length)" instead of "(data, prev_index + 1)".
>>>>
>>>> How can I do this?
>>>>
>>>> --
>>>> Thank You
>>>>
>>>> Regards
>>>>
>>>> Punit Naik
>>>>
>>>
>>>
>>
>>
>> --
>> Thank You
>>
>> Regards
>>
>> Punit Naik
>>
>
>

Re: Modify the functioning of zipWithIndex function for RDDs

Posted by Ted Yu <yu...@gmail.com>.
Since data.length is variable, I am not sure whether mixing data.length
and the index makes sense.

Can you describe your use case in a bit more detail?

Thanks

On Tue, Jun 28, 2016 at 11:34 AM, Punit Naik <na...@gmail.com> wrote:

> Hi Ted
>
> So would the tuple look like: (x._1, split.startIndex + x._2 +
> x._1.length)?
>
> On Tue, Jun 28, 2016 at 11:09 PM, Ted Yu <yu...@gmail.com> wrote:
>
>> Please take a look at:
>> core/src/main/scala/org/apache/spark/rdd/ZippedWithIndexRDD.scala
>>
>> In compute() method:
>>     val split = splitIn.asInstanceOf[ZippedWithIndexRDDPartition]
>>     firstParent[T].iterator(split.prev, context).zipWithIndex.map { x =>
>>       (x._1, split.startIndex + x._2)
>>     }
>>
>> You can modify the second component of the tuple to take data.length
>> into account.
>>
>> On Tue, Jun 28, 2016 at 10:31 AM, Punit Naik <na...@gmail.com>
>> wrote:
>>
>>> Hi
>>>
>>> I want to change how the "zipWithIndex" function works for
>>> Spark RDDs so that its output is, just as an example,
>>> "(data, prev_index + data.length)" instead of "(data, prev_index + 1)".
>>>
>>> How can I do this?
>>>
>>> --
>>> Thank You
>>>
>>> Regards
>>>
>>> Punit Naik
>>>
>>
>>
>
>
> --
> Thank You
>
> Regards
>
> Punit Naik
>

Re: Modify the functioning of zipWithIndex function for RDDs

Posted by Punit Naik <na...@gmail.com>.
Hi Ted

So would the tuple look like: (x._1, split.startIndex + x._2 + x._1.length)?
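
Or, if the lengths have to accumulate across elements rather than being
added once each, maybe something like this inside compute() (a rough,
untested sketch; it assumes the parent RDD holds Strings, and
split.startIndex would also have to be computed from lengths instead of
element counts):

    // Keep a running offset instead of relying on the zipWithIndex
    // counter; each element's index becomes the previous offset plus
    // the element's own length.
    var offset = split.startIndex
    firstParent[String].iterator(split.prev, context).map { x =>
      offset += x.length
      (x, offset)
    }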

On Tue, Jun 28, 2016 at 11:09 PM, Ted Yu <yu...@gmail.com> wrote:

> Please take a look at:
> core/src/main/scala/org/apache/spark/rdd/ZippedWithIndexRDD.scala
>
> In compute() method:
>     val split = splitIn.asInstanceOf[ZippedWithIndexRDDPartition]
>     firstParent[T].iterator(split.prev, context).zipWithIndex.map { x =>
>       (x._1, split.startIndex + x._2)
>     }
>
> You can modify the second component of the tuple to take data.length into
> account.
>
> On Tue, Jun 28, 2016 at 10:31 AM, Punit Naik <na...@gmail.com>
> wrote:
>
>> Hi
>>
>> I want to change how the "zipWithIndex" function works for
>> Spark RDDs so that its output is, just as an example,
>> "(data, prev_index + data.length)" instead of "(data, prev_index + 1)".
>>
>> How can I do this?
>>
>> --
>> Thank You
>>
>> Regards
>>
>> Punit Naik
>>
>
>


-- 
Thank You

Regards

Punit Naik

Re: Modify the functioning of zipWithIndex function for RDDs

Posted by Ted Yu <yu...@gmail.com>.
Please take a look at:
core/src/main/scala/org/apache/spark/rdd/ZippedWithIndexRDD.scala

In compute() method:
    val split = splitIn.asInstanceOf[ZippedWithIndexRDDPartition]
    firstParent[T].iterator(split.prev, context).zipWithIndex.map { x =>
      (x._1, split.startIndex + x._2)
    }

You can modify the second component of the tuple to take data.length into
account.
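
In the actual class the per-partition start offsets come from a runJob
over the preceding partitions. For your case those offsets would have to
be derived from lengths rather than element counts, along these lines (a
rough, untested sketch, assuming an RDD[String] parent named prev):

    // Each partition's starting offset is the total length of all
    // elements in the partitions that precede it.
    val startIndices: Array[Long] = {
      val n = prev.partitions.length
      if (n == 0) Array.empty[Long]
      else prev.context.runJob(
        prev,
        (it: Iterator[String]) => it.map(_.length.toLong).sum,
        0 until n - 1  // the last partition's total is never needed
      ).scanLeft(0L)(_ + _)
    }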

On Tue, Jun 28, 2016 at 10:31 AM, Punit Naik <na...@gmail.com> wrote:

> Hi
>
> I want to change how the "zipWithIndex" function works for
> Spark RDDs so that its output is, just as an example,
> "(data, prev_index + data.length)" instead of "(data, prev_index + 1)".
>
> How can I do this?
>
> --
> Thank You
>
> Regards
>
> Punit Naik
>