Posted to issues@spark.apache.org by "Hyukjin Kwon (JIRA)" <ji...@apache.org> on 2019/05/21 04:38:22 UTC
[jira] [Resolved] (SPARK-13116) TungstenAggregate though it is
supposedly capable of all processing unsafe & safe rows, fails if the input
is safe rows
[ https://issues.apache.org/jira/browse/SPARK-13116?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Hyukjin Kwon resolved SPARK-13116.
----------------------------------
Resolution: Incomplete
> TungstenAggregate though it is supposedly capable of all processing unsafe & safe rows, fails if the input is safe rows
> -----------------------------------------------------------------------------------------------------------------------
>
> Key: SPARK-13116
> URL: https://issues.apache.org/jira/browse/SPARK-13116
> Project: Spark
> Issue Type: Bug
> Components: SQL
> Affects Versions: 1.6.0
> Reporter: Asif Hussain Shahid
> Priority: Major
> Labels: bulk-closed
> Attachments: SPARK_13116_Test.scala
>
>
> TungstenAggregate is supposed to handle both unsafe and safe rows, but it fails when the input consists of safe rows.
> If the input to TungstenAggregateIterator is a safe row while the target is an UnsafeRow, the current code tries to set the fields of the UnsafeRow through UnsafeRow's generic update method.
> That method is called from TungstenAggregateIterator via InterpretedMutableProjection. The target row in the InterpretedMutableProjection is an UnsafeRow, while the current (input) row is a safe row.
> The apply method of InterpretedMutableProjection invokes
> mutableRow(i) = exprArray(i).eval(input)
> which Scala rewrites to mutableRow.update(i, ...). For UnsafeRow, the update method throws UnsupportedOperationException.
> The fix I applied on our forked branch, in the class InterpretedMutableProjection, is:
> +  private var targetUnsafe = false
> +  type UnsafeSetter = (UnsafeRow, Any) => Unit
> +  private var setters: Array[UnsafeSetter] = _
>    private[this] val exprArray = expressions.toArray
>    private[this] var mutableRow: MutableRow = new GenericMutableRow(exprArray.length)
>    def currentValue: InternalRow = mutableRow
>
> +
>    override def target(row: MutableRow): MutableProjection = {
>      mutableRow = row
> +    targetUnsafe = row match {
> +      case _: UnsafeRow => {
> +        if (setters == null) {
> +          setters = Array.ofDim[UnsafeSetter](exprArray.length)
> +          for (i <- 0 until exprArray.length) {
> +            setters(i) = exprArray(i).dataType match {
> +              case IntegerType => (target: UnsafeRow, value: Any) =>
> +                target.setInt(i, value.asInstanceOf[Int])
> +              case LongType => (target: UnsafeRow, value: Any) =>
> +                target.setLong(i, value.asInstanceOf[Long])
> +              case DoubleType => (target: UnsafeRow, value: Any) =>
> +                target.setDouble(i, value.asInstanceOf[Double])
> +              case FloatType => (target: UnsafeRow, value: Any) =>
> +                target.setFloat(i, value.asInstanceOf[Float])
> +
> +              case NullType => (target: UnsafeRow, value: Any) =>
> +                target.setNullAt(i)
> +
> +              case BooleanType => (target: UnsafeRow, value: Any) =>
> +                target.setBoolean(i, value.asInstanceOf[Boolean])
> +
> +              case ByteType => (target: UnsafeRow, value: Any) =>
> +                target.setByte(i, value.asInstanceOf[Byte])
> +              case ShortType => (target: UnsafeRow, value: Any) =>
> +                target.setShort(i, value.asInstanceOf[Short])
> +
> +            }
> +          }
> +        }
> +        true
> +      }
> +      case _ => false
> +    }
> +
>      this
>    }
>
>    override def apply(input: InternalRow): InternalRow = {
>      var i = 0
>      while (i < exprArray.length) {
> -      mutableRow(i) = exprArray(i).eval(input)
> +      if (targetUnsafe) {
> +        setters(i)(mutableRow.asInstanceOf[UnsafeRow], exprArray(i).eval(input))
> +      } else {
> +        mutableRow(i) = exprArray(i).eval(input)
> +      }
>        i += 1
>      }
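
The approach in the patch above can be illustrated without Spark on the classpath. What follows is only a minimal sketch under stated assumptions, not the Spark implementation: FixedWidthRow is a hypothetical stand-in for UnsafeRow (its generic update throws, its typed setters work), the three DataType objects are local stand-ins for Spark's types of the same name, and SetterSketch, buildSetters and project are names invented for this example. The sketch also adds a null check before each typed setter, which the patch as posted does not show.

```scala
// Local stand-ins for Spark's data types (assumption: only three are modelled).
sealed trait DataType
case object IntegerType extends DataType
case object LongType extends DataType
case object DoubleType extends DataType

// Hypothetical stand-in for UnsafeRow: typed setters work, generic update does not.
final class FixedWidthRow(numFields: Int) {
  private val words = new Array[Long](numFields)
  private val nulls = new Array[Boolean](numFields)

  // Mirrors the behaviour described in the issue for UnsafeRow.update.
  def update(i: Int, value: Any): Unit =
    throw new UnsupportedOperationException("generic update is not supported")

  def setInt(i: Int, v: Int): Unit = { nulls(i) = false; words(i) = v.toLong }
  def setLong(i: Int, v: Long): Unit = { nulls(i) = false; words(i) = v }
  def setDouble(i: Int, v: Double): Unit = {
    nulls(i) = false
    words(i) = java.lang.Double.doubleToLongBits(v)
  }
  def setNullAt(i: Int): Unit = { nulls(i) = true; words(i) = 0L }

  def isNullAt(i: Int): Boolean = nulls(i)
  def getInt(i: Int): Int = words(i).toInt
  def getLong(i: Int): Long = words(i)
  def getDouble(i: Int): Double = java.lang.Double.longBitsToDouble(words(i))
}

object SetterSketch {
  type Setter = (FixedWidthRow, Any) => Unit

  // Build one setter per output column up front, so the per-row loop
  // does no pattern matching on the data type.
  def buildSetters(types: Array[DataType]): Array[Setter] =
    Array.tabulate[Setter](types.length) { i =>
      val typed: Setter = types(i) match {
        case IntegerType => (row: FixedWidthRow, v: Any) => row.setInt(i, v.asInstanceOf[Int])
        case LongType    => (row: FixedWidthRow, v: Any) => row.setLong(i, v.asInstanceOf[Long])
        case DoubleType  => (row: FixedWidthRow, v: Any) => row.setDouble(i, v.asInstanceOf[Double])
      }
      // Null check added in this sketch only; it is not part of the posted patch.
      (row: FixedWidthRow, v: Any) => if (v == null) row.setNullAt(i) else typed(row, v)
    }

  // Simplified projection: `values` plays the role of the already evaluated expressions.
  def project(values: Array[Any], setters: Array[Setter], target: FixedWidthRow): FixedWidthRow = {
    var i = 0
    while (i < values.length) {
      setters(i)(target, values(i))
      i += 1
    }
    target
  }
}
```

Building the setter array once per target keeps the hot loop to an array lookup and a function call. Types outside the table would still need a fallback or an explicit error, which this sketch sidesteps by modelling a closed set of types.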
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)