Posted to user@spark.apache.org by daunnc <da...@gmail.com> on 2016/10/19 12:28:47 UTC

Joins of typed datasets

Hi!
I am working with the new Spark 2 Dataset API. PR:
https://github.com/geotrellis/geotrellis/pull/1675

The idea is to use Dataset[(K, V)] and, for example, to join by a key of
type K.
The first problem was that there are no Encoders for custom types (types
that are not Products), so the workaround was to use Kryo:
https://github.com/pomadchin/geotrellis/blob/4f417f3c5e99eacf2ca57b4e8405047d556beda0/spark/src/main/scala/geotrellis/spark/KryoEncoderImplicits.scala
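
A minimal sketch of what a Kryo-based encoder looks like. This is not the
code from the linked file: OpaqueKey and KryoSchemaSketch are hypothetical
names introduced for illustration. Encoders.kryo serializes the whole object
into one binary column, which the printed schema makes visible.

```scala
import org.apache.spark.sql.{Encoder, Encoders}

// Hypothetical key type that is not a case class, so Spark cannot derive
// a product encoder for it.
class OpaqueKey(val col: Int, val row: Int) extends Serializable

object KryoSchemaSketch {
  // Kryo-backed encoder: the object is stored as an opaque byte array.
  val keyEncoder: Encoder[OpaqueKey] = Encoders.kryo[OpaqueKey]

  def main(args: Array[String]): Unit = {
    // The schema is expected to show a single binary column and no
    // separate col/row fields.
    println(keyEncoder.schema.simpleString)
  }
}
```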

But this has a limitation: we can't join on this K type (I suppose because
Spark represents everything as byte blobs when using the Kryo encoder):

def combineValues[R: ClassTag](other: Dataset[(K, V)])(f: (V, V) => R): Dataset[(K, R)] = {
  self.toDF("_1", "_2").alias("self")
    .join(other.toDF("_1", "_2").alias("other"), $"self._1" === $"other._1")
    .as[(K, (V, V))]
    .mapValues({ case (tile1, tile2) =>
      f(tile1, tile2)
    })
}

What is the correct solution? K is important, as it is a geospatial key.
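
One direction that may be worth trying is the typed joinWith. The sketch
below assumes K is a case class, so that Spark can derive a product encoder
and store the key as a struct rather than as a Kryo blob. GridKey and
TypedJoinSketch are hypothetical names, and this is not the GeoTrellis
implementation.

```scala
import org.apache.spark.sql.{Dataset, Encoder, SparkSession}

// Hypothetical key type, for illustration only.
case class GridKey(col: Int, row: Int)

object TypedJoinSketch {
  // Inner-join two keyed datasets on the key column and combine the values.
  // Assumes the key column `_1` is encoded as a struct, not as Kryo bytes.
  def combineValues[K, V, R](left: Dataset[(K, V)], right: Dataset[(K, V)])(f: (V, V) => R)(
    implicit enc: Encoder[(K, R)]
  ): Dataset[(K, R)] =
    left
      .joinWith(right, left("_1") === right("_1"))
      .map { case ((k, v1), (_, v2)) => (k, f(v1, v2)) }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("typed-join").getOrCreate()
    import spark.implicits._

    val a = Seq((GridKey(0, 0), 1), (GridKey(0, 1), 2)).toDS()
    val b = Seq((GridKey(0, 0), 10)).toDS()

    // Only GridKey(0, 0) is present on both sides.
    combineValues(a, b)(_ + _).show()
    spark.stop()
  }
}
```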




--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Joins-of-typed-datasets-tp27924.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

---------------------------------------------------------------------
To unsubscribe e-mail: user-unsubscribe@spark.apache.org


Re: Joins of typed datasets

Posted by daunnc <da...@gmail.com>.
The situation has changed a bit: the workaround is to add a restriction on
`K` (K must be a subtype of Product).

However, I now get another error:

org.apache.spark.sql.AnalysisException: cannot resolve '(`key` = `key`)' due to data type mismatch: differing types in '(`key` = `key`)' (struct<col:int,row:int> and struct<col:int,row:int>).;
[info]   at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
[info]   at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:82)
[info]   at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:74)
[info]   at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:301)
[info]   at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:301)
[info]   at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:69)
[info]   at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:300)
[info]   at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressionUp$1(QueryPlan.scala:191)
[info]   at org.apache.spark.sql.catalyst.plans.QueryPlan.org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform$2(QueryPlan.scala:202)
[info]   at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$5.apply(QueryPlan.scala:210)

Maybe it is caused by the fact that one of the keys extends Product2?

// A join on this key throws the error
case class SpatialKey(col: Int, row: Int) extends Product2[Int, Int]
// A join on this key works without errors
case class SpaceTimeKey(col: Int, row: Int, instant: Long)
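
A possible workaround, sketched here as an assumption rather than a confirmed
fix, is to compare the key's fields one by one instead of comparing the two
struct columns as a whole. CellKey and FieldwiseJoinSketch are hypothetical
names, and the thread does not establish what actually causes the mismatch.

```scala
import org.apache.spark.sql.{Column, DataFrame, SparkSession}

// Hypothetical key type, for illustration only.
case class CellKey(col: Int, row: Int)

object FieldwiseJoinSketch {
  // Build an equality condition over the named fields of the `_1` struct
  // column, so that only primitive fields are compared.
  def keyCondition(left: DataFrame, right: DataFrame, fields: Seq[String]): Column =
    fields
      .map(name => left("_1").getField(name) === right("_1").getField(name))
      .reduce(_ && _)

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("fieldwise-join").getOrCreate()
    import spark.implicits._

    val l = Seq((CellKey(1, 2), "a")).toDS().toDF()
    val r = Seq((CellKey(1, 2), "b"), (CellKey(3, 4), "c")).toDS().toDF()

    // Join on col and row individually rather than on the struct itself.
    l.join(r, keyCondition(l, r, Seq("col", "row"))).show()
    spark.stop()
  }
}
```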



--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Joins-of-typed-datasets-tp27924p27929.html