You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "Erik Erlandson (Jira)" <ji...@apache.org> on 2020/07/02 18:12:00 UTC

[jira] [Commented] (SPARK-32159) New udaf(Aggregator) has an integration bug with UnresolvedMapObjects serialization

    [ https://issues.apache.org/jira/browse/SPARK-32159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17150490#comment-17150490 ] 

Erik Erlandson commented on SPARK-32159:
----------------------------------------

cc [~cloud_fan]

> New udaf(Aggregator) has an integration bug with UnresolvedMapObjects serialization
> -----------------------------------------------------------------------------------
>
>                 Key: SPARK-32159
>                 URL: https://issues.apache.org/jira/browse/SPARK-32159
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 3.0.0
>            Reporter: Erik Erlandson
>            Priority: Major
>
> The new user defined aggregator feature (SPARK-27296) based on calling 'functions.udaf(aggregator)' works fine when the aggregator input type is atomic, e.g. 'Aggregator[Double, _, _]', however if the input type is an array, like 'Aggregator[Array[Double], _, _]',  it is tripping over the following:
> {{/**
>  * When constructing [[MapObjects]], the element type must be given, which may not be available
>  * before analysis. This class acts like a placeholder for [[MapObjects]], and will be replaced by
>  * [[MapObjects]] during analysis after the input data is resolved.
>  * Note that, ideally we should not serialize and send unresolved expressions to executors, but
>  * users may accidentally do this(e.g. mistakenly reference an encoder instance when implementing
>  * Aggregator). Here we mark `function` as transient because it may reference scala Type, which is
>  * not serializable. Then even users mistakenly reference unresolved expression and serialize it,
>  * it's just a performance issue(more network traffic), and will not fail.
>  */
> case class UnresolvedMapObjects(
>     @transient function: Expression => Expression,
>     child: Expression,
>     customCollectionCls: Option[Class[_]] = None) extends UnaryExpression with Unevaluable {
>   override lazy val resolved = false
>   override def dataType: DataType = customCollectionCls.map(ObjectType.apply).getOrElse {
>     throw new UnsupportedOperationException("not resolved")
>   }
> }}}
> The '@transient' is causing the function to be unpacked as 'null' over on the executors, and it is causing a null-pointer exception here, when it tries to do 'function(loopVar)'
> {{object MapObjects {
>   def apply(
>       function: Expression => Expression,
>       inputData: Expression,
>       elementType: DataType,
>       elementNullable: Boolean = true,
>       customCollectionCls: Option[Class[_]] = None): MapObjects = {
>     val loopVar = LambdaVariable("MapObject", elementType, elementNullable)
>     MapObjects(loopVar, function(loopVar), inputData, customCollectionCls)
>   }
> }
> }}
> I believe it may be possible to just use 'loopVar' instead of 'function(loopVar)', whenever 'function' is null, but need second opinion from catalyst developers on what a robust fix should be



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org