Posted to user@spark.apache.org by Nathan Kronenfeld <nk...@oculusinfo.com> on 2014/12/12 07:11:43 UTC

Adding a column to a SchemaRDD

Hi, there.

I'm trying to understand how to augment data in a SchemaRDD.

I can see how to do it if I can express the added values in SQL - just run
"SELECT *,valueCalculation AS newColumnName FROM table"

I've been searching all over for how to do this if my added value is a
Scala function, with no luck.

Let's say I have a SchemaRDD with columns A, B, and C, and I want to add a
new column, D, calculated using Utility.process(b, c), and I want (of
course) to pass in the values of B and C from each row, ending up with a new
SchemaRDD with columns A, B, C, and D.

Is this possible? If so, how?

Thanks,
                   -Nathan

-- 
Nathan Kronenfeld
Senior Visualization Developer
Oculus Info Inc
2 Berkeley Street, Suite 600,
Toronto, Ontario M5A 4J5
Phone:  +1-416-203-3003 x 238
Email:  nkronenfeld@oculusinfo.com

Re: Adding a column to a SchemaRDD

Posted by Nathan Kronenfeld <nk...@oculusinfo.com>.
Perfect, that's exactly what I was looking for.

Thank you!


Re: Adding a column to a SchemaRDD

Posted by Yanbo Liang <ya...@gmail.com>.
Hi Nathan,

#1

Spark SQL & DSL can satisfy your requirement. You can refer to the following
code snippet:

jdata.select(Star(None), 'seven.getField("mod"), 'eleven.getField("mod"))

You need to import org.apache.spark.sql.catalyst.analysis.Star in advance.

#2

After the transformation above, you do not need to build a SchemaRDD
manually, because jdata.select() returns a SchemaRDD that you can operate on
directly.

For example, the following code snippet will return a new SchemaRDD with a
longer Row:

val t1 = jdata.select(Star(None),
  'seven.getField("mod") + 'eleven.getField("mod") as 'mod_sum)

You can use t1.printSchema() to print the schema of this SchemaRDD and
check whether it satisfies your requirements.




Re: Adding a column to a SchemaRDD

Posted by Nathan Kronenfeld <nk...@oculusinfo.com>.
(1) I understand about immutability; that's why I said I wanted a new
SchemaRDD.
(2) I specifically asked for a non-SQL solution that takes a SchemaRDD and
results in a new SchemaRDD with one new column, computed by a function.
(3) The DSL stuff is a big clue, but I can't find adequate documentation
for it.

What I'm looking for is something like:

import org.apache.spark.sql._


val sqlc = new SQLContext(sc)
import sqlc._


val data = sc.parallelize(0 to 99).map(n =>
    ("{\"seven\": {\"mod\": %d, \"times\": %d}, "+
      "\"eleven\": {\"mod\": %d, \"times\": %d}}").format(n % 7, n * 7,
        n % 11, n * 11))
val jdata = sqlc.jsonRDD(data)
jdata.registerTempTable("jdata")


val sqlVersion = sqlc.sql(
  "SELECT *, (seven.mod + eleven.mod) AS modsum FROM jdata")


This sqlVersion works fine, but if I try to do the same thing with a
programmatic function, I'm missing a bunch of pieces:

   - I assume I'd need to start with something like:
   jdata.select('*, 'seven.mod, 'eleven.mod)
   and then get and process the last two elements.  The problems are:
      - I can't select '* - there seems to be no way to get the complete row
      - I can't select 'seven.mod or 'eleven.mod - the symbol evaluation
      seems to go only one level deep.
   - Assuming I could do that, I don't see a way to make the result into a
   SchemaRDD.  I assume I would have to do something like:
      1. take my row and value, and create a new, slightly longer row
      2. take my old schema, and create a new schema with one more field at
      the end, named and typed appropriately
      3. combine the two into a SchemaRDD
      I think I see how to do 3, but 1 and 2 elude me.

Is there more complete documentation somewhere for the DSL portion? Anyone
have a clue about any of the above?
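
For reference, the generator in the map(...) above can be run without Spark
to see what a single record looks like. This is only a sketch: the object
and helper names (JsonRecordSketch, record, modSum) are made up for
illustration, while the format string and its arguments are the ones from
the snippet. The nesting it shows is why the mod values have to be reached
as fields of seven and eleven rather than as top-level columns.

```scala
object JsonRecordSketch {
  // Same format string and arguments as in the map(...) above, for one n.
  def record(n: Int): String =
    ("{\"seven\": {\"mod\": %d, \"times\": %d}, " +
      "\"eleven\": {\"mod\": %d, \"times\": %d}}").format(n % 7, n * 7, n % 11, n * 11)

  // The value the added column should hold for the record built from n.
  def modSum(n: Int): Int = n % 7 + n % 11

  def main(args: Array[String]): Unit = {
    println(record(10))
    // {"seven": {"mod": 3, "times": 70}, "eleven": {"mod": 10, "times": 110}}
    println(modSum(10)) // 13
  }
}
```

Values computed this way can serve as a quick check on whatever column the
SQL or DSL version ends up producing.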




Re: Adding a column to a SchemaRDD

Posted by Yanbo Liang <ya...@gmail.com>.
An RDD is immutable, so you cannot modify it in place.
If you want to change some values or the schema of an RDD, use map to
generate a new RDD.
The following code is for your reference:

def add(a:Int,b:Int):Int = {
  a + b
}

val d1 = sc.parallelize(1 to 10).map { i => (i, i+1, i+2) }
val d2 = d1.map { i => (i._1, i._2, add(i._1, i._2))}
d2.foreach(println)


Otherwise, if your user-defined function is straightforward and can be
expressed in SQL, using Spark SQL or the DSL is also a good choice.

case class Person(id: Int, score: Int, value: Int)

val sqlContext = new org.apache.spark.sql.SQLContext(sc)

import sqlContext._

val d1 = sc.parallelize(1 to 10).map { i => Person(i,i+1,i+2)}
val d2 = d1.select('id, 'score, 'id + 'score)
d2.foreach(println)



Re: Adding a column to a SchemaRDD

Posted by Tobias Pfeiffer <tg...@preferred.jp>.
Nathan,

On Fri, Dec 12, 2014 at 3:11 PM, Nathan Kronenfeld <
nkronenfeld@oculusinfo.com> wrote:
>
> I can see how to do it if I can express the added values in SQL - just run
> "SELECT *,valueCalculation AS newColumnName FROM table"
>
> I've been searching all over for how to do this if my added value is a
> Scala function, with no luck.
>
> Let's say I have a SchemaRDD with columns A, B, and C, and I want to add a
> new column, D, calculated using Utility.process(b, c), and I want (of
> course) to pass in the values of B and C from each row, ending up with a new
> SchemaRDD with columns A, B, C, and D.
>

I guess you would have to do three things:
- schemardd.map(row => { extend the row here }),
  which will give you a plain RDD[Row] without a schema,
- take the schema from the schemardd and extend it manually by the name and
  type of the newly added column,
- create a new SchemaRDD from your mapped RDD and the manually extended
  schema.
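
The steps above can be sketched without Spark, using plain Scala
collections as stand-ins. Everything named here is hypothetical: Field
stands in for a schema field, a Seq[Any] for a Row, and process for the
Utility.process(b, c) from the question (its body is made up). The sketch
only illustrates the row and schema bookkeeping, not the SchemaRDD API
itself.

```scala
object ExtendRowsSketch {
  // Stand-in for a schema field: a column name plus a type name.
  final case class Field(name: String, dataType: String)

  // Hypothetical placeholder for Utility.process(b, c).
  def process(b: Int, c: Int): Int = b * c

  // Step 1: build a new, slightly longer row; the input row is not modified.
  def extendRow(row: Seq[Any]): Seq[Any] = {
    val b = row(1).asInstanceOf[Int] // column B
    val c = row(2).asInstanceOf[Int] // column C
    row :+ process(b, c)
  }

  // Step 2: append the description of the new column to the old schema.
  def extendSchema(schema: Seq[Field]): Seq[Field] =
    schema :+ Field("D", "Int")

  def main(args: Array[String]): Unit = {
    val schema = Seq(Field("A", "Int"), Field("B", "Int"), Field("C", "Int"))
    val rows: Seq[Seq[Any]] = Seq(Seq(1, 2, 3), Seq(4, 5, 6))

    // Step 3: pair the extended rows with the extended schema.
    val newSchema = extendSchema(schema)
    val newRows = rows.map(extendRow)

    println(newSchema.map(_.name).mkString(", "))
    newRows.foreach(r => println(r.mkString(", ")))
  }
}
```

In the Spark release discussed in this thread, the corresponding pieces
would presumably be SchemaRDD.map for step 1, the schema of the SchemaRDD
for step 2, and SQLContext.applySchema for step 3; please check them
against the API documentation of your Spark version.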

Does that make sense?

Tobias