Posted to user@spark.apache.org by Nathan Kronenfeld <nk...@oculusinfo.com> on 2014/12/12 07:11:43 UTC
Adding a column to a SchemaRDD
Hi, there.
I'm trying to understand how to augment data in a SchemaRDD.
I can see how to do it if I can express the added values in SQL - just run
"SELECT *, valueCalculation AS newColumnName FROM table"
I've been searching all over for how to do this if my added value is a
scala function, with no luck.
Let's say I have a SchemaRDD with columns A, B, and C, and I want to add a
new column, D, calculated using Utility.process(b, c), and I want (of
course) to pass in the values of B and C from each row, ending up with a new
SchemaRDD with columns A, B, C, and D.
Is this possible? If so, how?
Thanks,
-Nathan
--
Nathan Kronenfeld
Senior Visualization Developer
Oculus Info Inc
2 Berkeley Street, Suite 600,
Toronto, Ontario M5A 4J5
Phone: +1-416-203-3003 x 238
Email: nkronenfeld@oculusinfo.com
Re: Adding a column to a SchemaRDD
Posted by Nathan Kronenfeld <nk...@oculusinfo.com>.
Perfect, that's exactly what I was looking for.
Thank you!
On Mon, Dec 15, 2014 at 3:32 AM, Yanbo Liang <ya...@gmail.com> wrote:
Re: Adding a column to a SchemaRDD
Posted by Yanbo Liang <ya...@gmail.com>.
Hi Nathan,
#1
Spark SQL & the DSL can satisfy your requirement. You can refer to the
following code snippet:

jdata.select(Star(None), 'seven.getField("mod"), 'eleven.getField("mod"))

You need to import org.apache.spark.sql.catalyst.analysis.Star in advance.
#2
After you make the transform above, you do not need to build a SchemaRDD
manually, because jdata.select() returns a SchemaRDD that you can operate
on directly.
For example, the following code snippet will return a new SchemaRDD with a
longer Row:

val t1 = jdata.select(Star(None),
  'seven.getField("mod") + 'eleven.getField("mod") as 'mod_sum)

You can use t1.printSchema() to print the schema of this SchemaRDD and
check whether it satisfies your requirements.
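For the original goal of calling an arbitrary Scala function, a related route is to register the function for use in SQL. This is a sketch against the Spark 1.1/1.2-era SQLContext API (registerFunction; later versions moved this to udf.register), where Utility.process and the table/column names are the hypothetical ones from the question:

```scala
// Register a plain Scala function so SQL can call it (Spark 1.1/1.2-era
// SQLContext.registerFunction; Utility.process is hypothetical here).
sqlc.registerFunction("process", (b: Int, c: Int) => Utility.process(b, c))

// The registered function can then compute the new column D in SQL,
// preserving all existing columns via SELECT *:
val withD = sqlc.sql("SELECT *, process(B, C) AS D FROM table")
```

This keeps the computation in ordinary Scala while letting Spark SQL handle the schema extension.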
2014-12-13 0:00 GMT+08:00 Nathan Kronenfeld <nk...@oculusinfo.com>:
>
> (1) I understand about immutability, that's why I said I wanted a new
> SchemaRDD.
> (2) I specfically asked for a non-SQL solution that takes a SchemaRDD, and
> results in a new SchemaRDD with one new function.
> (3) The DSL stuff is a big clue, but I can't find adequate documentation
> for it
>
> What I'm looking for is something like:
>
> import org.apache.spark.sql._
>
>
> val sqlc = new SQLContext(sc)
> import sqlc._
>
>
> val data = sc.parallelize(0 to 99).map(n =>
> ("{\"seven\": {\"mod\": %d, \"times\": %d}, "+
> "\"eleven\": {\"mod\": %d, \"times\": %d}}").format(n % 7, n * 7, n
> % 11, n * 11))
> val jdata = sqlc.jsonRDD(data)
> jdata.registerTempTable("jdata")
>
>
> val sqlVersion = sqlc.sql("SELECT *, (seven.mod + eleven.mod) AS modsum
> FROM jdata")
>
>
> This sqlVersion works fine, but if I try to do the same thing with a
> programatic function, I'm missing a bunch of pieces:
>
> - I assume I'd need to start with something like:
> jdata.select('*, 'seven.mod, 'eleven.mod)
> and then get and process the last two elements. The problems are:
> - I can't select '* - there seems no way to get the complete row
> - I can't select 'seven.mod or 'eleven.mod - the symbol evaluation
> seems only one deep.
> - Assuming I could do that, I don't see a way to make the result into
> a SchemaRDD. I assume I would have to do something like:
> 1. take my row and value, and create a new, slightly longer row
> 2. take my old schema, and create a new schema with one more field
> at the end, named and typed appropriately
> 3. combine the two into a SchemaRDD
> I think I see how to do 3, but 1 and 2 elude me.
>
> Is there more complete documentation somewhere for the DSL portion? Anyone
> have a clue about any of the above?
>
>
>
> On Fri, Dec 12, 2014 at 6:01 AM, Yanbo Liang <ya...@gmail.com> wrote:
>
>> RDD is immutable so you can not modify it.
>> If you want to modify some value or schema in RDD, using map to generate
>> a new RDD.
>> The following code for your reference:
>>
>> def add(a:Int,b:Int):Int = {
>> a + b
>> }
>>
>> val d1 = sc.parallelize(1 to 10).map { i => (i, i+1, i+2) }
>> val d2 = d1.map { i => (i._1, i._2, add(i._1, i._2))}
>> d2.foreach(println)
>>
>>
>> Otherwise, if your self-defining function is straightforward and you can
>> represent it by SQL, using Spark SQL or DSL is also a good choice.
>>
>> case class Person(id: Int, score: Int, value: Int)
>>
>> val sqlContext = new org.apache.spark.sql.SQLContext(sc)
>>
>> import sqlContext._
>>
>> val d1 = sc.parallelize(1 to 10).map { i => Person(i,i+1,i+2)}
>> val d2 = d1.select('id, 'score, 'id + 'score)
>> d2.foreach(println)
>>
>>
>> 2014-12-12 14:11 GMT+08:00 Nathan Kronenfeld <nk...@oculusinfo.com>
>> :
>>
>>> Hi, there.
>>>
>>> I'm trying to understand how to augment data in a SchemaRDD.
>>>
>>> I can see how to do it if can express the added values in SQL - just run
>>> "SELECT *,valueCalculation AS newColumnName FROM table"
>>>
>>> I've been searching all over for how to do this if my added value is a
>>> scala function, with no luck.
>>>
>>> Let's say I have a SchemaRDD with columns A, B, and C, and I want to add
>>> a new column, D, calculated using Utility.process(b, c), and I want (of
>>> course) to pass in the value B and C from each row, ending up with a new
>>> SchemaRDD with columns A, B, C, and D.
>>>
>>> Is this possible? If so, how?
>>>
>>> Thanks,
>>> -Nathan
>>>
>>> --
>>> Nathan Kronenfeld
>>> Senior Visualization Developer
>>> Oculus Info Inc
>>> 2 Berkeley Street, Suite 600,
>>> Toronto, Ontario M5A 4J5
>>> Phone: +1-416-203-3003 x 238
>>> Email: nkronenfeld@oculusinfo.com
>>>
>>
>>
>
>
> --
> Nathan Kronenfeld
> Senior Visualization Developer
> Oculus Info Inc
> 2 Berkeley Street, Suite 600,
> Toronto, Ontario M5A 4J5
> Phone: +1-416-203-3003 x 238
> Email: nkronenfeld@oculusinfo.com
>
Re: Adding a column to a SchemaRDD
Posted by Nathan Kronenfeld <nk...@oculusinfo.com>.
(1) I understand about immutability; that's why I said I wanted a new
SchemaRDD.
(2) I specifically asked for a non-SQL solution that takes a SchemaRDD and
results in a new SchemaRDD with one new column.
(3) The DSL stuff is a big clue, but I can't find adequate documentation
for it.
What I'm looking for is something like:
import org.apache.spark.sql._

val sqlc = new SQLContext(sc)
import sqlc._

val data = sc.parallelize(0 to 99).map(n =>
  ("{\"seven\": {\"mod\": %d, \"times\": %d}, " +
   "\"eleven\": {\"mod\": %d, \"times\": %d}}").format(n % 7, n * 7, n % 11, n * 11))
val jdata = sqlc.jsonRDD(data)
jdata.registerTempTable("jdata")

val sqlVersion = sqlc.sql("SELECT *, (seven.mod + eleven.mod) AS modsum FROM jdata")
This sqlVersion works fine, but if I try to do the same thing with a
programmatic function, I'm missing a bunch of pieces:
- I assume I'd need to start with something like:
      jdata.select('*, 'seven.mod, 'eleven.mod)
  and then get and process the last two elements. The problems are:
    - I can't select '* - there seems to be no way to get the complete row
    - I can't select 'seven.mod or 'eleven.mod - the symbol evaluation
      seems to go only one level deep.
- Assuming I could do that, I don't see a way to make the result into a
  SchemaRDD. I assume I would have to do something like:
    1. take my row and value, and create a new, slightly longer row
    2. take my old schema, and create a new schema with one more field at
       the end, named and typed appropriately
    3. combine the two into a SchemaRDD
  I think I see how to do 3, but 1 and 2 elude me.
Is there more complete documentation somewhere for the DSL portion? Anyone
have a clue about any of the above?
On Fri, Dec 12, 2014 at 6:01 AM, Yanbo Liang <ya...@gmail.com> wrote:
Re: Adding a column to a SchemaRDD
Posted by Yanbo Liang <ya...@gmail.com>.
An RDD is immutable, so you cannot modify it in place.
If you want to change values or the schema of an RDD, use map to generate a
new RDD.
The following code is for your reference:
def add(a: Int, b: Int): Int = {
  a + b
}

val d1 = sc.parallelize(1 to 10).map { i => (i, i+1, i+2) }
val d2 = d1.map { i => (i._1, i._2, add(i._1, i._2)) }
d2.foreach(println)
Otherwise, if your user-defined function is straightforward and you can
represent it in SQL, using Spark SQL or the DSL is also a good choice.
case class Person(id: Int, score: Int, value: Int)

val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext._

val d1 = sc.parallelize(1 to 10).map { i => Person(i, i+1, i+2) }
val d2 = d1.select('id, 'score, 'id + 'score)
d2.foreach(println)
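Since the map-based transformation itself is independent of Spark, the same idea can be tried on a local Scala collection. This is a standalone sketch mirroring the tuple example above (the object name is arbitrary):

```scala
object AddColumnLocal {
  // Same helper as in the Spark example above.
  def add(a: Int, b: Int): Int = a + b

  def main(args: Array[String]): Unit = {
    // Build (i, i+1, i+2) triples, then derive a new third element
    // from the first two - the "new column" of the RDD example.
    val d1 = (1 to 10).map { i => (i, i + 1, i + 2) }
    val d2 = d1.map { t => (t._1, t._2, add(t._1, t._2)) }
    d2.foreach(println)  // first line: (1,2,3)
  }
}
```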
2014-12-12 14:11 GMT+08:00 Nathan Kronenfeld <nk...@oculusinfo.com>:
Re: Adding a column to a SchemaRDD
Posted by Tobias Pfeiffer <tg...@preferred.jp>.
Nathan,
On Fri, Dec 12, 2014 at 3:11 PM, Nathan Kronenfeld <
nkronenfeld@oculusinfo.com> wrote:
I guess you would have to do three things:
- schemardd.map(row => { extend the row here }), which will give you a
  plain RDD[Row] without a schema,
- take the schema from the schemardd and extend it manually by the name and
  type of the newly added column,
- create a new SchemaRDD from your mapped RDD and the manually extended
  schema.
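Sketched in code, the three steps above might look like this. It is written from memory against the Spark 1.2-era API, so names like applySchema and the exact Row construction may differ by version; `input`, Utility.process, and the A/B/C/D column layout are the hypothetical ones from the original question:

```scala
import org.apache.spark.sql._

// Step 1: extend each Row with the computed value D = Utility.process(B, C).
// This yields a plain RDD[Row] with no schema attached.
val extended = input.map { row =>
  val d = Utility.process(row.getInt(1), row.getInt(2))
  Row.fromSeq(row.toSeq :+ d)
}

// Step 2: extend the old schema with one more field, named and typed
// to match the computed value.
val newSchema = StructType(input.schema.fields :+
  StructField("D", IntegerType, nullable = false))

// Step 3: combine the mapped RDD and the extended schema into a new SchemaRDD.
val result = sqlContext.applySchema(extended, newSchema)
result.printSchema()
```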
Does that make sense?
Tobias