Posted to issues@spark.apache.org by "Martin Brandt (JIRA)" <ji...@apache.org> on 2016/04/13 01:46:25 UTC
[jira] [Comment Edited] (SPARK-13116) TungstenAggregate though it
is supposedly capable of all processing unsafe & safe rows, fails if the
input is safe rows
[ https://issues.apache.org/jira/browse/SPARK-13116?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15238264#comment-15238264 ]
Martin Brandt edited comment on SPARK-13116 at 4/12/16 11:46 PM:
-----------------------------------------------------------------
I am seeing what looks like the issue described here on Spark 1.6.1, when querying a fairly wide table:
java.lang.UnsupportedOperationException
at org.apache.spark.sql.catalyst.expressions.UnsafeRow.update(UnsafeRow.java:238)
at org.apache.spark.sql.catalyst.expressions.InterpretedMutableProjection.apply(Projection.scala:89)
at org.apache.spark.sql.catalyst.expressions.InterpretedMutableProjection.apply(Projection.scala:60)
at org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.createNewAggregationBuffer(TungstenAggregationIterator.scala:248)
This reproduces consistently on my 5-node cluster using the following code. Note that with 2700 columns or fewer the problem does NOT occur, while at 2800 columns and above it always occurs.
{code:borderStyle=solid}
import org.apache.spark.sql._
import org.apache.spark.sql.types._
val c = 1 to 2800
val r = 1 to 10
val rows = sc.parallelize(r.map(i => Row.fromSeq(c.map(_ + i))))
val fields = c.map(i => StructField("t" + i.toString, IntegerType, true))
val schema = StructType(fields)
val df = sqlContext.createDataFrame(rows, schema)
df.registerTempTable("temp")
val sql = c.map(i=>"avg(t" + i.toString + ")").mkString(",")
val sel = sqlContext.sql("SELECT " + sql + " FROM temp")
sel.collect()
{code}
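To narrow down where between 2700 and 2800 columns the failure starts, the reproduction can be wrapped in a helper and run at several widths. This is only a sketch, assuming the same spark-shell session on Spark 1.6.x with sc and sqlContext in scope. The helper name avgQuerySucceeds, the temp_<n> table names and the list of widths are illustrative and not part of the original report.
{code:borderStyle=solid}
import org.apache.spark.sql._
import org.apache.spark.sql.types._
import scala.util.Try

// Hypothetical helper: builds a 10-row table with n integer columns,
// runs avg() over every column, and reports whether collect() completed.
def avgQuerySucceeds(n: Int): Boolean = {
  val cols = 1 to n
  val rows = sc.parallelize((1 to 10).map(i => Row.fromSeq(cols.map(_ + i))))
  val schema = StructType(cols.map(i => StructField("t" + i, IntegerType, true)))
  val df = sqlContext.createDataFrame(rows, schema)
  val table = "temp_" + n // one temp table per width (illustrative naming)
  df.registerTempTable(table)
  val aggs = cols.map(i => "avg(t" + i + ")").mkString(",")
  // Try captures any non-fatal exception thrown while running the query
  Try(sqlContext.sql("SELECT " + aggs + " FROM " + table).collect()).isSuccess
}

// Widths chosen arbitrarily between the two values mentioned above
Seq(2700, 2725, 2750, 2775, 2800).foreach { n =>
  println(n + " columns: " + (if (avgQuerySucceeds(n)) "ok" else "failed"))
}
{code}
The helper only reports success or failure. To confirm that a failure is the same UnsupportedOperationException, inspect the exception held by the Try instead of calling isSuccess.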
> TungstenAggregate though it is supposedly capable of all processing unsafe & safe rows, fails if the input is safe rows
> -----------------------------------------------------------------------------------------------------------------------
>
> Key: SPARK-13116
> URL: https://issues.apache.org/jira/browse/SPARK-13116
> Project: Spark
> Issue Type: Bug
> Components: SQL
> Affects Versions: 1.6.0
> Reporter: Asif Hussain Shahid
> Attachments: SPARK_13116_Test.scala
>
>
> TungstenAggregate, though supposedly capable of processing both unsafe and safe rows, fails if the input is safe rows.
> If the input to TungstenAggregationIterator is a SafeRow while the target is an UnsafeRow, the current code will try to set the fields in the UnsafeRow using the update method of UnsafeRow.
> This method is called via TungstenAggregationIterator on the InterpretedMutableProjection. The target row in the InterpretedMutableProjection is an UnsafeRow, while the current row is a SafeRow.
> In the InterpretedMutableProjection's apply method, it invokes
> mutableRow(i) = exprArray(i).eval(input)
> For UnsafeRow, however, the update method throws UnsupportedOperationException.
> The fix I applied in our forked branch, in the class InterpretedMutableProjection, is:
> + private var targetUnsafe = false
> + type UnsafeSetter = (UnsafeRow, Any ) => Unit
> + private var setters : Array[UnsafeSetter] = _
> private[this] val exprArray = expressions.toArray
> private[this] var mutableRow: MutableRow = new GenericMutableRow(exprArray.length)
> def currentValue: InternalRow = mutableRow
>
> +
> override def target(row: MutableRow): MutableProjection = {
> mutableRow = row
> + targetUnsafe = row match {
> + case _:UnsafeRow =>{
> + if(setters == null) {
> + setters = Array.ofDim[UnsafeSetter](exprArray.length)
> + for(i <- 0 until exprArray.length) {
> + setters(i) = exprArray(i).dataType match {
> + case IntegerType => (target: UnsafeRow, value: Any ) =>
> + target.setInt(i,value.asInstanceOf[Int])
> + case LongType => (target: UnsafeRow, value: Any ) =>
> + target.setLong(i,value.asInstanceOf[Long])
> + case DoubleType => (target: UnsafeRow, value: Any ) =>
> + target.setDouble(i,value.asInstanceOf[Double])
> + case FloatType => (target: UnsafeRow, value: Any ) =>
> + target.setFloat(i,value.asInstanceOf[Float])
> +
> + case NullType => (target: UnsafeRow, value: Any ) =>
> + target.setNullAt(i)
> +
> + case BooleanType => (target: UnsafeRow, value: Any ) =>
> + target.setBoolean(i,value.asInstanceOf[Boolean])
> +
> + case ByteType => (target: UnsafeRow, value: Any ) =>
> + target.setByte(i,value.asInstanceOf[Byte])
> + case ShortType => (target: UnsafeRow, value: Any ) =>
> + target.setShort(i,value.asInstanceOf[Short])
> +
> + }
> + }
> + }
> + true
> + }
> + case _ => false
> + }
> +
> this
> }
>
> override def apply(input: InternalRow): InternalRow = {
> var i = 0
> while (i < exprArray.length) {
> - mutableRow(i) = exprArray(i).eval(input)
> + if(targetUnsafe) {
> + setters(i)(mutableRow.asInstanceOf[UnsafeRow], exprArray(i).eval(input))
> + }else {
> + mutableRow(i) = exprArray(i).eval(input)
> + }
> i += 1
> }