Posted to issues@spark.apache.org by "Erik Erlandson (Jira)" <ji...@apache.org> on 2020/07/02 18:10:00 UTC
[jira] [Updated] (SPARK-32159) New udaf(Aggregator) has an
integration bug with UnresolvedMapObjects serialization
[ https://issues.apache.org/jira/browse/SPARK-32159?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Erik Erlandson updated SPARK-32159:
-----------------------------------
Description:
The new user-defined aggregator feature (SPARK-27296), based on calling 'functions.udaf(aggregator)', works fine when the aggregator input type is atomic, e.g. 'Aggregator[Double, _, _]'. However, if the input type is an array, like 'Aggregator[Array[Double], _, _]', it trips over the following:
{{/**
* When constructing [[MapObjects]], the element type must be given, which may not be available
* before analysis. This class acts like a placeholder for [[MapObjects]], and will be replaced by
* [[MapObjects]] during analysis after the input data is resolved.
* Note that, ideally we should not serialize and send unresolved expressions to executors, but
* users may accidentally do this(e.g. mistakenly reference an encoder instance when implementing
* Aggregator). Here we mark `function` as transient because it may reference scala Type, which is
* not serializable. Then even users mistakenly reference unresolved expression and serialize it,
* it's just a performance issue(more network traffic), and will not fail.
*/
case class UnresolvedMapObjects(
@transient function: Expression => Expression,
child: Expression,
customCollectionCls: Option[Class[_]] = None) extends UnaryExpression with Unevaluable {
override lazy val resolved = false
override def dataType: DataType = customCollectionCls.map(ObjectType.apply).getOrElse {
throw new UnsupportedOperationException("not resolved")
}
}}}
Because 'function' is marked '@transient', it is deserialized as 'null' on the executors, which causes a null-pointer exception here, when 'function(loopVar)' is evaluated:
{{object MapObjects {
def apply(
function: Expression => Expression,
inputData: Expression,
elementType: DataType,
elementNullable: Boolean = true,
customCollectionCls: Option[Class[_]] = None): MapObjects = {
val loopVar = LambdaVariable("MapObject", elementType, elementNullable)
MapObjects(loopVar, function(loopVar), inputData, customCollectionCls)
}
}
}}
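The '@transient' behavior described above can be reproduced outside Spark. The following is a minimal sketch, not Spark code: 'Placeholder' and 'TransientDemo' are hypothetical stand-ins for 'UnresolvedMapObjects', and plain Java serialization is assumed as the transport.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

// Hypothetical stand-in for UnresolvedMapObjects (not a Spark class):
// a serializable case class whose function-typed field is @transient.
case class Placeholder(@transient function: String => String, child: String)

object TransientDemo {
  // Push a value through plain Java serialization and read it back.
  def roundTrip[T <: AnyRef](value: T): T = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    out.writeObject(value)
    out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    try in.readObject().asInstanceOf[T] finally in.close()
  }

  def main(args: Array[String]): Unit = {
    val original = Placeholder(s => s.toUpperCase, "child")
    val copy = roundTrip(original)
    // The transient field is skipped on write, so it comes back as null.
    println(s"function is null after round trip: ${copy.function == null}")
    // Non-transient fields survive the round trip.
    println(s"child after round trip: ${copy.child}")
    // Applying the null function is what raises the NullPointerException.
    println(s"calling it fails: ${scala.util.Try(copy.function("x")).isFailure}")
  }
}
```

The same thing happens to 'function' in 'UnresolvedMapObjects' if an instance is serialized on the driver and later resolved on an executor.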
I believe it may be possible to just use 'loopVar' instead of 'function(loopVar)' whenever 'function' is null, but I need a second opinion from Catalyst developers on what a robust fix should be.
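To make the proposal concrete, here is a rough sketch of the null guard using simplified stand-in types. 'Expr', 'LoopVar', 'Wrapped', 'MapOver' and 'MapOverSketch' are hypothetical and are not Catalyst classes. Whether falling back to the bare loop variable is semantically correct in Catalyst is exactly the open question.

```scala
// Hypothetical, simplified stand-ins for Catalyst's expression tree.
sealed trait Expr
final case class LoopVar(name: String) extends Expr
final case class Wrapped(label: String, child: Expr) extends Expr
final case class MapOver(loopVar: LoopVar, body: Expr, input: Expr) extends Expr

object MapOverSketch {
  // Mirrors the shape of MapObjects.apply, with a guard for a null function.
  def build(function: Expr => Expr, input: Expr): MapOver = {
    val loopVar = LoopVar("MapObject")
    // Option(null) is None, so a function lost to @transient
    // falls back to the loop variable itself (identity mapping).
    val body: Expr = Option(function).map(f => f(loopVar)).getOrElse(loopVar)
    MapOver(loopVar, body, input)
  }
}
```

With a non-null function the behavior is unchanged. With a null function the per-element transformation is silently dropped, which avoids the exception but may hide a conversion that was actually required.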
> New udaf(Aggregator) has an integration bug with UnresolvedMapObjects serialization
> -----------------------------------------------------------------------------------
>
> Key: SPARK-32159
> URL: https://issues.apache.org/jira/browse/SPARK-32159
> Project: Spark
> Issue Type: Bug
> Components: SQL
> Affects Versions: 3.0.0
> Reporter: Erik Erlandson
> Priority: Major
>