You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by hv...@apache.org on 2016/10/07 21:03:52 UTC
[2/3] spark git commit: [SPARK-17761][SQL] Remove MutableRow
http://git-wip-us.apache.org/repos/asf/spark/blob/97594c29/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala
index 4e072a9..2988161 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala
@@ -118,7 +118,7 @@ class TungstenAggregationIterator(
private def createNewAggregationBuffer(): UnsafeRow = {
val bufferSchema = aggregateFunctions.flatMap(_.aggBufferAttributes)
val buffer: UnsafeRow = UnsafeProjection.create(bufferSchema.map(_.dataType))
- .apply(new GenericMutableRow(bufferSchema.length))
+ .apply(new GenericInternalRow(bufferSchema.length))
// Initialize declarative aggregates' buffer values
expressionAggInitialProjection.target(buffer)(EmptyRow)
// Initialize imperative aggregates' buffer values
@@ -127,7 +127,7 @@ class TungstenAggregationIterator(
}
// Creates a function used to generate output rows.
- override protected def generateResultProjection(): (UnsafeRow, MutableRow) => UnsafeRow = {
+ override protected def generateResultProjection(): (UnsafeRow, InternalRow) => UnsafeRow = {
val modes = aggregateExpressions.map(_.mode).distinct
if (modes.nonEmpty && !modes.contains(Final) && !modes.contains(Complete)) {
// Fast path for partial aggregation, UnsafeRowJoiner is usually faster than projection
@@ -137,7 +137,7 @@ class TungstenAggregationIterator(
val bufferSchema = StructType.fromAttributes(bufferAttributes)
val unsafeRowJoiner = GenerateUnsafeRowJoiner.create(groupingKeySchema, bufferSchema)
- (currentGroupingKey: UnsafeRow, currentBuffer: MutableRow) => {
+ (currentGroupingKey: UnsafeRow, currentBuffer: InternalRow) => {
unsafeRowJoiner.join(currentGroupingKey, currentBuffer.asInstanceOf[UnsafeRow])
}
} else {
@@ -300,7 +300,7 @@ class TungstenAggregationIterator(
private[this] val sortBasedAggregationBuffer: UnsafeRow = createNewAggregationBuffer()
// The function used to process rows in a group
- private[this] var sortBasedProcessRow: (MutableRow, InternalRow) => Unit = null
+ private[this] var sortBasedProcessRow: (InternalRow, InternalRow) => Unit = null
// Processes rows in the current group. It will stop when it find a new group.
private def processCurrentSortedGroup(): Unit = {
http://git-wip-us.apache.org/repos/asf/spark/blob/97594c29/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/udaf.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/udaf.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/udaf.scala
index 586e145..67760f3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/udaf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/udaf.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.execution.aggregate
import org.apache.spark.internal.Logging
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
-import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression, MutableRow, _}
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression, _}
import org.apache.spark.sql.catalyst.expressions.aggregate.ImperativeAggregate
import org.apache.spark.sql.catalyst.expressions.codegen.GenerateMutableProjection
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
@@ -96,18 +96,18 @@ sealed trait BufferSetterGetterUtils {
getters
}
- def createSetters(schema: StructType): Array[((MutableRow, Int, Any) => Unit)] = {
+ def createSetters(schema: StructType): Array[((InternalRow, Int, Any) => Unit)] = {
val dataTypes = schema.fields.map(_.dataType)
- val setters = new Array[(MutableRow, Int, Any) => Unit](dataTypes.length)
+ val setters = new Array[(InternalRow, Int, Any) => Unit](dataTypes.length)
var i = 0
while (i < setters.length) {
setters(i) = dataTypes(i) match {
case NullType =>
- (row: MutableRow, ordinal: Int, value: Any) => row.setNullAt(ordinal)
+ (row: InternalRow, ordinal: Int, value: Any) => row.setNullAt(ordinal)
case b: BooleanType =>
- (row: MutableRow, ordinal: Int, value: Any) =>
+ (row: InternalRow, ordinal: Int, value: Any) =>
if (value != null) {
row.setBoolean(ordinal, value.asInstanceOf[Boolean])
} else {
@@ -115,7 +115,7 @@ sealed trait BufferSetterGetterUtils {
}
case ByteType =>
- (row: MutableRow, ordinal: Int, value: Any) =>
+ (row: InternalRow, ordinal: Int, value: Any) =>
if (value != null) {
row.setByte(ordinal, value.asInstanceOf[Byte])
} else {
@@ -123,7 +123,7 @@ sealed trait BufferSetterGetterUtils {
}
case ShortType =>
- (row: MutableRow, ordinal: Int, value: Any) =>
+ (row: InternalRow, ordinal: Int, value: Any) =>
if (value != null) {
row.setShort(ordinal, value.asInstanceOf[Short])
} else {
@@ -131,7 +131,7 @@ sealed trait BufferSetterGetterUtils {
}
case IntegerType =>
- (row: MutableRow, ordinal: Int, value: Any) =>
+ (row: InternalRow, ordinal: Int, value: Any) =>
if (value != null) {
row.setInt(ordinal, value.asInstanceOf[Int])
} else {
@@ -139,7 +139,7 @@ sealed trait BufferSetterGetterUtils {
}
case LongType =>
- (row: MutableRow, ordinal: Int, value: Any) =>
+ (row: InternalRow, ordinal: Int, value: Any) =>
if (value != null) {
row.setLong(ordinal, value.asInstanceOf[Long])
} else {
@@ -147,7 +147,7 @@ sealed trait BufferSetterGetterUtils {
}
case FloatType =>
- (row: MutableRow, ordinal: Int, value: Any) =>
+ (row: InternalRow, ordinal: Int, value: Any) =>
if (value != null) {
row.setFloat(ordinal, value.asInstanceOf[Float])
} else {
@@ -155,7 +155,7 @@ sealed trait BufferSetterGetterUtils {
}
case DoubleType =>
- (row: MutableRow, ordinal: Int, value: Any) =>
+ (row: InternalRow, ordinal: Int, value: Any) =>
if (value != null) {
row.setDouble(ordinal, value.asInstanceOf[Double])
} else {
@@ -164,13 +164,13 @@ sealed trait BufferSetterGetterUtils {
case dt: DecimalType =>
val precision = dt.precision
- (row: MutableRow, ordinal: Int, value: Any) =>
+ (row: InternalRow, ordinal: Int, value: Any) =>
// To make it work with UnsafeRow, we cannot use setNullAt.
// Please see the comment of UnsafeRow's setDecimal.
row.setDecimal(ordinal, value.asInstanceOf[Decimal], precision)
case DateType =>
- (row: MutableRow, ordinal: Int, value: Any) =>
+ (row: InternalRow, ordinal: Int, value: Any) =>
if (value != null) {
row.setInt(ordinal, value.asInstanceOf[Int])
} else {
@@ -178,7 +178,7 @@ sealed trait BufferSetterGetterUtils {
}
case TimestampType =>
- (row: MutableRow, ordinal: Int, value: Any) =>
+ (row: InternalRow, ordinal: Int, value: Any) =>
if (value != null) {
row.setLong(ordinal, value.asInstanceOf[Long])
} else {
@@ -186,7 +186,7 @@ sealed trait BufferSetterGetterUtils {
}
case other =>
- (row: MutableRow, ordinal: Int, value: Any) =>
+ (row: InternalRow, ordinal: Int, value: Any) =>
if (value != null) {
row.update(ordinal, value)
} else {
@@ -209,7 +209,7 @@ private[aggregate] class MutableAggregationBufferImpl(
toCatalystConverters: Array[Any => Any],
toScalaConverters: Array[Any => Any],
bufferOffset: Int,
- var underlyingBuffer: MutableRow)
+ var underlyingBuffer: InternalRow)
extends MutableAggregationBuffer with BufferSetterGetterUtils {
private[this] val offsets: Array[Int] = {
@@ -413,13 +413,13 @@ case class ScalaUDAF(
null)
}
- override def initialize(buffer: MutableRow): Unit = {
+ override def initialize(buffer: InternalRow): Unit = {
mutableAggregateBuffer.underlyingBuffer = buffer
udaf.initialize(mutableAggregateBuffer)
}
- override def update(buffer: MutableRow, input: InternalRow): Unit = {
+ override def update(buffer: InternalRow, input: InternalRow): Unit = {
mutableAggregateBuffer.underlyingBuffer = buffer
udaf.update(
@@ -427,7 +427,7 @@ case class ScalaUDAF(
inputToScalaConverters(inputProjection(input)).asInstanceOf[Row])
}
- override def merge(buffer1: MutableRow, buffer2: InternalRow): Unit = {
+ override def merge(buffer1: InternalRow, buffer2: InternalRow): Unit = {
mutableAggregateBuffer.underlyingBuffer = buffer1
inputAggregateBuffer.underlyingInputBuffer = buffer2
http://git-wip-us.apache.org/repos/asf/spark/blob/97594c29/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/ColumnAccessor.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/ColumnAccessor.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/ColumnAccessor.scala
index 7cde04b..6241b79 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/ColumnAccessor.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/ColumnAccessor.scala
@@ -21,15 +21,16 @@ import java.nio.{ByteBuffer, ByteOrder}
import scala.annotation.tailrec
-import org.apache.spark.sql.catalyst.expressions.{MutableRow, UnsafeArrayData, UnsafeMapData, UnsafeRow}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{UnsafeArrayData, UnsafeMapData, UnsafeRow}
import org.apache.spark.sql.execution.columnar.compression.CompressibleColumnAccessor
import org.apache.spark.sql.types._
/**
* An `Iterator` like trait used to extract values from columnar byte buffer. When a value is
* extracted from the buffer, instead of directly returning it, the value is set into some field of
- * a [[MutableRow]]. In this way, boxing cost can be avoided by leveraging the setter methods
- * for primitive values provided by [[MutableRow]].
+ * a [[InternalRow]]. In this way, boxing cost can be avoided by leveraging the setter methods
+ * for primitive values provided by [[InternalRow]].
*/
private[columnar] trait ColumnAccessor {
initialize()
@@ -38,7 +39,7 @@ private[columnar] trait ColumnAccessor {
def hasNext: Boolean
- def extractTo(row: MutableRow, ordinal: Int): Unit
+ def extractTo(row: InternalRow, ordinal: Int): Unit
protected def underlyingBuffer: ByteBuffer
}
@@ -52,11 +53,11 @@ private[columnar] abstract class BasicColumnAccessor[JvmType](
override def hasNext: Boolean = buffer.hasRemaining
- override def extractTo(row: MutableRow, ordinal: Int): Unit = {
+ override def extractTo(row: InternalRow, ordinal: Int): Unit = {
extractSingle(row, ordinal)
}
- def extractSingle(row: MutableRow, ordinal: Int): Unit = {
+ def extractSingle(row: InternalRow, ordinal: Int): Unit = {
columnType.extract(buffer, row, ordinal)
}
http://git-wip-us.apache.org/repos/asf/spark/blob/97594c29/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/ColumnType.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/ColumnType.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/ColumnType.scala
index d27d8c3..703bde2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/ColumnType.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/ColumnType.scala
@@ -92,7 +92,7 @@ private[columnar] sealed abstract class ColumnType[JvmType] {
* `row(ordinal)`. Subclasses should override this method to avoid boxing/unboxing costs whenever
* possible.
*/
- def extract(buffer: ByteBuffer, row: MutableRow, ordinal: Int): Unit = {
+ def extract(buffer: ByteBuffer, row: InternalRow, ordinal: Int): Unit = {
setField(row, ordinal, extract(buffer))
}
@@ -125,13 +125,13 @@ private[columnar] sealed abstract class ColumnType[JvmType] {
* Sets `row(ordinal)` to `field`. Subclasses should override this method to avoid boxing/unboxing
* costs whenever possible.
*/
- def setField(row: MutableRow, ordinal: Int, value: JvmType): Unit
+ def setField(row: InternalRow, ordinal: Int, value: JvmType): Unit
/**
* Copies `from(fromOrdinal)` to `to(toOrdinal)`. Subclasses should override this method to avoid
* boxing/unboxing costs whenever possible.
*/
- def copyField(from: InternalRow, fromOrdinal: Int, to: MutableRow, toOrdinal: Int): Unit = {
+ def copyField(from: InternalRow, fromOrdinal: Int, to: InternalRow, toOrdinal: Int): Unit = {
setField(to, toOrdinal, getField(from, fromOrdinal))
}
@@ -149,7 +149,7 @@ private[columnar] object NULL extends ColumnType[Any] {
override def defaultSize: Int = 0
override def append(v: Any, buffer: ByteBuffer): Unit = {}
override def extract(buffer: ByteBuffer): Any = null
- override def setField(row: MutableRow, ordinal: Int, value: Any): Unit = row.setNullAt(ordinal)
+ override def setField(row: InternalRow, ordinal: Int, value: Any): Unit = row.setNullAt(ordinal)
override def getField(row: InternalRow, ordinal: Int): Any = null
}
@@ -177,18 +177,18 @@ private[columnar] object INT extends NativeColumnType(IntegerType, 4) {
ByteBufferHelper.getInt(buffer)
}
- override def extract(buffer: ByteBuffer, row: MutableRow, ordinal: Int): Unit = {
+ override def extract(buffer: ByteBuffer, row: InternalRow, ordinal: Int): Unit = {
row.setInt(ordinal, ByteBufferHelper.getInt(buffer))
}
- override def setField(row: MutableRow, ordinal: Int, value: Int): Unit = {
+ override def setField(row: InternalRow, ordinal: Int, value: Int): Unit = {
row.setInt(ordinal, value)
}
override def getField(row: InternalRow, ordinal: Int): Int = row.getInt(ordinal)
- override def copyField(from: InternalRow, fromOrdinal: Int, to: MutableRow, toOrdinal: Int) {
+ override def copyField(from: InternalRow, fromOrdinal: Int, to: InternalRow, toOrdinal: Int) {
to.setInt(toOrdinal, from.getInt(fromOrdinal))
}
}
@@ -206,17 +206,17 @@ private[columnar] object LONG extends NativeColumnType(LongType, 8) {
ByteBufferHelper.getLong(buffer)
}
- override def extract(buffer: ByteBuffer, row: MutableRow, ordinal: Int): Unit = {
+ override def extract(buffer: ByteBuffer, row: InternalRow, ordinal: Int): Unit = {
row.setLong(ordinal, ByteBufferHelper.getLong(buffer))
}
- override def setField(row: MutableRow, ordinal: Int, value: Long): Unit = {
+ override def setField(row: InternalRow, ordinal: Int, value: Long): Unit = {
row.setLong(ordinal, value)
}
override def getField(row: InternalRow, ordinal: Int): Long = row.getLong(ordinal)
- override def copyField(from: InternalRow, fromOrdinal: Int, to: MutableRow, toOrdinal: Int) {
+ override def copyField(from: InternalRow, fromOrdinal: Int, to: InternalRow, toOrdinal: Int) {
to.setLong(toOrdinal, from.getLong(fromOrdinal))
}
}
@@ -234,17 +234,17 @@ private[columnar] object FLOAT extends NativeColumnType(FloatType, 4) {
ByteBufferHelper.getFloat(buffer)
}
- override def extract(buffer: ByteBuffer, row: MutableRow, ordinal: Int): Unit = {
+ override def extract(buffer: ByteBuffer, row: InternalRow, ordinal: Int): Unit = {
row.setFloat(ordinal, ByteBufferHelper.getFloat(buffer))
}
- override def setField(row: MutableRow, ordinal: Int, value: Float): Unit = {
+ override def setField(row: InternalRow, ordinal: Int, value: Float): Unit = {
row.setFloat(ordinal, value)
}
override def getField(row: InternalRow, ordinal: Int): Float = row.getFloat(ordinal)
- override def copyField(from: InternalRow, fromOrdinal: Int, to: MutableRow, toOrdinal: Int) {
+ override def copyField(from: InternalRow, fromOrdinal: Int, to: InternalRow, toOrdinal: Int) {
to.setFloat(toOrdinal, from.getFloat(fromOrdinal))
}
}
@@ -262,17 +262,17 @@ private[columnar] object DOUBLE extends NativeColumnType(DoubleType, 8) {
ByteBufferHelper.getDouble(buffer)
}
- override def extract(buffer: ByteBuffer, row: MutableRow, ordinal: Int): Unit = {
+ override def extract(buffer: ByteBuffer, row: InternalRow, ordinal: Int): Unit = {
row.setDouble(ordinal, ByteBufferHelper.getDouble(buffer))
}
- override def setField(row: MutableRow, ordinal: Int, value: Double): Unit = {
+ override def setField(row: InternalRow, ordinal: Int, value: Double): Unit = {
row.setDouble(ordinal, value)
}
override def getField(row: InternalRow, ordinal: Int): Double = row.getDouble(ordinal)
- override def copyField(from: InternalRow, fromOrdinal: Int, to: MutableRow, toOrdinal: Int) {
+ override def copyField(from: InternalRow, fromOrdinal: Int, to: InternalRow, toOrdinal: Int) {
to.setDouble(toOrdinal, from.getDouble(fromOrdinal))
}
}
@@ -288,17 +288,17 @@ private[columnar] object BOOLEAN extends NativeColumnType(BooleanType, 1) {
override def extract(buffer: ByteBuffer): Boolean = buffer.get() == 1
- override def extract(buffer: ByteBuffer, row: MutableRow, ordinal: Int): Unit = {
+ override def extract(buffer: ByteBuffer, row: InternalRow, ordinal: Int): Unit = {
row.setBoolean(ordinal, buffer.get() == 1)
}
- override def setField(row: MutableRow, ordinal: Int, value: Boolean): Unit = {
+ override def setField(row: InternalRow, ordinal: Int, value: Boolean): Unit = {
row.setBoolean(ordinal, value)
}
override def getField(row: InternalRow, ordinal: Int): Boolean = row.getBoolean(ordinal)
- override def copyField(from: InternalRow, fromOrdinal: Int, to: MutableRow, toOrdinal: Int) {
+ override def copyField(from: InternalRow, fromOrdinal: Int, to: InternalRow, toOrdinal: Int) {
to.setBoolean(toOrdinal, from.getBoolean(fromOrdinal))
}
}
@@ -316,17 +316,17 @@ private[columnar] object BYTE extends NativeColumnType(ByteType, 1) {
buffer.get()
}
- override def extract(buffer: ByteBuffer, row: MutableRow, ordinal: Int): Unit = {
+ override def extract(buffer: ByteBuffer, row: InternalRow, ordinal: Int): Unit = {
row.setByte(ordinal, buffer.get())
}
- override def setField(row: MutableRow, ordinal: Int, value: Byte): Unit = {
+ override def setField(row: InternalRow, ordinal: Int, value: Byte): Unit = {
row.setByte(ordinal, value)
}
override def getField(row: InternalRow, ordinal: Int): Byte = row.getByte(ordinal)
- override def copyField(from: InternalRow, fromOrdinal: Int, to: MutableRow, toOrdinal: Int) {
+ override def copyField(from: InternalRow, fromOrdinal: Int, to: InternalRow, toOrdinal: Int) {
to.setByte(toOrdinal, from.getByte(fromOrdinal))
}
}
@@ -344,17 +344,17 @@ private[columnar] object SHORT extends NativeColumnType(ShortType, 2) {
buffer.getShort()
}
- override def extract(buffer: ByteBuffer, row: MutableRow, ordinal: Int): Unit = {
+ override def extract(buffer: ByteBuffer, row: InternalRow, ordinal: Int): Unit = {
row.setShort(ordinal, buffer.getShort())
}
- override def setField(row: MutableRow, ordinal: Int, value: Short): Unit = {
+ override def setField(row: InternalRow, ordinal: Int, value: Short): Unit = {
row.setShort(ordinal, value)
}
override def getField(row: InternalRow, ordinal: Int): Short = row.getShort(ordinal)
- override def copyField(from: InternalRow, fromOrdinal: Int, to: MutableRow, toOrdinal: Int) {
+ override def copyField(from: InternalRow, fromOrdinal: Int, to: InternalRow, toOrdinal: Int) {
to.setShort(toOrdinal, from.getShort(fromOrdinal))
}
}
@@ -366,7 +366,7 @@ private[columnar] object SHORT extends NativeColumnType(ShortType, 2) {
private[columnar] trait DirectCopyColumnType[JvmType] extends ColumnType[JvmType] {
// copy the bytes from ByteBuffer to UnsafeRow
- override def extract(buffer: ByteBuffer, row: MutableRow, ordinal: Int): Unit = {
+ override def extract(buffer: ByteBuffer, row: InternalRow, ordinal: Int): Unit = {
if (row.isInstanceOf[MutableUnsafeRow]) {
val numBytes = buffer.getInt
val cursor = buffer.position()
@@ -407,7 +407,7 @@ private[columnar] object STRING
UTF8String.fromBytes(buffer.array(), buffer.arrayOffset() + cursor, length)
}
- override def setField(row: MutableRow, ordinal: Int, value: UTF8String): Unit = {
+ override def setField(row: InternalRow, ordinal: Int, value: UTF8String): Unit = {
if (row.isInstanceOf[MutableUnsafeRow]) {
row.asInstanceOf[MutableUnsafeRow].writer.write(ordinal, value)
} else {
@@ -419,7 +419,7 @@ private[columnar] object STRING
row.getUTF8String(ordinal)
}
- override def copyField(from: InternalRow, fromOrdinal: Int, to: MutableRow, toOrdinal: Int) {
+ override def copyField(from: InternalRow, fromOrdinal: Int, to: InternalRow, toOrdinal: Int) {
setField(to, toOrdinal, getField(from, fromOrdinal))
}
@@ -433,7 +433,7 @@ private[columnar] case class COMPACT_DECIMAL(precision: Int, scale: Int)
Decimal(ByteBufferHelper.getLong(buffer), precision, scale)
}
- override def extract(buffer: ByteBuffer, row: MutableRow, ordinal: Int): Unit = {
+ override def extract(buffer: ByteBuffer, row: InternalRow, ordinal: Int): Unit = {
if (row.isInstanceOf[MutableUnsafeRow]) {
// copy it as Long
row.setLong(ordinal, ByteBufferHelper.getLong(buffer))
@@ -459,11 +459,11 @@ private[columnar] case class COMPACT_DECIMAL(precision: Int, scale: Int)
row.getDecimal(ordinal, precision, scale)
}
- override def setField(row: MutableRow, ordinal: Int, value: Decimal): Unit = {
+ override def setField(row: InternalRow, ordinal: Int, value: Decimal): Unit = {
row.setDecimal(ordinal, value, precision)
}
- override def copyField(from: InternalRow, fromOrdinal: Int, to: MutableRow, toOrdinal: Int) {
+ override def copyField(from: InternalRow, fromOrdinal: Int, to: InternalRow, toOrdinal: Int) {
setField(to, toOrdinal, getField(from, fromOrdinal))
}
}
@@ -497,7 +497,7 @@ private[columnar] object BINARY extends ByteArrayColumnType[Array[Byte]](16) {
def dataType: DataType = BinaryType
- override def setField(row: MutableRow, ordinal: Int, value: Array[Byte]): Unit = {
+ override def setField(row: InternalRow, ordinal: Int, value: Array[Byte]): Unit = {
row.update(ordinal, value)
}
@@ -522,7 +522,7 @@ private[columnar] case class LARGE_DECIMAL(precision: Int, scale: Int)
row.getDecimal(ordinal, precision, scale)
}
- override def setField(row: MutableRow, ordinal: Int, value: Decimal): Unit = {
+ override def setField(row: InternalRow, ordinal: Int, value: Decimal): Unit = {
row.setDecimal(ordinal, value, precision)
}
@@ -553,7 +553,7 @@ private[columnar] case class STRUCT(dataType: StructType)
override def defaultSize: Int = 20
- override def setField(row: MutableRow, ordinal: Int, value: UnsafeRow): Unit = {
+ override def setField(row: InternalRow, ordinal: Int, value: UnsafeRow): Unit = {
row.update(ordinal, value)
}
@@ -591,7 +591,7 @@ private[columnar] case class ARRAY(dataType: ArrayType)
override def defaultSize: Int = 28
- override def setField(row: MutableRow, ordinal: Int, value: UnsafeArrayData): Unit = {
+ override def setField(row: InternalRow, ordinal: Int, value: UnsafeArrayData): Unit = {
row.update(ordinal, value)
}
@@ -630,7 +630,7 @@ private[columnar] case class MAP(dataType: MapType)
override def defaultSize: Int = 68
- override def setField(row: MutableRow, ordinal: Int, value: UnsafeMapData): Unit = {
+ override def setField(row: InternalRow, ordinal: Int, value: UnsafeMapData): Unit = {
row.update(ordinal, value)
}
http://git-wip-us.apache.org/repos/asf/spark/blob/97594c29/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/GenerateColumnAccessor.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/GenerateColumnAccessor.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/GenerateColumnAccessor.scala
index 96bd338..14024d6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/GenerateColumnAccessor.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/GenerateColumnAccessor.scala
@@ -36,8 +36,7 @@ abstract class ColumnarIterator extends Iterator[InternalRow] {
*
* WARNING: These setter MUST be called in increasing order of ordinals.
*/
-class MutableUnsafeRow(val writer: UnsafeRowWriter) extends GenericMutableRow(null) {
-
+class MutableUnsafeRow(val writer: UnsafeRowWriter) extends BaseGenericInternalRow {
override def isNullAt(i: Int): Boolean = writer.isNullAt(i)
override def setNullAt(i: Int): Unit = writer.setNullAt(i)
@@ -55,6 +54,9 @@ class MutableUnsafeRow(val writer: UnsafeRowWriter) extends GenericMutableRow(nu
override def update(i: Int, v: Any): Unit = throw new UnsupportedOperationException
// all other methods inherited from GenericMutableRow are not need
+ override protected def genericGet(ordinal: Int): Any = throw new UnsupportedOperationException
+ override def numFields: Int = throw new UnsupportedOperationException
+ override def copy(): InternalRow = throw new UnsupportedOperationException
}
/**
http://git-wip-us.apache.org/repos/asf/spark/blob/97594c29/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/NullableColumnAccessor.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/NullableColumnAccessor.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/NullableColumnAccessor.scala
index 2465633..2f09757 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/NullableColumnAccessor.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/NullableColumnAccessor.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.execution.columnar
import java.nio.{ByteBuffer, ByteOrder}
-import org.apache.spark.sql.catalyst.expressions.MutableRow
+import org.apache.spark.sql.catalyst.InternalRow
private[columnar] trait NullableColumnAccessor extends ColumnAccessor {
private var nullsBuffer: ByteBuffer = _
@@ -39,7 +39,7 @@ private[columnar] trait NullableColumnAccessor extends ColumnAccessor {
super.initialize()
}
- abstract override def extractTo(row: MutableRow, ordinal: Int): Unit = {
+ abstract override def extractTo(row: InternalRow, ordinal: Int): Unit = {
if (pos == nextNullIndex) {
seenNulls += 1
http://git-wip-us.apache.org/repos/asf/spark/blob/97594c29/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/compression/CompressibleColumnAccessor.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/compression/CompressibleColumnAccessor.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/compression/CompressibleColumnAccessor.scala
index 6579b50..e1d13ad 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/compression/CompressibleColumnAccessor.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/compression/CompressibleColumnAccessor.scala
@@ -17,7 +17,7 @@
package org.apache.spark.sql.execution.columnar.compression
-import org.apache.spark.sql.catalyst.expressions.MutableRow
+import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.columnar.{ColumnAccessor, NativeColumnAccessor}
import org.apache.spark.sql.types.AtomicType
@@ -33,7 +33,7 @@ private[columnar] trait CompressibleColumnAccessor[T <: AtomicType] extends Colu
abstract override def hasNext: Boolean = super.hasNext || decoder.hasNext
- override def extractSingle(row: MutableRow, ordinal: Int): Unit = {
+ override def extractSingle(row: InternalRow, ordinal: Int): Unit = {
decoder.next(row, ordinal)
}
}
http://git-wip-us.apache.org/repos/asf/spark/blob/97594c29/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/compression/CompressionScheme.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/compression/CompressionScheme.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/compression/CompressionScheme.scala
index b90d00b..6e4f1c5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/compression/CompressionScheme.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/compression/CompressionScheme.scala
@@ -20,7 +20,6 @@ package org.apache.spark.sql.execution.columnar.compression
import java.nio.{ByteBuffer, ByteOrder}
import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.MutableRow
import org.apache.spark.sql.execution.columnar.{ColumnType, NativeColumnType}
import org.apache.spark.sql.types.AtomicType
@@ -39,7 +38,7 @@ private[columnar] trait Encoder[T <: AtomicType] {
}
private[columnar] trait Decoder[T <: AtomicType] {
- def next(row: MutableRow, ordinal: Int): Unit
+ def next(row: InternalRow, ordinal: Int): Unit
def hasNext: Boolean
}
http://git-wip-us.apache.org/repos/asf/spark/blob/97594c29/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/compression/compressionSchemes.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/compression/compressionSchemes.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/compression/compressionSchemes.scala
index 941f03b..ee99c90 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/compression/compressionSchemes.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/compression/compressionSchemes.scala
@@ -22,7 +22,7 @@ import java.nio.ByteBuffer
import scala.collection.mutable
import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.{MutableRow, SpecificMutableRow}
+import org.apache.spark.sql.catalyst.expressions.SpecificInternalRow
import org.apache.spark.sql.execution.columnar._
import org.apache.spark.sql.types._
@@ -56,7 +56,7 @@ private[columnar] case object PassThrough extends CompressionScheme {
class Decoder[T <: AtomicType](buffer: ByteBuffer, columnType: NativeColumnType[T])
extends compression.Decoder[T] {
- override def next(row: MutableRow, ordinal: Int): Unit = {
+ override def next(row: InternalRow, ordinal: Int): Unit = {
columnType.extract(buffer, row, ordinal)
}
@@ -86,7 +86,7 @@ private[columnar] case object RunLengthEncoding extends CompressionScheme {
private var _compressedSize = 0
// Using `MutableRow` to store the last value to avoid boxing/unboxing cost.
- private val lastValue = new SpecificMutableRow(Seq(columnType.dataType))
+ private val lastValue = new SpecificInternalRow(Seq(columnType.dataType))
private var lastRun = 0
override def uncompressedSize: Int = _uncompressedSize
@@ -117,9 +117,9 @@ private[columnar] case object RunLengthEncoding extends CompressionScheme {
to.putInt(RunLengthEncoding.typeId)
if (from.hasRemaining) {
- val currentValue = new SpecificMutableRow(Seq(columnType.dataType))
+ val currentValue = new SpecificInternalRow(Seq(columnType.dataType))
var currentRun = 1
- val value = new SpecificMutableRow(Seq(columnType.dataType))
+ val value = new SpecificInternalRow(Seq(columnType.dataType))
columnType.extract(from, currentValue, 0)
@@ -156,7 +156,7 @@ private[columnar] case object RunLengthEncoding extends CompressionScheme {
private var valueCount = 0
private var currentValue: T#InternalType = _
- override def next(row: MutableRow, ordinal: Int): Unit = {
+ override def next(row: InternalRow, ordinal: Int): Unit = {
if (valueCount == run) {
currentValue = columnType.extract(buffer)
run = ByteBufferHelper.getInt(buffer)
@@ -273,7 +273,7 @@ private[columnar] case object DictionaryEncoding extends CompressionScheme {
Array.fill[Any](elementNum)(columnType.extract(buffer).asInstanceOf[Any])
}
- override def next(row: MutableRow, ordinal: Int): Unit = {
+ override def next(row: InternalRow, ordinal: Int): Unit = {
columnType.setField(row, ordinal, dictionary(buffer.getShort()).asInstanceOf[T#InternalType])
}
@@ -356,7 +356,7 @@ private[columnar] case object BooleanBitSet extends CompressionScheme {
private var visited: Int = 0
- override def next(row: MutableRow, ordinal: Int): Unit = {
+ override def next(row: InternalRow, ordinal: Int): Unit = {
val bit = visited % BITS_PER_LONG
visited += 1
@@ -443,7 +443,7 @@ private[columnar] case object IntDelta extends CompressionScheme {
override def hasNext: Boolean = buffer.hasRemaining
- override def next(row: MutableRow, ordinal: Int): Unit = {
+ override def next(row: InternalRow, ordinal: Int): Unit = {
val delta = buffer.get()
prev = if (delta > Byte.MinValue) prev + delta else ByteBufferHelper.getInt(buffer)
row.setInt(ordinal, prev)
@@ -523,7 +523,7 @@ private[columnar] case object LongDelta extends CompressionScheme {
override def hasNext: Boolean = buffer.hasRemaining
- override def next(row: MutableRow, ordinal: Int): Unit = {
+ override def next(row: InternalRow, ordinal: Int): Unit = {
val delta = buffer.get()
prev = if (delta > Byte.MinValue) prev + delta else ByteBufferHelper.getLong(buffer)
row.setLong(ordinal, prev)
http://git-wip-us.apache.org/repos/asf/spark/blob/97594c29/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index 693b4c4..6f9ed50 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -273,7 +273,7 @@ object DataSourceStrategy extends Strategy with Logging {
// Get the bucket ID based on the bucketing values.
// Restriction: Bucket pruning works iff the bucketing column has one and only one column.
def getBucketId(bucketColumn: Attribute, numBuckets: Int, value: Any): Int = {
- val mutableRow = new SpecificMutableRow(Seq(bucketColumn.dataType))
+ val mutableRow = new SpecificInternalRow(Seq(bucketColumn.dataType))
mutableRow(0) = Cast(Literal(value), bucketColumn.dataType).eval(null)
val bucketIdGeneration = UnsafeProjection.create(
HashPartitioning(bucketColumn :: Nil, numBuckets).partitionIdExpression :: Nil,
http://git-wip-us.apache.org/repos/asf/spark/blob/97594c29/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala
index 33b170b..55cb26d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala
@@ -29,7 +29,7 @@ import org.apache.spark.internal.Logging
import org.apache.spark.rdd.RDD
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.GenericMutableRow
+import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.execution.datasources.{OutputWriter, OutputWriterFactory, PartitionedFile, WriterContainer}
import org.apache.spark.sql.types._
@@ -88,7 +88,7 @@ object CSVRelation extends Logging {
case (field, index) => safeRequiredIndices(safeRequiredFields.indexOf(field)) = index
}
val requiredSize = requiredFields.length
- val row = new GenericMutableRow(requiredSize)
+ val row = new GenericInternalRow(requiredSize)
(tokens: Array[String], numMalformedRows) => {
if (params.dropMalformed && schemaFields.length != tokens.length) {
http://git-wip-us.apache.org/repos/asf/spark/blob/97594c29/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
index 66f2bad..4754963 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
@@ -30,7 +30,7 @@ import org.apache.spark.internal.Logging
import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.encoders.RowEncoder
-import org.apache.spark.sql.catalyst.expressions.{MutableRow, SpecificMutableRow}
+import org.apache.spark.sql.catalyst.expressions.SpecificInternalRow
import org.apache.spark.sql.catalyst.util.{DateTimeUtils, GenericArrayData}
import org.apache.spark.sql.jdbc.{JdbcDialect, JdbcDialects, JdbcType}
import org.apache.spark.sql.types._
@@ -283,7 +283,7 @@ object JdbcUtils extends Logging {
new NextIterator[InternalRow] {
private[this] val rs = resultSet
private[this] val getters: Array[JDBCValueGetter] = makeGetters(schema)
- private[this] val mutableRow = new SpecificMutableRow(schema.fields.map(x => x.dataType))
+ private[this] val mutableRow = new SpecificInternalRow(schema.fields.map(x => x.dataType))
override protected def close(): Unit = {
try {
@@ -314,22 +314,22 @@ object JdbcUtils extends Logging {
// A `JDBCValueGetter` is responsible for getting a value from `ResultSet` into a field
// for `MutableRow`. The last argument `Int` means the index for the value to be set in
// the row and also used for the value in `ResultSet`.
- private type JDBCValueGetter = (ResultSet, MutableRow, Int) => Unit
+ private type JDBCValueGetter = (ResultSet, InternalRow, Int) => Unit
/**
* Creates `JDBCValueGetter`s according to [[StructType]], which can set
- * each value from `ResultSet` to each field of [[MutableRow]] correctly.
+ * each value from `ResultSet` to each field of [[InternalRow]] correctly.
*/
private def makeGetters(schema: StructType): Array[JDBCValueGetter] =
schema.fields.map(sf => makeGetter(sf.dataType, sf.metadata))
private def makeGetter(dt: DataType, metadata: Metadata): JDBCValueGetter = dt match {
case BooleanType =>
- (rs: ResultSet, row: MutableRow, pos: Int) =>
+ (rs: ResultSet, row: InternalRow, pos: Int) =>
row.setBoolean(pos, rs.getBoolean(pos + 1))
case DateType =>
- (rs: ResultSet, row: MutableRow, pos: Int) =>
+ (rs: ResultSet, row: InternalRow, pos: Int) =>
// DateTimeUtils.fromJavaDate does not handle null value, so we need to check it.
val dateVal = rs.getDate(pos + 1)
if (dateVal != null) {
@@ -347,25 +347,25 @@ object JdbcUtils extends Logging {
// retrieve it, you will get wrong result 199.99.
// So it is needed to set precision and scale for Decimal based on JDBC metadata.
case DecimalType.Fixed(p, s) =>
- (rs: ResultSet, row: MutableRow, pos: Int) =>
+ (rs: ResultSet, row: InternalRow, pos: Int) =>
val decimal =
nullSafeConvert[java.math.BigDecimal](rs.getBigDecimal(pos + 1), d => Decimal(d, p, s))
row.update(pos, decimal)
case DoubleType =>
- (rs: ResultSet, row: MutableRow, pos: Int) =>
+ (rs: ResultSet, row: InternalRow, pos: Int) =>
row.setDouble(pos, rs.getDouble(pos + 1))
case FloatType =>
- (rs: ResultSet, row: MutableRow, pos: Int) =>
+ (rs: ResultSet, row: InternalRow, pos: Int) =>
row.setFloat(pos, rs.getFloat(pos + 1))
case IntegerType =>
- (rs: ResultSet, row: MutableRow, pos: Int) =>
+ (rs: ResultSet, row: InternalRow, pos: Int) =>
row.setInt(pos, rs.getInt(pos + 1))
case LongType if metadata.contains("binarylong") =>
- (rs: ResultSet, row: MutableRow, pos: Int) =>
+ (rs: ResultSet, row: InternalRow, pos: Int) =>
val bytes = rs.getBytes(pos + 1)
var ans = 0L
var j = 0
@@ -376,20 +376,20 @@ object JdbcUtils extends Logging {
row.setLong(pos, ans)
case LongType =>
- (rs: ResultSet, row: MutableRow, pos: Int) =>
+ (rs: ResultSet, row: InternalRow, pos: Int) =>
row.setLong(pos, rs.getLong(pos + 1))
case ShortType =>
- (rs: ResultSet, row: MutableRow, pos: Int) =>
+ (rs: ResultSet, row: InternalRow, pos: Int) =>
row.setShort(pos, rs.getShort(pos + 1))
case StringType =>
- (rs: ResultSet, row: MutableRow, pos: Int) =>
+ (rs: ResultSet, row: InternalRow, pos: Int) =>
// TODO(davies): use getBytes for better performance, if the encoding is UTF-8
row.update(pos, UTF8String.fromString(rs.getString(pos + 1)))
case TimestampType =>
- (rs: ResultSet, row: MutableRow, pos: Int) =>
+ (rs: ResultSet, row: InternalRow, pos: Int) =>
val t = rs.getTimestamp(pos + 1)
if (t != null) {
row.setLong(pos, DateTimeUtils.fromJavaTimestamp(t))
@@ -398,7 +398,7 @@ object JdbcUtils extends Logging {
}
case BinaryType =>
- (rs: ResultSet, row: MutableRow, pos: Int) =>
+ (rs: ResultSet, row: InternalRow, pos: Int) =>
row.update(pos, rs.getBytes(pos + 1))
case ArrayType(et, _) =>
@@ -437,7 +437,7 @@ object JdbcUtils extends Logging {
case _ => (array: Object) => array.asInstanceOf[Array[Any]]
}
- (rs: ResultSet, row: MutableRow, pos: Int) =>
+ (rs: ResultSet, row: InternalRow, pos: Int) =>
val array = nullSafeConvert[Object](
rs.getArray(pos + 1).getArray,
array => new GenericArrayData(elementConversion.apply(array)))
http://git-wip-us.apache.org/repos/asf/spark/blob/97594c29/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
index 9ffc2b5..33dcf2f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
@@ -40,7 +40,7 @@ import org.apache.spark.unsafe.types.UTF8String
/**
* A [[ParentContainerUpdater]] is used by a Parquet converter to set converted values to some
* corresponding parent container. For example, a converter for a `StructType` field may set
- * converted values to a [[MutableRow]]; or a converter for array elements may append converted
+ * converted values to a [[InternalRow]]; or a converter for array elements may append converted
* values to an [[ArrayBuffer]].
*/
private[parquet] trait ParentContainerUpdater {
@@ -155,7 +155,7 @@ private[parquet] class ParquetRowConverter(
* Updater used together with field converters within a [[ParquetRowConverter]]. It propagates
* converted filed values to the `ordinal`-th cell in `currentRow`.
*/
- private final class RowUpdater(row: MutableRow, ordinal: Int) extends ParentContainerUpdater {
+ private final class RowUpdater(row: InternalRow, ordinal: Int) extends ParentContainerUpdater {
override def set(value: Any): Unit = row(ordinal) = value
override def setBoolean(value: Boolean): Unit = row.setBoolean(ordinal, value)
override def setByte(value: Byte): Unit = row.setByte(ordinal, value)
@@ -166,7 +166,7 @@ private[parquet] class ParquetRowConverter(
override def setFloat(value: Float): Unit = row.setFloat(ordinal, value)
}
- private val currentRow = new SpecificMutableRow(catalystType.map(_.dataType))
+ private val currentRow = new SpecificInternalRow(catalystType.map(_.dataType))
private val unsafeProjection = UnsafeProjection.create(catalystType)
http://git-wip-us.apache.org/repos/asf/spark/blob/97594c29/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastNestedLoopJoinExec.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastNestedLoopJoinExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastNestedLoopJoinExec.scala
index 43cdce7..bfe7e3d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastNestedLoopJoinExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastNestedLoopJoinExec.scala
@@ -119,7 +119,7 @@ case class BroadcastNestedLoopJoinExec(
streamed.execute().mapPartitionsInternal { streamedIter =>
val buildRows = relation.value
val joinedRow = new JoinedRow
- val nulls = new GenericMutableRow(broadcast.output.size)
+ val nulls = new GenericInternalRow(broadcast.output.size)
// Returns an iterator to avoid copy the rows.
new Iterator[InternalRow] {
@@ -205,14 +205,14 @@ case class BroadcastNestedLoopJoinExec(
val joinedRow = new JoinedRow
if (condition.isDefined) {
- val resultRow = new GenericMutableRow(Array[Any](null))
+ val resultRow = new GenericInternalRow(Array[Any](null))
streamedIter.map { row =>
val result = buildRows.exists(r => boundCondition(joinedRow(row, r)))
resultRow.setBoolean(0, result)
joinedRow(row, resultRow)
}
} else {
- val resultRow = new GenericMutableRow(Array[Any](buildRows.nonEmpty))
+ val resultRow = new GenericInternalRow(Array[Any](buildRows.nonEmpty))
streamedIter.map { row =>
joinedRow(row, resultRow)
}
@@ -293,7 +293,7 @@ case class BroadcastNestedLoopJoinExec(
}
val notMatchedBroadcastRows: Seq[InternalRow] = {
- val nulls = new GenericMutableRow(streamed.output.size)
+ val nulls = new GenericInternalRow(streamed.output.size)
val buf: CompactBuffer[InternalRow] = new CompactBuffer()
val joinedRow = new JoinedRow
joinedRow.withLeft(nulls)
@@ -311,7 +311,7 @@ case class BroadcastNestedLoopJoinExec(
val matchedStreamRows = streamRdd.mapPartitionsInternal { streamedIter =>
val buildRows = relation.value
val joinedRow = new JoinedRow
- val nulls = new GenericMutableRow(broadcast.output.size)
+ val nulls = new GenericInternalRow(broadcast.output.size)
streamedIter.flatMap { streamedRow =>
var i = 0
http://git-wip-us.apache.org/repos/asf/spark/blob/97594c29/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala
index fb6bfa7..8ddac19 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala
@@ -192,7 +192,7 @@ trait HashJoin {
streamIter: Iterator[InternalRow],
hashedRelation: HashedRelation): Iterator[InternalRow] = {
val joinKeys = streamSideKeyGenerator()
- val result = new GenericMutableRow(Array[Any](null))
+ val result = new GenericInternalRow(Array[Any](null))
val joinedRow = new JoinedRow
streamIter.map { current =>
val key = joinKeys(current)
http://git-wip-us.apache.org/repos/asf/spark/blob/97594c29/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala
index 81b3e1d..ecf7cf2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala
@@ -275,7 +275,7 @@ case class SortMergeJoinExec(
case j: ExistenceJoin =>
new RowIterator {
private[this] var currentLeftRow: InternalRow = _
- private[this] val result: MutableRow = new GenericMutableRow(Array[Any](null))
+ private[this] val result: InternalRow = new GenericInternalRow(Array[Any](null))
private[this] val smjScanner = new SortMergeJoinScanner(
createLeftKeyGenerator(),
createRightKeyGenerator(),
http://git-wip-us.apache.org/repos/asf/spark/blob/97594c29/sql/core/src/main/scala/org/apache/spark/sql/execution/objects.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/objects.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/objects.scala
index c7e2671..2acc511 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/objects.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/objects.scala
@@ -141,7 +141,7 @@ object ObjectOperator {
def serializeObjectToRow(serializer: Seq[Expression]): Any => UnsafeRow = {
val proj = GenerateUnsafeProjection.generate(serializer)
val objType = serializer.head.collect { case b: BoundReference => b.dataType }.head
- val objRow = new SpecificMutableRow(objType :: Nil)
+ val objRow = new SpecificInternalRow(objType :: Nil)
(o: Any) => {
objRow(0) = o
proj(objRow)
@@ -149,7 +149,7 @@ object ObjectOperator {
}
def wrapObjectToRow(objType: DataType): Any => InternalRow = {
- val outputRow = new SpecificMutableRow(objType :: Nil)
+ val outputRow = new SpecificInternalRow(objType :: Nil)
(o: Any) => {
outputRow(0) = o
outputRow
http://git-wip-us.apache.org/repos/asf/spark/blob/97594c29/sql/core/src/main/scala/org/apache/spark/sql/execution/python/BatchEvalPythonExec.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/BatchEvalPythonExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/BatchEvalPythonExec.scala
index f9d20ad..dcaf2c7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/BatchEvalPythonExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/BatchEvalPythonExec.scala
@@ -147,7 +147,7 @@ case class BatchEvalPythonExec(udfs: Seq[PythonUDF], output: Seq[Attribute], chi
.compute(inputIterator, context.partitionId(), context)
val unpickle = new Unpickler
- val mutableRow = new GenericMutableRow(1)
+ val mutableRow = new GenericInternalRow(1)
val joined = new JoinedRow
val resultType = if (udfs.length == 1) {
udfs.head.dataType
http://git-wip-us.apache.org/repos/asf/spark/blob/97594c29/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/StatFunctions.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/StatFunctions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/StatFunctions.scala
index 822f49e..c02b154 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/StatFunctions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/StatFunctions.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.execution.stat
import org.apache.spark.internal.Logging
import org.apache.spark.sql.{Column, DataFrame, Dataset, Row}
-import org.apache.spark.sql.catalyst.expressions.{Cast, GenericMutableRow}
+import org.apache.spark.sql.catalyst.expressions.{Cast, GenericInternalRow}
import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
import org.apache.spark.sql.catalyst.util.QuantileSummaries
import org.apache.spark.sql.functions._
@@ -186,7 +186,7 @@ object StatFunctions extends Logging {
require(columnSize < 1e4, s"The number of distinct values for $col2, can't " +
s"exceed 1e4. Currently $columnSize")
val table = counts.groupBy(_.get(0)).map { case (col1Item, rows) =>
- val countsRow = new GenericMutableRow(columnSize + 1)
+ val countsRow = new GenericInternalRow(columnSize + 1)
rows.foreach { (row: Row) =>
// row.get(0) is column 1
// row.get(1) is column 2
http://git-wip-us.apache.org/repos/asf/spark/blob/97594c29/sql/core/src/main/scala/org/apache/spark/sql/execution/window/AggregateProcessor.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/window/AggregateProcessor.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/window/AggregateProcessor.scala
index d3a46d0..c9f5d3b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/window/AggregateProcessor.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/window/AggregateProcessor.scala
@@ -123,7 +123,7 @@ private[window] final class AggregateProcessor(
private[this] val join = new JoinedRow
private[this] val numImperatives = imperatives.length
- private[this] val buffer = new SpecificMutableRow(bufferSchema.toSeq.map(_.dataType))
+ private[this] val buffer = new SpecificInternalRow(bufferSchema.toSeq.map(_.dataType))
initialProjection.target(buffer)
updateProjection.target(buffer)
@@ -154,6 +154,6 @@ private[window] final class AggregateProcessor(
}
/** Evaluate buffer. */
- def evaluate(target: MutableRow): Unit =
+ def evaluate(target: InternalRow): Unit =
evaluateProjection.target(target)(buffer)
}
http://git-wip-us.apache.org/repos/asf/spark/blob/97594c29/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowExec.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowExec.scala
index 7a6a30f..1dd281e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowExec.scala
@@ -204,7 +204,7 @@ case class WindowExec(
val factory = key match {
// Offset Frame
case ("OFFSET", RowFrame, Some(offset), Some(h)) if offset == h =>
- target: MutableRow =>
+ target: InternalRow =>
new OffsetWindowFunctionFrame(
target,
ordinal,
@@ -217,7 +217,7 @@ case class WindowExec(
// Growing Frame.
case ("AGGREGATE", frameType, None, Some(high)) =>
- target: MutableRow => {
+ target: InternalRow => {
new UnboundedPrecedingWindowFunctionFrame(
target,
processor,
@@ -226,7 +226,7 @@ case class WindowExec(
// Shrinking Frame.
case ("AGGREGATE", frameType, Some(low), None) =>
- target: MutableRow => {
+ target: InternalRow => {
new UnboundedFollowingWindowFunctionFrame(
target,
processor,
@@ -235,7 +235,7 @@ case class WindowExec(
// Moving Frame.
case ("AGGREGATE", frameType, Some(low), Some(high)) =>
- target: MutableRow => {
+ target: InternalRow => {
new SlidingWindowFunctionFrame(
target,
processor,
@@ -245,7 +245,7 @@ case class WindowExec(
// Entire Partition Frame.
case ("AGGREGATE", frameType, None, None) =>
- target: MutableRow => {
+ target: InternalRow => {
new UnboundedWindowFunctionFrame(target, processor)
}
}
@@ -312,7 +312,7 @@ case class WindowExec(
val inputFields = child.output.length
var sorter: UnsafeExternalSorter = null
var rowBuffer: RowBuffer = null
- val windowFunctionResult = new SpecificMutableRow(expressions.map(_.dataType))
+ val windowFunctionResult = new SpecificInternalRow(expressions.map(_.dataType))
val frames = factories.map(_(windowFunctionResult))
val numFrames = frames.length
private[this] def fetchNextPartition() {
http://git-wip-us.apache.org/repos/asf/spark/blob/97594c29/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowFunctionFrame.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowFunctionFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowFunctionFrame.scala
index 2ab9faa..70efc0f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowFunctionFrame.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowFunctionFrame.scala
@@ -56,7 +56,7 @@ private[window] abstract class WindowFunctionFrame {
* @param offset by which rows get moved within a partition.
*/
private[window] final class OffsetWindowFunctionFrame(
- target: MutableRow,
+ target: InternalRow,
ordinal: Int,
expressions: Array[OffsetWindowFunction],
inputSchema: Seq[Attribute],
@@ -136,7 +136,7 @@ private[window] final class OffsetWindowFunctionFrame(
* @param ubound comparator used to identify the upper bound of an output row.
*/
private[window] final class SlidingWindowFunctionFrame(
- target: MutableRow,
+ target: InternalRow,
processor: AggregateProcessor,
lbound: BoundOrdering,
ubound: BoundOrdering)
@@ -217,7 +217,7 @@ private[window] final class SlidingWindowFunctionFrame(
* @param processor to calculate the row values with.
*/
private[window] final class UnboundedWindowFunctionFrame(
- target: MutableRow,
+ target: InternalRow,
processor: AggregateProcessor)
extends WindowFunctionFrame {
@@ -255,7 +255,7 @@ private[window] final class UnboundedWindowFunctionFrame(
* @param ubound comparator used to identify the upper bound of an output row.
*/
private[window] final class UnboundedPrecedingWindowFunctionFrame(
- target: MutableRow,
+ target: InternalRow,
processor: AggregateProcessor,
ubound: BoundOrdering)
extends WindowFunctionFrame {
@@ -317,7 +317,7 @@ private[window] final class UnboundedPrecedingWindowFunctionFrame(
* @param lbound comparator used to identify the lower bound of an output row.
*/
private[window] final class UnboundedFollowingWindowFunctionFrame(
- target: MutableRow,
+ target: InternalRow,
processor: AggregateProcessor,
lbound: BoundOrdering)
extends WindowFunctionFrame {
http://git-wip-us.apache.org/repos/asf/spark/blob/97594c29/sql/core/src/test/scala/org/apache/spark/sql/RowSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/RowSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/RowSuite.scala
index 34936b3..7516be3 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/RowSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/RowSuite.scala
@@ -18,7 +18,7 @@
package org.apache.spark.sql
import org.apache.spark.SparkFunSuite
-import org.apache.spark.sql.catalyst.expressions.{GenericMutableRow, SpecificMutableRow}
+import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, SpecificInternalRow}
import org.apache.spark.sql.test.SharedSQLContext
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String
@@ -27,7 +27,7 @@ class RowSuite extends SparkFunSuite with SharedSQLContext {
import testImplicits._
test("create row") {
- val expected = new GenericMutableRow(4)
+ val expected = new GenericInternalRow(4)
expected.setInt(0, 2147483647)
expected.update(1, UTF8String.fromString("this is a string"))
expected.setBoolean(2, false)
@@ -49,7 +49,7 @@ class RowSuite extends SparkFunSuite with SharedSQLContext {
}
test("SpecificMutableRow.update with null") {
- val row = new SpecificMutableRow(Seq(IntegerType))
+ val row = new SpecificInternalRow(Seq(IntegerType))
row(0) = null
assert(row.isNullAt(0))
}
http://git-wip-us.apache.org/repos/asf/spark/blob/97594c29/sql/core/src/test/scala/org/apache/spark/sql/TypedImperativeAggregateSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/TypedImperativeAggregateSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/TypedImperativeAggregateSuite.scala
index b5eb16b..ffa26f1 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/TypedImperativeAggregateSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/TypedImperativeAggregateSuite.scala
@@ -21,7 +21,7 @@ import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, Da
import org.apache.spark.sql.TypedImperativeAggregateSuite.TypedMax
import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.{BoundReference, Expression, GenericMutableRow, SpecificMutableRow}
+import org.apache.spark.sql.catalyst.expressions.{BoundReference, Expression, GenericInternalRow, SpecificInternalRow}
import org.apache.spark.sql.catalyst.expressions.aggregate.TypedImperativeAggregate
import org.apache.spark.sql.execution.aggregate.SortAggregateExec
import org.apache.spark.sql.expressions.Window
@@ -64,7 +64,7 @@ class TypedImperativeAggregateSuite extends QueryTest with SharedSQLContext {
assert(agg.eval(mergeBuffer) == data.map(_._1).max)
// Tests low level eval(row: InternalRow) API.
- val row = new GenericMutableRow(Array(mergeBuffer): Array[Any])
+ val row = new GenericInternalRow(Array(mergeBuffer): Array[Any])
// Evaluates directly on row consist of aggregation buffer object.
assert(agg.eval(row) == data.map(_._1).max)
@@ -73,7 +73,7 @@ class TypedImperativeAggregateSuite extends QueryTest with SharedSQLContext {
test("supports SpecificMutableRow as mutable row") {
val aggregationBufferSchema = Seq(IntegerType, LongType, BinaryType, IntegerType)
val aggBufferOffset = 2
- val buffer = new SpecificMutableRow(aggregationBufferSchema)
+ val buffer = new SpecificInternalRow(aggregationBufferSchema)
val agg = new TypedMax(BoundReference(ordinal = 1, dataType = IntegerType, nullable = false))
.withNewMutableAggBufferOffset(aggBufferOffset)
http://git-wip-us.apache.org/repos/asf/spark/blob/97594c29/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/ColumnTypeSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/ColumnTypeSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/ColumnTypeSuite.scala
index 805b566..8bf9f52 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/ColumnTypeSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/ColumnTypeSuite.scala
@@ -24,7 +24,7 @@ import org.apache.spark.SparkFunSuite
import org.apache.spark.internal.Logging
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.CatalystTypeConverters
-import org.apache.spark.sql.catalyst.expressions.{GenericMutableRow, UnsafeProjection}
+import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, UnsafeProjection}
import org.apache.spark.sql.execution.columnar.ColumnarTestUtils._
import org.apache.spark.sql.types._
@@ -54,7 +54,7 @@ class ColumnTypeSuite extends SparkFunSuite with Logging {
expected: Int): Unit = {
assertResult(expected, s"Wrong actualSize for $columnType") {
- val row = new GenericMutableRow(1)
+ val row = new GenericInternalRow(1)
row.update(0, CatalystTypeConverters.convertToCatalyst(value))
val proj = UnsafeProjection.create(Array[DataType](columnType.dataType))
columnType.actualSize(proj(row), 0)
http://git-wip-us.apache.org/repos/asf/spark/blob/97594c29/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/ColumnarTestUtils.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/ColumnarTestUtils.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/ColumnarTestUtils.scala
index 1529313..686c8fa 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/ColumnarTestUtils.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/ColumnarTestUtils.scala
@@ -21,14 +21,14 @@ import scala.collection.immutable.HashSet
import scala.util.Random
import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, GenericMutableRow}
+import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, GenericArrayData}
import org.apache.spark.sql.types.{AtomicType, Decimal}
import org.apache.spark.unsafe.types.UTF8String
object ColumnarTestUtils {
- def makeNullRow(length: Int): GenericMutableRow = {
- val row = new GenericMutableRow(length)
+ def makeNullRow(length: Int): GenericInternalRow = {
+ val row = new GenericInternalRow(length)
(0 until length).foreach(row.setNullAt)
row
}
@@ -86,7 +86,7 @@ object ColumnarTestUtils {
tail: ColumnType[_]*): InternalRow = makeRandomRow(Seq(head) ++ tail)
def makeRandomRow(columnTypes: Seq[ColumnType[_]]): InternalRow = {
- val row = new GenericMutableRow(columnTypes.length)
+ val row = new GenericInternalRow(columnTypes.length)
makeRandomValues(columnTypes).zipWithIndex.foreach { case (value, index) =>
row(index) = value
}
@@ -95,11 +95,11 @@ object ColumnarTestUtils {
def makeUniqueValuesAndSingleValueRows[T <: AtomicType](
columnType: NativeColumnType[T],
- count: Int): (Seq[T#InternalType], Seq[GenericMutableRow]) = {
+ count: Int): (Seq[T#InternalType], Seq[GenericInternalRow]) = {
val values = makeUniqueRandomValues(columnType, count)
val rows = values.map { value =>
- val row = new GenericMutableRow(1)
+ val row = new GenericInternalRow(1)
row(0) = value
row
}
http://git-wip-us.apache.org/repos/asf/spark/blob/97594c29/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/NullableColumnAccessorSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/NullableColumnAccessorSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/NullableColumnAccessorSuite.scala
index dc22d3e..8f4ca3c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/NullableColumnAccessorSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/NullableColumnAccessorSuite.scala
@@ -21,7 +21,7 @@ import java.nio.ByteBuffer
import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.catalyst.CatalystTypeConverters
-import org.apache.spark.sql.catalyst.expressions.{GenericMutableRow, UnsafeProjection}
+import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, UnsafeProjection}
import org.apache.spark.sql.types._
class TestNullableColumnAccessor[JvmType](
@@ -72,7 +72,7 @@ class NullableColumnAccessorSuite extends SparkFunSuite {
}
val accessor = TestNullableColumnAccessor(builder.build(), columnType)
- val row = new GenericMutableRow(1)
+ val row = new GenericInternalRow(1)
val converter = CatalystTypeConverters.createToScalaConverter(columnType.dataType)
(0 until 4).foreach { _ =>
http://git-wip-us.apache.org/repos/asf/spark/blob/97594c29/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/NullableColumnBuilderSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/NullableColumnBuilderSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/NullableColumnBuilderSuite.scala
index cdd4551..b2b6e92 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/NullableColumnBuilderSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/NullableColumnBuilderSuite.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.execution.columnar
import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.catalyst.CatalystTypeConverters
-import org.apache.spark.sql.catalyst.expressions.{GenericMutableRow, UnsafeProjection}
+import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, UnsafeProjection}
import org.apache.spark.sql.types._
class TestNullableColumnBuilder[JvmType](columnType: ColumnType[JvmType])
@@ -94,7 +94,7 @@ class NullableColumnBuilderSuite extends SparkFunSuite {
(1 to 7 by 2).foreach(assertResult(_, "Wrong null position")(buffer.getInt()))
// For non-null values
- val actual = new GenericMutableRow(new Array[Any](1))
+ val actual = new GenericInternalRow(new Array[Any](1))
(0 until 4).foreach { _ =>
columnType.extract(buffer, actual, 0)
assert(converter(actual.get(0, dataType)) === converter(randomRow.get(0, dataType)),
http://git-wip-us.apache.org/repos/asf/spark/blob/97594c29/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/BooleanBitSetSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/BooleanBitSetSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/BooleanBitSetSuite.scala
index f67e9c7..d01bf91 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/BooleanBitSetSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/BooleanBitSetSuite.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.execution.columnar.compression
import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.GenericMutableRow
+import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
import org.apache.spark.sql.execution.columnar.{BOOLEAN, NoopColumnStats}
import org.apache.spark.sql.execution.columnar.ColumnarTestUtils._
@@ -72,7 +72,7 @@ class BooleanBitSetSuite extends SparkFunSuite {
buffer.rewind().position(headerSize + 4)
val decoder = BooleanBitSet.decoder(buffer, BOOLEAN)
- val mutableRow = new GenericMutableRow(1)
+ val mutableRow = new GenericInternalRow(1)
if (values.nonEmpty) {
values.foreach {
assert(decoder.hasNext)
http://git-wip-us.apache.org/repos/asf/spark/blob/97594c29/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/CompressionSchemeBenchmark.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/CompressionSchemeBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/CompressionSchemeBenchmark.scala
index babf944..9005ec9 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/CompressionSchemeBenchmark.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/CompressionSchemeBenchmark.scala
@@ -23,7 +23,7 @@ import java.nio.charset.StandardCharsets
import org.apache.commons.lang3.RandomStringUtils
import org.apache.commons.math3.distribution.LogNormalDistribution
-import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, GenericMutableRow}
+import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
import org.apache.spark.sql.execution.columnar.{BOOLEAN, INT, LONG, NativeColumnType, SHORT, STRING}
import org.apache.spark.sql.types.AtomicType
import org.apache.spark.util.Benchmark
@@ -111,7 +111,7 @@ object CompressionSchemeBenchmark extends AllCompressionSchemes {
input.rewind()
benchmark.addCase(label)({ i: Int =>
- val rowBuf = new GenericMutableRow(1)
+ val rowBuf = new GenericInternalRow(1)
for (n <- 0L until iters) {
compressedBuf.rewind.position(4)
http://git-wip-us.apache.org/repos/asf/spark/blob/97594c29/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/DictionaryEncodingSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/DictionaryEncodingSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/DictionaryEncodingSuite.scala
index 830ca02..67139b1 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/DictionaryEncodingSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/DictionaryEncodingSuite.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.execution.columnar.compression
import java.nio.ByteBuffer
import org.apache.spark.SparkFunSuite
-import org.apache.spark.sql.catalyst.expressions.GenericMutableRow
+import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
import org.apache.spark.sql.execution.columnar._
import org.apache.spark.sql.execution.columnar.ColumnarTestUtils._
import org.apache.spark.sql.types.AtomicType
@@ -97,7 +97,7 @@ class DictionaryEncodingSuite extends SparkFunSuite {
buffer.rewind().position(headerSize + 4)
val decoder = DictionaryEncoding.decoder(buffer, columnType)
- val mutableRow = new GenericMutableRow(1)
+ val mutableRow = new GenericInternalRow(1)
if (inputSeq.nonEmpty) {
inputSeq.foreach { i =>
http://git-wip-us.apache.org/repos/asf/spark/blob/97594c29/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/IntegralDeltaSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/IntegralDeltaSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/IntegralDeltaSuite.scala
index a530e27..411d31f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/IntegralDeltaSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/IntegralDeltaSuite.scala
@@ -18,7 +18,7 @@
package org.apache.spark.sql.execution.columnar.compression
import org.apache.spark.SparkFunSuite
-import org.apache.spark.sql.catalyst.expressions.GenericMutableRow
+import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
import org.apache.spark.sql.execution.columnar._
import org.apache.spark.sql.execution.columnar.ColumnarTestUtils._
import org.apache.spark.sql.types.IntegralType
@@ -48,7 +48,7 @@ class IntegralDeltaSuite extends SparkFunSuite {
}
input.foreach { value =>
- val row = new GenericMutableRow(1)
+ val row = new GenericInternalRow(1)
columnType.setField(row, 0, value)
builder.appendFrom(row, 0)
}
@@ -95,7 +95,7 @@ class IntegralDeltaSuite extends SparkFunSuite {
buffer.rewind().position(headerSize + 4)
val decoder = scheme.decoder(buffer, columnType)
- val mutableRow = new GenericMutableRow(1)
+ val mutableRow = new GenericInternalRow(1)
if (input.nonEmpty) {
input.foreach{
http://git-wip-us.apache.org/repos/asf/spark/blob/97594c29/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/RunLengthEncodingSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/RunLengthEncodingSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/RunLengthEncodingSuite.scala
index 95642e9..dffa9b3 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/RunLengthEncodingSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/RunLengthEncodingSuite.scala
@@ -18,7 +18,7 @@
package org.apache.spark.sql.execution.columnar.compression
import org.apache.spark.SparkFunSuite
-import org.apache.spark.sql.catalyst.expressions.GenericMutableRow
+import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
import org.apache.spark.sql.execution.columnar._
import org.apache.spark.sql.execution.columnar.ColumnarTestUtils._
import org.apache.spark.sql.types.AtomicType
@@ -80,7 +80,7 @@ class RunLengthEncodingSuite extends SparkFunSuite {
buffer.rewind().position(headerSize + 4)
val decoder = RunLengthEncoding.decoder(buffer, columnType)
- val mutableRow = new GenericMutableRow(1)
+ val mutableRow = new GenericInternalRow(1)
if (inputSeq.nonEmpty) {
inputSeq.foreach { i =>
http://git-wip-us.apache.org/repos/asf/spark/blob/97594c29/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
index 3161a63..580eade 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
@@ -38,7 +38,7 @@ import org.apache.parquet.schema.{MessageType, MessageTypeParser}
import org.apache.spark.SparkException
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.{InternalRow, ScalaReflection}
-import org.apache.spark.sql.catalyst.expressions.{GenericMutableRow, UnsafeRow}
+import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, UnsafeRow}
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSQLContext
@@ -716,7 +716,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext {
dataTypes.zip(constantValues).foreach { case (dt, v) =>
val schema = StructType(StructField("pcol", dt) :: Nil)
val vectorizedReader = new VectorizedParquetRecordReader
- val partitionValues = new GenericMutableRow(Array(v))
+ val partitionValues = new GenericInternalRow(Array(v))
val file = SpecificParquetRecordReaderBase.listDirectory(dir).get(0)
try {
http://git-wip-us.apache.org/repos/asf/spark/blob/97594c29/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
index 9dd8d9f..4c4a7d8 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
@@ -24,7 +24,7 @@ import org.apache.parquet.hadoop.ParquetOutputFormat
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
-import org.apache.spark.sql.catalyst.expressions.SpecificMutableRow
+import org.apache.spark.sql.catalyst.expressions.SpecificInternalRow
import org.apache.spark.sql.execution.FileSourceScanExec
import org.apache.spark.sql.execution.datasources.parquet.TestingUDT.{NestedStruct, NestedStructUDT, SingleElement}
import org.apache.spark.sql.internal.SQLConf
@@ -719,7 +719,7 @@ object TestingUDT {
.add("c", DoubleType, nullable = false)
override def serialize(n: NestedStruct): Any = {
- val row = new SpecificMutableRow(sqlType.asInstanceOf[StructType].map(_.dataType))
+ val row = new SpecificInternalRow(sqlType.asInstanceOf[StructType].map(_.dataType))
row.setInt(0, n.a)
row.setLong(1, n.b)
row.setDouble(2, n.c)
http://git-wip-us.apache.org/repos/asf/spark/blob/97594c29/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala
index fe34caa..1625116 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala
@@ -688,25 +688,25 @@ private[hive] trait HiveInspectors {
* @return A function that performs in-place updating of a MutableRow.
* Use the overloaded ObjectInspector version for assignments.
*/
- def unwrapperFor(field: HiveStructField): (Any, MutableRow, Int) => Unit =
+ def unwrapperFor(field: HiveStructField): (Any, InternalRow, Int) => Unit =
field.getFieldObjectInspector match {
case oi: BooleanObjectInspector =>
- (value: Any, row: MutableRow, ordinal: Int) => row.setBoolean(ordinal, oi.get(value))
+ (value: Any, row: InternalRow, ordinal: Int) => row.setBoolean(ordinal, oi.get(value))
case oi: ByteObjectInspector =>
- (value: Any, row: MutableRow, ordinal: Int) => row.setByte(ordinal, oi.get(value))
+ (value: Any, row: InternalRow, ordinal: Int) => row.setByte(ordinal, oi.get(value))
case oi: ShortObjectInspector =>
- (value: Any, row: MutableRow, ordinal: Int) => row.setShort(ordinal, oi.get(value))
+ (value: Any, row: InternalRow, ordinal: Int) => row.setShort(ordinal, oi.get(value))
case oi: IntObjectInspector =>
- (value: Any, row: MutableRow, ordinal: Int) => row.setInt(ordinal, oi.get(value))
+ (value: Any, row: InternalRow, ordinal: Int) => row.setInt(ordinal, oi.get(value))
case oi: LongObjectInspector =>
- (value: Any, row: MutableRow, ordinal: Int) => row.setLong(ordinal, oi.get(value))
+ (value: Any, row: InternalRow, ordinal: Int) => row.setLong(ordinal, oi.get(value))
case oi: FloatObjectInspector =>
- (value: Any, row: MutableRow, ordinal: Int) => row.setFloat(ordinal, oi.get(value))
+ (value: Any, row: InternalRow, ordinal: Int) => row.setFloat(ordinal, oi.get(value))
case oi: DoubleObjectInspector =>
- (value: Any, row: MutableRow, ordinal: Int) => row.setDouble(ordinal, oi.get(value))
+ (value: Any, row: InternalRow, ordinal: Int) => row.setDouble(ordinal, oi.get(value))
case oi =>
val unwrapper = unwrapperFor(oi)
- (value: Any, row: MutableRow, ordinal: Int) => row(ordinal) = unwrapper(value)
+ (value: Any, row: InternalRow, ordinal: Int) => row(ordinal) = unwrapper(value)
}
def wrap(a: Any, oi: ObjectInspector, dataType: DataType): AnyRef = {
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org