Posted to user@spark.apache.org by goi cto <go...@gmail.com> on 2014/02/13 19:53:49 UTC

Mapping between two RDDs?

Hi,

I have the following code:

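// Each row: transaction ID, destination node, parent transaction ID, amount
val data = List(
  List("Tx1", "node A", "ROOT", 100),
  List("Tx2", "node B", "Tx1", 50),
  List("Tx3", "node C", "Tx1", 50),
  List("Tx4", "node B", "ROOT", 100),
  List("Tx5", "node C", "Tx4", 75),
  List("Tx6", "node B", "Tx4", 25))
val distData = sc.parallelize(data)
// Key each transaction by its destination node: (node, (txId, parentTxId, amount))
val txData = distData.map(x => (x(1), (x(0), x(2), x(3))))
// Group the transactions by destination node
val destNode = txData.groupByKey()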
/*
res60: Array[(Any, Seq[(Any, Any, Any)])] = Array((node
A,ArrayBuffer((Tx1,ROOT,100))), (node B,ArrayBuffer((Tx2,Tx1,50),
(Tx4,ROOT,100), (Tx6,Tx4,25))), (node C,ArrayBuffer((Tx3,Tx1,50),
(Tx5,Tx4,75))))
*/
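// Map each transaction ID to its node: (txId, node)
val txNodeData = distData.map(x => (x(0), x(1)))
val txgData = txNodeData.groupByKey()
// Transaction IDs are unique, so keep the single node in each group
val txfData = txgData.map(p => (p._1, p._2.head))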
// The root entry as a (key, value) pair, typed like the elements of txfData
val root: List[(Any, Any)] = List(("ROOT", "rootNode"))
val rootNode = sc.parallelize(root)

*How do I union the rootNode RDD with the txfData RDD?*

/*
res61: Array[(Any, Any)] = Array((Tx1,node A), (Tx6,node B), (Tx4,node B),
(Tx2,node B), (Tx5,node C), (Tx3,node C))
*/
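
One possible way to think about the union, sketched with plain Scala collections standing in for the RDDs so that it runs without a Spark context. It assumes the root entry is a single (key, value) pair with the same element type as txfData. The names UnionSketch and txWithRoot are hypothetical. On real RDDs the `++` below would correspond to `txfData.union(rootNode)`, which requires both RDDs to have the same element type.

```scala
object UnionSketch {
  // Stand-in for txfData: (txId, node) pairs, typed as Strings for clarity
  val txfData: List[(String, String)] = List(
    ("Tx1", "node A"), ("Tx6", "node B"), ("Tx4", "node B"),
    ("Tx2", "node B"), ("Tx5", "node C"), ("Tx3", "node C"))

  // Assumption: the root entry is one (key, value) pair, not two separate strings
  val rootNode: List[(String, String)] = List(("ROOT", "rootNode"))

  // Concatenation here plays the role of RDD.union
  val txWithRoot: List[(String, String)] = txfData ++ rootNode
}
```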

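*How do I match the second element of each value in the destNode RDD (the
parent transaction ID) against the key of the txfData RDD?*
*The outcome should be the destNode RDD with each transaction paired with the value that txfData holds for its parent transaction, for example:*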
*(node B,ArrayBuffer(ArrayBuffer((Tx2,Tx1,50),node A),
ArrayBuffer((Tx4,ROOT,100),rootNode), ArrayBuffer((Tx6,Tx4,25),node B)))*
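
A minimal sketch of the lookup described above, again using plain Scala collections rather than RDDs so that it is self-contained. It assumes the rows are typed tuples instead of List[Any] and that "ROOT" maps to "rootNode". The names JoinSketch, txToNode and destWithParentNode are hypothetical.

```scala
object JoinSketch {
  // (txId, node, parentTxId, amount)
  val data: List[(String, String, String, Int)] = List(
    ("Tx1", "node A", "ROOT", 100),
    ("Tx2", "node B", "Tx1", 50),
    ("Tx3", "node C", "Tx1", 50),
    ("Tx4", "node B", "ROOT", 100),
    ("Tx5", "node C", "Tx4", 75),
    ("Tx6", "node B", "Tx4", 25))

  // txId -> node, plus the assumed ROOT entry (the role of txfData union rootNode)
  val txToNode: Map[String, String] =
    data.map { case (tx, node, _, _) => tx -> node }.toMap + ("ROOT" -> "rootNode")

  // node -> list of ((txId, parentTxId, amount), node of the parent transaction)
  val destWithParentNode: Map[String, List[((String, String, Int), String)]] =
    data
      .map { case (tx, node, parent, amount) =>
        (node, ((tx, parent, amount), txToNode(parent)))
      }
      .groupBy(_._1)
      .map { case (node, rows) => node -> rows.map(_._2) }
}
```

With RDDs, the same idea would roughly be: re-key each transaction by its parent transaction ID, `join` that with the (txId, node) pair RDD that includes the root entry, then re-key by destination node and call `groupByKey`. The order of elements inside each group is not guaranteed on an RDD.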

Help appreciated!
-- 
Eran | CTO