Posted to issues@spark.apache.org by "Hyukjin Kwon (JIRA)" <ji...@apache.org> on 2019/05/21 04:38:22 UTC

[jira] [Resolved] (SPARK-13116) TungstenAggregate though it is supposedly capable of all processing unsafe & safe rows, fails if the input is safe rows

     [ https://issues.apache.org/jira/browse/SPARK-13116?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Hyukjin Kwon resolved SPARK-13116.
----------------------------------
    Resolution: Incomplete

> TungstenAggregate though it is supposedly capable of all processing unsafe & safe rows, fails if the input is safe rows
> -----------------------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-13116
>                 URL: https://issues.apache.org/jira/browse/SPARK-13116
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 1.6.0
>            Reporter: Asif Hussain Shahid
>            Priority: Major
>              Labels: bulk-closed
>         Attachments: SPARK_13116_Test.scala
>
>
> TungstenAggregate is supposed to handle both unsafe and safe rows, but it fails when the input consists of safe rows.
> If the input to TungstenAggregateIterator is a safe row while the target is an UnsafeRow, the current code tries to set the fields of the UnsafeRow through the update method of UnsafeRow.
> This method is called via TungstenAggregateIterator on the InterpretedMutableProjection. The target row in the InterpretedMutableProjection is an UnsafeRow, while the current row is a safe row.
> In the InterpretedMutableProjection's apply method, it invokes
>  mutableRow(i) = exprArray(i).eval(input)
> This assignment is a call to mutableRow.update(i, ...), and for UnsafeRow the update method throws UnsupportedOperationException.
> The fix I applied in our forked branch, in the class InterpretedMutableProjection, is:
>  +  private var targetUnsafe = false
>  +  type UnsafeSetter = (UnsafeRow,  Any ) => Unit
>  +  private var setters : Array[UnsafeSetter] = _
>     private[this] val exprArray = expressions.toArray
>     private[this] var mutableRow: MutableRow = new GenericMutableRow(exprArray.length)
>     def currentValue: InternalRow = mutableRow
>   
>  +
>     override def target(row: MutableRow): MutableProjection = {
>       mutableRow = row
>  +    targetUnsafe = row match {
>  +      case _:UnsafeRow =>{
>  +        if(setters == null) {
>  +          setters = Array.ofDim[UnsafeSetter](exprArray.length)
>  +          for(i <- 0 until exprArray.length) {
>  +            setters(i) = exprArray(i).dataType match {
>  +              case IntegerType => (target: UnsafeRow,  value: Any ) =>
>  +                target.setInt(i,value.asInstanceOf[Int])
>  +              case LongType => (target: UnsafeRow,  value: Any ) =>
>  +                target.setLong(i,value.asInstanceOf[Long])
>  +              case DoubleType => (target: UnsafeRow,  value: Any ) =>
>  +                target.setDouble(i,value.asInstanceOf[Double])
>  +              case FloatType => (target: UnsafeRow, value: Any ) =>
>  +                target.setFloat(i,value.asInstanceOf[Float])
>  +
>  +              case NullType => (target: UnsafeRow,  value: Any ) =>
>  +                target.setNullAt(i)
>  +
>  +              case BooleanType => (target: UnsafeRow,  value: Any ) =>
>  +                target.setBoolean(i,value.asInstanceOf[Boolean])
>  +
>  +              case ByteType => (target: UnsafeRow,  value: Any ) =>
>  +                target.setByte(i,value.asInstanceOf[Byte])
>  +              case ShortType => (target: UnsafeRow, value: Any ) =>
>  +                target.setShort(i,value.asInstanceOf[Short])
>  +
>  +            }
>  +          }
>  +        }
>  +        true
>  +      }
>  +      case _ => false
>  +    }
>  +
>       this
>     }
>   
>     override def apply(input: InternalRow): InternalRow = {
>       var i = 0
>       while (i < exprArray.length) {
>  -      mutableRow(i) = exprArray(i).eval(input)
>  +      if(targetUnsafe) {
>  +        setters(i)(mutableRow.asInstanceOf[UnsafeRow], exprArray(i).eval(input))
>  +      }else {
>  +        mutableRow(i) = exprArray(i).eval(input)
>  +      }
>         i += 1
>       }
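The failure and the setter-based workaround described above can be reproduced without Spark. What follows is a minimal sketch in plain Scala, not the Spark implementation: FieldType, MutableRowLike, GenericRowLike, FixedWidthRowLike and SetterSketch are hypothetical stand-ins invented for illustration. It relies on the fact that Scala rewrites row(i) = v into row.update(i, v), which is how the projection ends up in the update method. Unlike the patch above, the sketch sends null values to setNullAt (in Scala, null.asInstanceOf[Int] evaluates to 0, so a null result would otherwise be stored as 0) and rejects types without an in-place setter explicitly instead of failing with a MatchError.

```scala
// Hypothetical stand-ins for illustration only; none of these are Spark classes.
sealed trait FieldType
case object IntField extends FieldType
case object LongField extends FieldType
case object DoubleField extends FieldType
case object StringField extends FieldType // variable length: no in-place setter here

trait MutableRowLike {
  def update(i: Int, value: Any): Unit
  def setNullAt(i: Int): Unit
}

// Mimics a generic ("safe") row: boxed values are accepted through update.
final class GenericRowLike(n: Int) extends MutableRowLike {
  val values = new Array[Any](n)
  def update(i: Int, value: Any): Unit = values(i) = value
  def setNullAt(i: Int): Unit = values(i) = null
}

// Mimics a fixed-width binary ("unsafe") row: only typed setters are supported.
final class FixedWidthRowLike(n: Int) extends MutableRowLike {
  private val words = new Array[Long](n)
  private val nulls = new Array[Boolean](n)
  def update(i: Int, value: Any): Unit =
    throw new UnsupportedOperationException("update")
  def setNullAt(i: Int): Unit = { nulls(i) = true; words(i) = 0L }
  def isNullAt(i: Int): Boolean = nulls(i)
  def setInt(i: Int, v: Int): Unit = { nulls(i) = false; words(i) = v.toLong }
  def setLong(i: Int, v: Long): Unit = { nulls(i) = false; words(i) = v }
  def setDouble(i: Int, v: Double): Unit = {
    nulls(i) = false
    words(i) = java.lang.Double.doubleToLongBits(v)
  }
  def getInt(i: Int): Int = words(i).toInt
  def getLong(i: Int): Long = words(i)
  def getDouble(i: Int): Double = java.lang.Double.longBitsToDouble(words(i))
}

object SetterSketch {
  type Setter = (FixedWidthRowLike, Any) => Unit

  // Builds one typed setter per output column, once, so the per-row loop
  // does not have to match on the data type again.
  def settersFor(types: Seq[FieldType]): Array[Setter] =
    types.zipWithIndex.map { case (t, i) =>
      val typed: Setter = t match {
        case IntField =>
          (row: FixedWidthRowLike, v: Any) => row.setInt(i, v.asInstanceOf[Int])
        case LongField =>
          (row: FixedWidthRowLike, v: Any) => row.setLong(i, v.asInstanceOf[Long])
        case DoubleField =>
          (row: FixedWidthRowLike, v: Any) => row.setDouble(i, v.asInstanceOf[Double])
        case other =>
          throw new IllegalArgumentException(s"no in-place setter for $other")
      }
      // Null results must go through setNullAt; unboxing null would yield 0.
      val nullSafe: Setter =
        (row: FixedWidthRowLike, v: Any) => if (v == null) row.setNullAt(i) else typed(row, v)
      nullSafe
    }.toArray

  def main(args: Array[String]): Unit = {
    val row = new FixedWidthRowLike(2)
    // row(0) = 1 is rewritten to row.update(0, 1), which this row type rejects.
    val updateFailed =
      try { row(0) = 1; false }
      catch { case _: UnsupportedOperationException => true }
    val setters = settersFor(Seq(IntField, DoubleField))
    setters(0)(row, 7)
    setters(1)(row, null)
    println(s"$updateFailed ${row.getInt(0)} ${row.isNullAt(1)}")
  }
}
```

Building the setter array once per target row keeps the type dispatch out of the per-row loop, which matches the intent of caching setters in the patch. Whether a fallback or an exception is the right response to an unsupported type is a design choice this sketch does not settle.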


