Posted to issues@spark.apache.org by "Anton Okolnychyi (JIRA)" <ji...@apache.org> on 2016/11/21 23:16:59 UTC
[jira] [Created] (SPARK-18534) Datasets Aggregation with Maps
Anton Okolnychyi created SPARK-18534:
----------------------------------------
Summary: Datasets Aggregation with Maps
Key: SPARK-18534
URL: https://issues.apache.org/jira/browse/SPARK-18534
Project: Spark
Issue Type: Bug
Components: SQL
Affects Versions: 1.6.3
Reporter: Anton Okolnychyi
There is a problem with user-defined aggregations in the Dataset API in Spark 1.6.3, while the identical code works fine in Spark 2.0.
The problem appears only if {{ExpressionEncoder()}} is used for Maps. The same code with a Kryo-based encoder produces the correct result. If the map encoder is defined via {{ExpressionEncoder()}}, Spark cannot read the reduced values in the merge phase of the aggregation.
Code to reproduce:
{code}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{Encoder, SQLContext}
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.expressions.Aggregator

case class TestStopPoint(line: String, sequenceNumber: Int, id: String)

object MapAggregationRepro {
  def main(args: Array[String]): Unit = {
    // Does not work with ExpressionEncoder() and produces an empty map as a result
    implicit val intStringMapEncoder: Encoder[Map[Int, String]] = ExpressionEncoder()
    // Will work if a Kryo-based encoder is used
    // implicit val intStringMapEncoder: Encoder[Map[Int, String]] = org.apache.spark.sql.Encoders.kryo[Map[Int, String]]

    val sparkConf = new SparkConf()
      .setAppName("DS Spark 1.6 Test")
      .setMaster("local[4]")
    val sparkContext = new SparkContext(sparkConf)
    val sparkSqlContext = new SQLContext(sparkContext)
    import sparkSqlContext.implicits._

    val stopPointDS = Seq(TestStopPoint("33", 1, "id#1"), TestStopPoint("33", 2, "id#2")).toDS()

    // In Spark 1.6, toColumn picks up the implicit Encoder[Map[Int, String]] for the buffer and output
    val stopPointSequenceMap = new Aggregator[TestStopPoint, Map[Int, String], Map[Int, String]] {
      override def zero = Map[Int, String]()
      override def reduce(map: Map[Int, String], stopPoint: TestStopPoint) = {
        map.updated(stopPoint.sequenceNumber, stopPoint.id)
      }
      override def merge(map: Map[Int, String], anotherMap: Map[Int, String]) = {
        map ++ anotherMap
      }
      override def finish(reduction: Map[Int, String]) = reduction
    }.toColumn

    val resultMap = stopPointDS
      .groupBy(_.line)
      .agg(stopPointSequenceMap)
      .collect()
      .toMap

    println(resultMap)
    sparkContext.stop()
  }
}
{code}
With the map encoder defined as {{ExpressionEncoder()}}, the code above produces an empty map as the aggregated value. With the Kryo-based encoder (commented out in the code), it works fine.
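For comparison, the value the aggregator is expected to produce (which the Kryo-based encoder reportedly returns) can be worked out without Spark. The following is a minimal plain-Scala sketch, not Spark code: it models the Partial and Final aggregation phases with ordinary collections, and the names {{TwoPhaseSketch}}, {{Buffer}}, and {{aggregate}} are hypothetical. Because {{merge}} is {{++}} over disjoint keys, the result should not depend on how rows are split across partitions, so the sketch checks two assumed splits; the actual partitioning is up to Spark.

```scala
// Plain-Scala sketch (no Spark) of the two aggregation phases for the
// aggregator in the report. Object, type, and method names are hypothetical.
object TwoPhaseSketch {
  case class TestStopPoint(line: String, sequenceNumber: Int, id: String)
  type Buffer = Map[Int, String]

  // Same logic as the Aggregator in the report.
  def zero: Buffer = Map.empty
  def reduce(b: Buffer, p: TestStopPoint): Buffer = b.updated(p.sequenceNumber, p.id)
  def merge(b1: Buffer, b2: Buffer): Buffer = b1 ++ b2
  def finish(b: Buffer): Buffer = b

  // Partial phase: fold each partition's rows into one buffer per key.
  // Final phase: merge the partial buffers per key, then call finish.
  def aggregate(partitions: Seq[Seq[TestStopPoint]]): Map[String, Buffer] = {
    val partials: Seq[(String, Buffer)] = partitions.flatMap { part =>
      part.groupBy(_.line).map { case (key, rs) => key -> rs.foldLeft(zero)(reduce) }
    }
    partials.groupBy(_._1).map { case (key, kvs) =>
      key -> finish(kvs.map(_._2).foldLeft(zero)(merge))
    }
  }

  val rows = Seq(TestStopPoint("33", 1, "id#1"), TestStopPoint("33", 2, "id#2"))
  // Assumed splits: all rows in one partition, or one row per partition.
  val onePartition: Map[String, Buffer] = aggregate(Seq(rows))
  val twoPartitions: Map[String, Buffer] = aggregate(rows.map(r => Seq(r)))

  def main(args: Array[String]): Unit = {
    println(onePartition)
    println(twoPartitions)
  }
}
```

Either split should give key {{"33"}} mapped to both sequence numbers, which is the non-empty result the {{ExpressionEncoder()}} variant fails to return.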
I did a preliminary investigation into possible causes of this behavior. I am not a Spark expert, but I hope it helps.
The physical plan is:
{noformat}
== Physical Plan ==
SortBasedAggregate(key=[value#55], functions=[(anon$1(line#4,sequenceNumber#5,id#6),mode=Final,isDistinct=false)], output=[value#55,anon$1(line,sequenceNumber,id)#64])
+- ConvertToSafe
   +- Sort [value#55 ASC], false, 0
      +- TungstenExchange hashpartitioning(value#55,1), None
         +- ConvertToUnsafe
            +- SortBasedAggregate(key=[value#55], functions=[(anon$1(line#4,sequenceNumber#5,id#6),mode=Partial,isDistinct=false)], output=[value#55,value#60])
               +- ConvertToSafe
                  +- Sort [value#55 ASC], false, 0
                     +- !AppendColumns <function1>, class[line[0]: string, sequenceNumber[0]: int, id[0]: string], class[value[0]: string], [value#55]
                        +- ConvertToUnsafe
                           +- LocalTableScan [line#4,sequenceNumber#5,id#6], [[0,2000000002,1,2800000004,3333,31236469],[0,2000000002,2,2800000004,3333,32236469]]
{noformat}
Everything up to and including the first (bottom-most) {{SortBasedAggregate}} step is handled correctly. In particular, each row correctly updates the mutable aggregation buffer in the {{update()}} method of the {{org.apache.spark.sql.execution.aggregate.TypedAggregateExpression}} class. I believe the problem lies in the {{ConvertToUnsafe}} step directly after the first {{SortBasedAggregate}}.
Inside the {{org.apache.spark.sql.execution.ConvertToUnsafe}} class, the input coming from the first {{SortBasedAggregate}} contains a map with 2 elements (I call {{child.execute().collect()(0).getMap(1)}} in {{doExecute()}} of {{ConvertToUnsafe}} to see this). However, when I examine the output of this {{ConvertToUnsafe}} in the same way, the resulting map contains no elements. As a consequence, Spark operates on two empty maps in the {{merge()}} method of the {{TypedAggregateExpression}} class.
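The effect described above can be modeled in plain Scala to show why an empty map comes out at the end. This is a hedged sketch, not Spark code: {{lossyConvert}} is a hypothetical stand-in for the observed behavior of the {{ConvertToUnsafe}} step (it drops the buffer's entries), {{faithfulConvert}} models the expected behavior, and all other names are hypothetical too. It only reproduces the observed symptom, not its root cause.

```scala
// Plain-Scala model (no Spark) of the symptom described above. The partial
// buffer is correct, but the step between the Partial and Final aggregates
// may lose its contents. All names here are hypothetical.
object LostBufferSketch {
  type Buffer = Map[Int, String]

  def zero: Buffer = Map.empty
  def merge(b1: Buffer, b2: Buffer): Buffer = b1 ++ b2

  // Expected behavior: the conversion preserves the buffer.
  def faithfulConvert(b: Buffer): Buffer = b
  // Observed behavior (modeled): the conversion yields an empty map.
  def lossyConvert(b: Buffer): Buffer = Map.empty

  // Partial buffer for key "33" as observed before ConvertToUnsafe (2 elements).
  val partial: Buffer = Map(1 -> "id#1", 2 -> "id#2")

  // Final phase: convert the partial buffer, then merge it into zero.
  def finalPhase(convert: Buffer => Buffer): Buffer =
    Seq(partial).map(convert).foldLeft(zero)(merge)

  val withFaithful: Buffer = finalPhase(faithfulConvert)
  val withLossy: Buffer = finalPhase(lossyConvert)

  def main(args: Array[String]): Unit = {
    println(s"faithful: $withFaithful")
    println(s"lossy: $withLossy")
  }
}
```

With {{lossyConvert}}, {{merge}} receives the empty {{zero}} buffer and an empty converted buffer, which is consistent with the two empty maps observed in {{TypedAggregateExpression.merge()}}.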
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)