Posted to issues@spark.apache.org by "Anton Okolnychyi (JIRA)" <ji...@apache.org> on 2016/12/04 20:04:58 UTC

[jira] [Commented] (SPARK-18534) Datasets Aggregation with Maps

    [ https://issues.apache.org/jira/browse/SPARK-18534?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15720504#comment-15720504 ] 

Anton Okolnychyi commented on SPARK-18534:
------------------------------------------

I have done a small investigation to see what is going on under the hood. 

I started by looking at the code that Spark generated for the second {{ConvertToUnsafe}} operation (right after the first {{SortBasedAggregate}}). Once I understood each step there, I came to the conclusion that it is fully correct (I can present the generated code and the structure of the result {{UnsafeRow}}). However, the problem from the ticket description was still present: the result {{UnsafeRow}} contained a corrupted Map. I then took a detailed look at the incoming {{InternalRow}}. It contained the {{UnsafeMapData}} that was written, without any modifications, to the underlying buffer of the result {{UnsafeRow}}, and it was corrupted as well. Interestingly, only the keys were corrupted; the values in the incoming {{InternalRow}} could be retrieved correctly. Therefore, the problem appeared before this step, and the second {{ConvertToUnsafe}} operation itself was done correctly.

Then I decided to look at previous steps. In my view, the problem appears in the {{next()}} method of {{SortBasedAggregationIterator}}. Here is the code for it:

{code} 
  override final def next(): InternalRow = {
    if (hasNext) {
      // Process the current group.
      processCurrentSortedGroup()
      // Generate output row for the current group.
      val outputRow = generateOutput(currentGroupingKey, sortBasedAggregationBuffer)
      // Initialize buffer values for the next group.
      initializeBuffer(sortBasedAggregationBuffer)
      numOutputRows += 1
      outputRow
    } else {
      // no more result
      throw new NoSuchElementException
    }
  }
{code}

The {{outputRow}} contains correct results (I can retrieve the keys correctly), but only before the {{initializeBuffer(sortBasedAggregationBuffer)}} invocation. If I change the method slightly so that the buffer is initialized before the current group is processed, I get the expected results.

{code}
  override final def next(): InternalRow = {
    if (hasNext) {
      // INITIALIZE BEFORE
      initializeBuffer(sortBasedAggregationBuffer)
      // Process the current group.
      processCurrentSortedGroup()
      // Generate output row for the current group.
      val outputRow = generateOutput(currentGroupingKey, sortBasedAggregationBuffer)
      numOutputRows += 1
      outputRow
    } else {
      // no more result
      throw new NoSuchElementException
    }
  }
{code}
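
One possible explanation for the behavior above, which I have not confirmed against the Spark source, is that the output row still refers to storage owned by the aggregation buffer, so resetting the buffer also wipes what the row points at. The sketch below illustrates only that aliasing effect with plain Scala collections. {{Buffer}}, {{OutputRow}}, {{nextResetAfter}} and {{nextResetBefore}} are hypothetical names made up for this illustration; they are not Spark classes or methods.

```scala
import scala.collection.mutable.ArrayBuffer

object BufferAliasingSketch {
  // Hypothetical stand-in for a reusable aggregation buffer (not a Spark class).
  final class Buffer {
    val keys: ArrayBuffer[Int] = ArrayBuffer.empty[Int]
    def reset(): Unit = keys.clear()
  }

  // Hypothetical stand-in for an output row that keeps a reference to the
  // buffer's storage instead of copying it.
  final case class OutputRow(keys: ArrayBuffer[Int])

  // Mirrors the original order: generate the output, then reset the buffer.
  def nextResetAfter(buffer: Buffer, group: Seq[Int]): OutputRow = {
    buffer.keys ++= group
    val row = OutputRow(buffer.keys) // shares storage with the buffer
    buffer.reset()                   // also empties what `row` refers to
    row
  }

  // Mirrors the modified order: reset first, then process and generate output.
  def nextResetBefore(buffer: Buffer, group: Seq[Int]): OutputRow = {
    buffer.reset()
    buffer.keys ++= group
    OutputRow(buffer.keys)           // still shared, but intact until the next reset
  }
}
```

In this sketch the row returned by {{nextResetAfter}} is already empty when the caller sees it, while the row returned by {{nextResetBefore}} stays intact until the same buffer is reset again. If the real cause is indeed such sharing, the reordered {{next()}} would only be safe as long as the consumer reads or copies each row before asking for the next one.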

> Datasets Aggregation with Maps
> ------------------------------
>
>                 Key: SPARK-18534
>                 URL: https://issues.apache.org/jira/browse/SPARK-18534
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 1.6.2, 1.6.3
>            Reporter: Anton Okolnychyi
>
> There is a problem with user-defined aggregations in the Dataset API in Spark 1.6.3, while the identical code works fine in Spark 2.0. 
> The problem appears only if {{ExpressionEncoder()}} is used for Maps. The same code with a Kryo-based alternative produces a correct result. If the encoder for a map is defined with the help of {{ExpressionEncoder()}}, Spark is not capable of reading the reduced values in the merge phase of the considered aggregation.
> Code to reproduce:
> {code}
>   case class TestStopPoint(line: String, sequenceNumber: Int, id: String)
>   // Does not work with ExpressionEncoder() and produces an empty map as a result
>   implicit val intStringMapEncoder: Encoder[Map[Int, String]] = ExpressionEncoder()
>   // Will work if a Kryo-based encoder is used
>   // implicit val intStringMapEncoder: Encoder[Map[Int, String]] = org.apache.spark.sql.Encoders.kryo[Map[Int, String]]
>   val sparkConf = new SparkConf()
>     .setAppName("DS Spark 1.6 Test")
>     .setMaster("local[4]")
>   val sparkContext = new SparkContext(sparkConf)
>   val sparkSqlContext = new SQLContext(sparkContext)
>   import sparkSqlContext.implicits._
>   val stopPointDS = Seq(TestStopPoint("33", 1, "id#1"), TestStopPoint("33", 2, "id#2")).toDS()
>   val stopPointSequenceMap = new Aggregator[TestStopPoint, Map[Int, String], Map[Int, String]] {
>     override def zero = Map[Int, String]()
>     override def reduce(map: Map[Int, String], stopPoint: TestStopPoint) = {
>       map.updated(stopPoint.sequenceNumber, stopPoint.id)
>     }
>     override def merge(map: Map[Int, String], anotherMap: Map[Int, String]) = {
>       map ++ anotherMap
>     }
>     override def finish(reduction: Map[Int, String]) = reduction
>   }.toColumn
>   val resultMap = stopPointDS
>     .groupBy(_.line)
>     .agg(stopPointSequenceMap)
>     .collect()
>     .toMap
> {code}
> The code above produces an empty map as a result if the Map encoder is defined as {{ExpressionEncoder()}}. The Kryo-based encoder (commented out in the code) works fine.
> A preliminary investigation was done to find out possible reasons for this behavior. I am not a Spark expert but hope it will help. 
> The Physical Plan looks like:
> {noformat}
> == Physical Plan ==
> SortBasedAggregate(key=[value#55], functions=[(anon$1(line#4,sequenceNumber#5,id#6),mode=Final,isDistinct=false)], output=[value#55,anon$1(line,sequenceNumber,id)#64])
> +- ConvertToSafe
>    +- Sort [value#55 ASC], false, 0
>       +- TungstenExchange hashpartitioning(value#55,1), None
>          +- ConvertToUnsafe
>             +- SortBasedAggregate(key=[value#55], functions=[(anon$1(line#4,sequenceNumber#5,id#6),mode=Partial,isDistinct=false)], output=[value#55,value#60])
>                +- ConvertToSafe
>                   +- Sort [value#55 ASC], false, 0
>                      +- !AppendColumns <function1>, class[line[0]: string, sequenceNumber[0]: int, id[0]: string], class[value[0]: string], [value#55]
>                         +- ConvertToUnsafe
>                            +- LocalTableScan [line#4,sequenceNumber#5,id#6], [[0,2000000002,1,2800000004,3333,31236469],[0,2000000002,2,2800000004,3333,32236469]]
> {noformat}
> Everything including the first (from bottom) {{SortBasedAggregate}} step is handled correctly. In particular, I see that each row updates the mutable aggregation buffer correctly in the {{update()}} method of the {{org.apache.spark.sql.execution.aggregate.TypedAggregateExpression}} class. In my view, the problem appears in the {{ConvertToUnsafe}} step directly after the first {{SortBasedAggregate}}. If I take a look at the {{org.apache.spark.sql.execution.ConvertToUnsafe}} class, I can see that the first {{SortBasedAggregate}} returns a map with 2 elements (I call {{child.execute().collect()(0).getMap(1)}} in {{doExecute()}} of {{ConvertToUnsafe}} to see this). However, if I examine the output of this {{ConvertToUnsafe}} in the same way as its input, I see that the result map does not contain any elements. As a consequence, Spark operates on two empty maps in the {{merge()}} method of the {{TypedAggregateExpression}} class.


