Posted to issues@spark.apache.org by "Erik Erlandson (Jira)" <ji...@apache.org> on 2020/07/02 18:12:00 UTC
[jira] [Commented] (SPARK-32159) New udaf(Aggregator) has an
integration bug with UnresolvedMapObjects serialization
[ https://issues.apache.org/jira/browse/SPARK-32159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17150490#comment-17150490 ]
Erik Erlandson commented on SPARK-32159:
----------------------------------------
cc [~cloud_fan]
> New udaf(Aggregator) has an integration bug with UnresolvedMapObjects serialization
> -----------------------------------------------------------------------------------
>
> Key: SPARK-32159
> URL: https://issues.apache.org/jira/browse/SPARK-32159
> Project: Spark
> Issue Type: Bug
> Components: SQL
> Affects Versions: 3.0.0
> Reporter: Erik Erlandson
> Priority: Major
>
> The new user-defined aggregator feature (SPARK-27296), based on calling 'functions.udaf(aggregator)', works fine when the aggregator input type is atomic, e.g. 'Aggregator[Double, _, _]'. However, if the input type is an array, such as 'Aggregator[Array[Double], _, _]', it trips over the following:
> {{/**
>  * When constructing [[MapObjects]], the element type must be given, which may not be available
>  * before analysis. This class acts like a placeholder for [[MapObjects]], and will be replaced by
>  * [[MapObjects]] during analysis after the input data is resolved.
>  * Note that, ideally we should not serialize and send unresolved expressions to executors, but
>  * users may accidentally do this(e.g. mistakenly reference an encoder instance when implementing
>  * Aggregator). Here we mark `function` as transient because it may reference scala Type, which is
>  * not serializable. Then even users mistakenly reference unresolved expression and serialize it,
>  * it's just a performance issue(more network traffic), and will not fail.
>  */
> case class UnresolvedMapObjects(
>     @transient function: Expression => Expression,
>     child: Expression,
>     customCollectionCls: Option[Class[_]] = None) extends UnaryExpression with Unevaluable {
>   override lazy val resolved = false
>   override def dataType: DataType = customCollectionCls.map(ObjectType.apply).getOrElse {
>     throw new UnsupportedOperationException("not resolved")
>   }
> }}}
> The '@transient' annotation causes 'function' to be deserialized as 'null' on the executors, which leads to a NullPointerException here, when 'MapObjects.apply' evaluates 'function(loopVar)':
> {{object MapObjects {
>   def apply(
>       function: Expression => Expression,
>       inputData: Expression,
>       elementType: DataType,
>       elementNullable: Boolean = true,
>       customCollectionCls: Option[Class[_]] = None): MapObjects = {
>     val loopVar = LambdaVariable("MapObject", elementType, elementNullable)
>     MapObjects(loopVar, function(loopVar), inputData, customCollectionCls)
>   }
> }
> }}
> I believe it may be possible to just use 'loopVar' instead of 'function(loopVar)' whenever 'function' is null, but I would like a second opinion from Catalyst developers on what a robust fix should be.
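
The failure mode described in the report can be reproduced outside Spark. The following is a minimal sketch, not Spark code: 'Placeholder', 'TransientDemo', 'roundTrip', and 'applyOrIdentity' are hypothetical names invented for illustration, with 'Int => Int' standing in for 'Expression => Expression'. It assumes plain Java serialization, and shows a '@transient' function field coming back as 'null' after a round trip, plus a null guard in the spirit of the fallback suggested in the report. Whether such a fallback is semantically correct inside Catalyst is the open question the report raises.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}
import scala.util.Try

// Hypothetical stand-in for UnresolvedMapObjects: a serializable case class
// whose function field is marked @transient, so it is skipped on write.
case class Placeholder(@transient function: Int => Int, child: Int)

object TransientDemo {
  // Round-trip a value through Java serialization, roughly what happens
  // when an object is shipped from the driver to an executor.
  def roundTrip[T <: AnyRef](value: T): T = {
    val buffer = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(buffer)
    try out.writeObject(value) finally out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(buffer.toByteArray))
    try in.readObject().asInstanceOf[T] finally in.close()
  }

  // Illustration of the suggested fallback (an assumption, not a Spark fix):
  // if the function was lost in transit, pass the input through unchanged.
  def applyOrIdentity[A](function: A => A, input: A): A =
    if (function == null) input else function(input)

  def main(args: Array[String]): Unit = {
    val copy = roundTrip(Placeholder(_ + 1, 41))

    // The transient field is not restored, so it is null on the copy.
    println(s"function is null after round trip: ${copy.function == null}")

    // Calling the lost function directly fails with a NullPointerException.
    println(s"direct call fails: ${Try(copy.function(1)).isFailure}")

    // The guarded call falls back to returning the input itself.
    println(s"guarded call: ${applyOrIdentity(copy.function, 1)}")
  }
}
```

The guard only avoids the exception. It silently drops whatever transformation 'function' was meant to apply, which is why a fix in 'MapObjects.apply' would need review from someone familiar with how the unresolved placeholder is used.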
--
This message was sent by Atlassian Jira
(v8.3.4#803005)