You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by hv...@apache.org on 2016/10/07 21:03:52 UTC

[2/3] spark git commit: [SPARK-17761][SQL] Remove MutableRow

http://git-wip-us.apache.org/repos/asf/spark/blob/97594c29/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala
index 4e072a9..2988161 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala
@@ -118,7 +118,7 @@ class TungstenAggregationIterator(
   private def createNewAggregationBuffer(): UnsafeRow = {
     val bufferSchema = aggregateFunctions.flatMap(_.aggBufferAttributes)
     val buffer: UnsafeRow = UnsafeProjection.create(bufferSchema.map(_.dataType))
-      .apply(new GenericMutableRow(bufferSchema.length))
+      .apply(new GenericInternalRow(bufferSchema.length))
     // Initialize declarative aggregates' buffer values
     expressionAggInitialProjection.target(buffer)(EmptyRow)
     // Initialize imperative aggregates' buffer values
@@ -127,7 +127,7 @@ class TungstenAggregationIterator(
   }
 
   // Creates a function used to generate output rows.
-  override protected def generateResultProjection(): (UnsafeRow, MutableRow) => UnsafeRow = {
+  override protected def generateResultProjection(): (UnsafeRow, InternalRow) => UnsafeRow = {
     val modes = aggregateExpressions.map(_.mode).distinct
     if (modes.nonEmpty && !modes.contains(Final) && !modes.contains(Complete)) {
       // Fast path for partial aggregation, UnsafeRowJoiner is usually faster than projection
@@ -137,7 +137,7 @@ class TungstenAggregationIterator(
       val bufferSchema = StructType.fromAttributes(bufferAttributes)
       val unsafeRowJoiner = GenerateUnsafeRowJoiner.create(groupingKeySchema, bufferSchema)
 
-      (currentGroupingKey: UnsafeRow, currentBuffer: MutableRow) => {
+      (currentGroupingKey: UnsafeRow, currentBuffer: InternalRow) => {
         unsafeRowJoiner.join(currentGroupingKey, currentBuffer.asInstanceOf[UnsafeRow])
       }
     } else {
@@ -300,7 +300,7 @@ class TungstenAggregationIterator(
   private[this] val sortBasedAggregationBuffer: UnsafeRow = createNewAggregationBuffer()
 
   // The function used to process rows in a group
-  private[this] var sortBasedProcessRow: (MutableRow, InternalRow) => Unit = null
+  private[this] var sortBasedProcessRow: (InternalRow, InternalRow) => Unit = null
 
   // Processes rows in the current group. It will stop when it find a new group.
   private def processCurrentSortedGroup(): Unit = {

http://git-wip-us.apache.org/repos/asf/spark/blob/97594c29/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/udaf.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/udaf.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/udaf.scala
index 586e145..67760f3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/udaf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/udaf.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.execution.aggregate
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
-import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression, MutableRow, _}
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression, _}
 import org.apache.spark.sql.catalyst.expressions.aggregate.ImperativeAggregate
 import org.apache.spark.sql.catalyst.expressions.codegen.GenerateMutableProjection
 import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
@@ -96,18 +96,18 @@ sealed trait BufferSetterGetterUtils {
     getters
   }
 
-  def createSetters(schema: StructType): Array[((MutableRow, Int, Any) => Unit)] = {
+  def createSetters(schema: StructType): Array[((InternalRow, Int, Any) => Unit)] = {
     val dataTypes = schema.fields.map(_.dataType)
-    val setters = new Array[(MutableRow, Int, Any) => Unit](dataTypes.length)
+    val setters = new Array[(InternalRow, Int, Any) => Unit](dataTypes.length)
 
     var i = 0
     while (i < setters.length) {
       setters(i) = dataTypes(i) match {
         case NullType =>
-          (row: MutableRow, ordinal: Int, value: Any) => row.setNullAt(ordinal)
+          (row: InternalRow, ordinal: Int, value: Any) => row.setNullAt(ordinal)
 
         case b: BooleanType =>
-          (row: MutableRow, ordinal: Int, value: Any) =>
+          (row: InternalRow, ordinal: Int, value: Any) =>
             if (value != null) {
               row.setBoolean(ordinal, value.asInstanceOf[Boolean])
             } else {
@@ -115,7 +115,7 @@ sealed trait BufferSetterGetterUtils {
             }
 
         case ByteType =>
-          (row: MutableRow, ordinal: Int, value: Any) =>
+          (row: InternalRow, ordinal: Int, value: Any) =>
             if (value != null) {
               row.setByte(ordinal, value.asInstanceOf[Byte])
             } else {
@@ -123,7 +123,7 @@ sealed trait BufferSetterGetterUtils {
             }
 
         case ShortType =>
-          (row: MutableRow, ordinal: Int, value: Any) =>
+          (row: InternalRow, ordinal: Int, value: Any) =>
             if (value != null) {
               row.setShort(ordinal, value.asInstanceOf[Short])
             } else {
@@ -131,7 +131,7 @@ sealed trait BufferSetterGetterUtils {
             }
 
         case IntegerType =>
-          (row: MutableRow, ordinal: Int, value: Any) =>
+          (row: InternalRow, ordinal: Int, value: Any) =>
             if (value != null) {
               row.setInt(ordinal, value.asInstanceOf[Int])
             } else {
@@ -139,7 +139,7 @@ sealed trait BufferSetterGetterUtils {
             }
 
         case LongType =>
-          (row: MutableRow, ordinal: Int, value: Any) =>
+          (row: InternalRow, ordinal: Int, value: Any) =>
             if (value != null) {
               row.setLong(ordinal, value.asInstanceOf[Long])
             } else {
@@ -147,7 +147,7 @@ sealed trait BufferSetterGetterUtils {
             }
 
         case FloatType =>
-          (row: MutableRow, ordinal: Int, value: Any) =>
+          (row: InternalRow, ordinal: Int, value: Any) =>
             if (value != null) {
               row.setFloat(ordinal, value.asInstanceOf[Float])
             } else {
@@ -155,7 +155,7 @@ sealed trait BufferSetterGetterUtils {
             }
 
         case DoubleType =>
-          (row: MutableRow, ordinal: Int, value: Any) =>
+          (row: InternalRow, ordinal: Int, value: Any) =>
             if (value != null) {
               row.setDouble(ordinal, value.asInstanceOf[Double])
             } else {
@@ -164,13 +164,13 @@ sealed trait BufferSetterGetterUtils {
 
         case dt: DecimalType =>
           val precision = dt.precision
-          (row: MutableRow, ordinal: Int, value: Any) =>
+          (row: InternalRow, ordinal: Int, value: Any) =>
             // To make it work with UnsafeRow, we cannot use setNullAt.
             // Please see the comment of UnsafeRow's setDecimal.
             row.setDecimal(ordinal, value.asInstanceOf[Decimal], precision)
 
         case DateType =>
-          (row: MutableRow, ordinal: Int, value: Any) =>
+          (row: InternalRow, ordinal: Int, value: Any) =>
             if (value != null) {
               row.setInt(ordinal, value.asInstanceOf[Int])
             } else {
@@ -178,7 +178,7 @@ sealed trait BufferSetterGetterUtils {
             }
 
         case TimestampType =>
-          (row: MutableRow, ordinal: Int, value: Any) =>
+          (row: InternalRow, ordinal: Int, value: Any) =>
             if (value != null) {
               row.setLong(ordinal, value.asInstanceOf[Long])
             } else {
@@ -186,7 +186,7 @@ sealed trait BufferSetterGetterUtils {
             }
 
         case other =>
-          (row: MutableRow, ordinal: Int, value: Any) =>
+          (row: InternalRow, ordinal: Int, value: Any) =>
             if (value != null) {
               row.update(ordinal, value)
             } else {
@@ -209,7 +209,7 @@ private[aggregate] class MutableAggregationBufferImpl(
     toCatalystConverters: Array[Any => Any],
     toScalaConverters: Array[Any => Any],
     bufferOffset: Int,
-    var underlyingBuffer: MutableRow)
+    var underlyingBuffer: InternalRow)
   extends MutableAggregationBuffer with BufferSetterGetterUtils {
 
   private[this] val offsets: Array[Int] = {
@@ -413,13 +413,13 @@ case class ScalaUDAF(
       null)
   }
 
-  override def initialize(buffer: MutableRow): Unit = {
+  override def initialize(buffer: InternalRow): Unit = {
     mutableAggregateBuffer.underlyingBuffer = buffer
 
     udaf.initialize(mutableAggregateBuffer)
   }
 
-  override def update(buffer: MutableRow, input: InternalRow): Unit = {
+  override def update(buffer: InternalRow, input: InternalRow): Unit = {
     mutableAggregateBuffer.underlyingBuffer = buffer
 
     udaf.update(
@@ -427,7 +427,7 @@ case class ScalaUDAF(
       inputToScalaConverters(inputProjection(input)).asInstanceOf[Row])
   }
 
-  override def merge(buffer1: MutableRow, buffer2: InternalRow): Unit = {
+  override def merge(buffer1: InternalRow, buffer2: InternalRow): Unit = {
     mutableAggregateBuffer.underlyingBuffer = buffer1
     inputAggregateBuffer.underlyingInputBuffer = buffer2
 

http://git-wip-us.apache.org/repos/asf/spark/blob/97594c29/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/ColumnAccessor.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/ColumnAccessor.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/ColumnAccessor.scala
index 7cde04b..6241b79 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/ColumnAccessor.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/ColumnAccessor.scala
@@ -21,15 +21,16 @@ import java.nio.{ByteBuffer, ByteOrder}
 
 import scala.annotation.tailrec
 
-import org.apache.spark.sql.catalyst.expressions.{MutableRow, UnsafeArrayData, UnsafeMapData, UnsafeRow}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{UnsafeArrayData, UnsafeMapData, UnsafeRow}
 import org.apache.spark.sql.execution.columnar.compression.CompressibleColumnAccessor
 import org.apache.spark.sql.types._
 
 /**
  * An `Iterator` like trait used to extract values from columnar byte buffer. When a value is
  * extracted from the buffer, instead of directly returning it, the value is set into some field of
- * a [[MutableRow]]. In this way, boxing cost can be avoided by leveraging the setter methods
- * for primitive values provided by [[MutableRow]].
+ * a [[InternalRow]]. In this way, boxing cost can be avoided by leveraging the setter methods
+ * for primitive values provided by [[InternalRow]].
  */
 private[columnar] trait ColumnAccessor {
   initialize()
@@ -38,7 +39,7 @@ private[columnar] trait ColumnAccessor {
 
   def hasNext: Boolean
 
-  def extractTo(row: MutableRow, ordinal: Int): Unit
+  def extractTo(row: InternalRow, ordinal: Int): Unit
 
   protected def underlyingBuffer: ByteBuffer
 }
@@ -52,11 +53,11 @@ private[columnar] abstract class BasicColumnAccessor[JvmType](
 
   override def hasNext: Boolean = buffer.hasRemaining
 
-  override def extractTo(row: MutableRow, ordinal: Int): Unit = {
+  override def extractTo(row: InternalRow, ordinal: Int): Unit = {
     extractSingle(row, ordinal)
   }
 
-  def extractSingle(row: MutableRow, ordinal: Int): Unit = {
+  def extractSingle(row: InternalRow, ordinal: Int): Unit = {
     columnType.extract(buffer, row, ordinal)
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/97594c29/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/ColumnType.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/ColumnType.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/ColumnType.scala
index d27d8c3..703bde2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/ColumnType.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/ColumnType.scala
@@ -92,7 +92,7 @@ private[columnar] sealed abstract class ColumnType[JvmType] {
    * `row(ordinal)`. Subclasses should override this method to avoid boxing/unboxing costs whenever
    * possible.
    */
-  def extract(buffer: ByteBuffer, row: MutableRow, ordinal: Int): Unit = {
+  def extract(buffer: ByteBuffer, row: InternalRow, ordinal: Int): Unit = {
     setField(row, ordinal, extract(buffer))
   }
 
@@ -125,13 +125,13 @@ private[columnar] sealed abstract class ColumnType[JvmType] {
    * Sets `row(ordinal)` to `field`. Subclasses should override this method to avoid boxing/unboxing
    * costs whenever possible.
    */
-  def setField(row: MutableRow, ordinal: Int, value: JvmType): Unit
+  def setField(row: InternalRow, ordinal: Int, value: JvmType): Unit
 
   /**
    * Copies `from(fromOrdinal)` to `to(toOrdinal)`. Subclasses should override this method to avoid
    * boxing/unboxing costs whenever possible.
    */
-  def copyField(from: InternalRow, fromOrdinal: Int, to: MutableRow, toOrdinal: Int): Unit = {
+  def copyField(from: InternalRow, fromOrdinal: Int, to: InternalRow, toOrdinal: Int): Unit = {
     setField(to, toOrdinal, getField(from, fromOrdinal))
   }
 
@@ -149,7 +149,7 @@ private[columnar] object NULL extends ColumnType[Any] {
   override def defaultSize: Int = 0
   override def append(v: Any, buffer: ByteBuffer): Unit = {}
   override def extract(buffer: ByteBuffer): Any = null
-  override def setField(row: MutableRow, ordinal: Int, value: Any): Unit = row.setNullAt(ordinal)
+  override def setField(row: InternalRow, ordinal: Int, value: Any): Unit = row.setNullAt(ordinal)
   override def getField(row: InternalRow, ordinal: Int): Any = null
 }
 
@@ -177,18 +177,18 @@ private[columnar] object INT extends NativeColumnType(IntegerType, 4) {
     ByteBufferHelper.getInt(buffer)
   }
 
-  override def extract(buffer: ByteBuffer, row: MutableRow, ordinal: Int): Unit = {
+  override def extract(buffer: ByteBuffer, row: InternalRow, ordinal: Int): Unit = {
     row.setInt(ordinal, ByteBufferHelper.getInt(buffer))
   }
 
-  override def setField(row: MutableRow, ordinal: Int, value: Int): Unit = {
+  override def setField(row: InternalRow, ordinal: Int, value: Int): Unit = {
     row.setInt(ordinal, value)
   }
 
   override def getField(row: InternalRow, ordinal: Int): Int = row.getInt(ordinal)
 
 
-  override def copyField(from: InternalRow, fromOrdinal: Int, to: MutableRow, toOrdinal: Int) {
+  override def copyField(from: InternalRow, fromOrdinal: Int, to: InternalRow, toOrdinal: Int) {
     to.setInt(toOrdinal, from.getInt(fromOrdinal))
   }
 }
@@ -206,17 +206,17 @@ private[columnar] object LONG extends NativeColumnType(LongType, 8) {
     ByteBufferHelper.getLong(buffer)
   }
 
-  override def extract(buffer: ByteBuffer, row: MutableRow, ordinal: Int): Unit = {
+  override def extract(buffer: ByteBuffer, row: InternalRow, ordinal: Int): Unit = {
     row.setLong(ordinal, ByteBufferHelper.getLong(buffer))
   }
 
-  override def setField(row: MutableRow, ordinal: Int, value: Long): Unit = {
+  override def setField(row: InternalRow, ordinal: Int, value: Long): Unit = {
     row.setLong(ordinal, value)
   }
 
   override def getField(row: InternalRow, ordinal: Int): Long = row.getLong(ordinal)
 
-  override def copyField(from: InternalRow, fromOrdinal: Int, to: MutableRow, toOrdinal: Int) {
+  override def copyField(from: InternalRow, fromOrdinal: Int, to: InternalRow, toOrdinal: Int) {
     to.setLong(toOrdinal, from.getLong(fromOrdinal))
   }
 }
@@ -234,17 +234,17 @@ private[columnar] object FLOAT extends NativeColumnType(FloatType, 4) {
     ByteBufferHelper.getFloat(buffer)
   }
 
-  override def extract(buffer: ByteBuffer, row: MutableRow, ordinal: Int): Unit = {
+  override def extract(buffer: ByteBuffer, row: InternalRow, ordinal: Int): Unit = {
     row.setFloat(ordinal, ByteBufferHelper.getFloat(buffer))
   }
 
-  override def setField(row: MutableRow, ordinal: Int, value: Float): Unit = {
+  override def setField(row: InternalRow, ordinal: Int, value: Float): Unit = {
     row.setFloat(ordinal, value)
   }
 
   override def getField(row: InternalRow, ordinal: Int): Float = row.getFloat(ordinal)
 
-  override def copyField(from: InternalRow, fromOrdinal: Int, to: MutableRow, toOrdinal: Int) {
+  override def copyField(from: InternalRow, fromOrdinal: Int, to: InternalRow, toOrdinal: Int) {
     to.setFloat(toOrdinal, from.getFloat(fromOrdinal))
   }
 }
@@ -262,17 +262,17 @@ private[columnar] object DOUBLE extends NativeColumnType(DoubleType, 8) {
     ByteBufferHelper.getDouble(buffer)
   }
 
-  override def extract(buffer: ByteBuffer, row: MutableRow, ordinal: Int): Unit = {
+  override def extract(buffer: ByteBuffer, row: InternalRow, ordinal: Int): Unit = {
     row.setDouble(ordinal, ByteBufferHelper.getDouble(buffer))
   }
 
-  override def setField(row: MutableRow, ordinal: Int, value: Double): Unit = {
+  override def setField(row: InternalRow, ordinal: Int, value: Double): Unit = {
     row.setDouble(ordinal, value)
   }
 
   override def getField(row: InternalRow, ordinal: Int): Double = row.getDouble(ordinal)
 
-  override def copyField(from: InternalRow, fromOrdinal: Int, to: MutableRow, toOrdinal: Int) {
+  override def copyField(from: InternalRow, fromOrdinal: Int, to: InternalRow, toOrdinal: Int) {
     to.setDouble(toOrdinal, from.getDouble(fromOrdinal))
   }
 }
@@ -288,17 +288,17 @@ private[columnar] object BOOLEAN extends NativeColumnType(BooleanType, 1) {
 
   override def extract(buffer: ByteBuffer): Boolean = buffer.get() == 1
 
-  override def extract(buffer: ByteBuffer, row: MutableRow, ordinal: Int): Unit = {
+  override def extract(buffer: ByteBuffer, row: InternalRow, ordinal: Int): Unit = {
     row.setBoolean(ordinal, buffer.get() == 1)
   }
 
-  override def setField(row: MutableRow, ordinal: Int, value: Boolean): Unit = {
+  override def setField(row: InternalRow, ordinal: Int, value: Boolean): Unit = {
     row.setBoolean(ordinal, value)
   }
 
   override def getField(row: InternalRow, ordinal: Int): Boolean = row.getBoolean(ordinal)
 
-  override def copyField(from: InternalRow, fromOrdinal: Int, to: MutableRow, toOrdinal: Int) {
+  override def copyField(from: InternalRow, fromOrdinal: Int, to: InternalRow, toOrdinal: Int) {
     to.setBoolean(toOrdinal, from.getBoolean(fromOrdinal))
   }
 }
@@ -316,17 +316,17 @@ private[columnar] object BYTE extends NativeColumnType(ByteType, 1) {
     buffer.get()
   }
 
-  override def extract(buffer: ByteBuffer, row: MutableRow, ordinal: Int): Unit = {
+  override def extract(buffer: ByteBuffer, row: InternalRow, ordinal: Int): Unit = {
     row.setByte(ordinal, buffer.get())
   }
 
-  override def setField(row: MutableRow, ordinal: Int, value: Byte): Unit = {
+  override def setField(row: InternalRow, ordinal: Int, value: Byte): Unit = {
     row.setByte(ordinal, value)
   }
 
   override def getField(row: InternalRow, ordinal: Int): Byte = row.getByte(ordinal)
 
-  override def copyField(from: InternalRow, fromOrdinal: Int, to: MutableRow, toOrdinal: Int) {
+  override def copyField(from: InternalRow, fromOrdinal: Int, to: InternalRow, toOrdinal: Int) {
     to.setByte(toOrdinal, from.getByte(fromOrdinal))
   }
 }
@@ -344,17 +344,17 @@ private[columnar] object SHORT extends NativeColumnType(ShortType, 2) {
     buffer.getShort()
   }
 
-  override def extract(buffer: ByteBuffer, row: MutableRow, ordinal: Int): Unit = {
+  override def extract(buffer: ByteBuffer, row: InternalRow, ordinal: Int): Unit = {
     row.setShort(ordinal, buffer.getShort())
   }
 
-  override def setField(row: MutableRow, ordinal: Int, value: Short): Unit = {
+  override def setField(row: InternalRow, ordinal: Int, value: Short): Unit = {
     row.setShort(ordinal, value)
   }
 
   override def getField(row: InternalRow, ordinal: Int): Short = row.getShort(ordinal)
 
-  override def copyField(from: InternalRow, fromOrdinal: Int, to: MutableRow, toOrdinal: Int) {
+  override def copyField(from: InternalRow, fromOrdinal: Int, to: InternalRow, toOrdinal: Int) {
     to.setShort(toOrdinal, from.getShort(fromOrdinal))
   }
 }
@@ -366,7 +366,7 @@ private[columnar] object SHORT extends NativeColumnType(ShortType, 2) {
 private[columnar] trait DirectCopyColumnType[JvmType] extends ColumnType[JvmType] {
 
   // copy the bytes from ByteBuffer to UnsafeRow
-  override def extract(buffer: ByteBuffer, row: MutableRow, ordinal: Int): Unit = {
+  override def extract(buffer: ByteBuffer, row: InternalRow, ordinal: Int): Unit = {
     if (row.isInstanceOf[MutableUnsafeRow]) {
       val numBytes = buffer.getInt
       val cursor = buffer.position()
@@ -407,7 +407,7 @@ private[columnar] object STRING
     UTF8String.fromBytes(buffer.array(), buffer.arrayOffset() + cursor, length)
   }
 
-  override def setField(row: MutableRow, ordinal: Int, value: UTF8String): Unit = {
+  override def setField(row: InternalRow, ordinal: Int, value: UTF8String): Unit = {
     if (row.isInstanceOf[MutableUnsafeRow]) {
       row.asInstanceOf[MutableUnsafeRow].writer.write(ordinal, value)
     } else {
@@ -419,7 +419,7 @@ private[columnar] object STRING
     row.getUTF8String(ordinal)
   }
 
-  override def copyField(from: InternalRow, fromOrdinal: Int, to: MutableRow, toOrdinal: Int) {
+  override def copyField(from: InternalRow, fromOrdinal: Int, to: InternalRow, toOrdinal: Int) {
     setField(to, toOrdinal, getField(from, fromOrdinal))
   }
 
@@ -433,7 +433,7 @@ private[columnar] case class COMPACT_DECIMAL(precision: Int, scale: Int)
     Decimal(ByteBufferHelper.getLong(buffer), precision, scale)
   }
 
-  override def extract(buffer: ByteBuffer, row: MutableRow, ordinal: Int): Unit = {
+  override def extract(buffer: ByteBuffer, row: InternalRow, ordinal: Int): Unit = {
     if (row.isInstanceOf[MutableUnsafeRow]) {
       // copy it as Long
       row.setLong(ordinal, ByteBufferHelper.getLong(buffer))
@@ -459,11 +459,11 @@ private[columnar] case class COMPACT_DECIMAL(precision: Int, scale: Int)
     row.getDecimal(ordinal, precision, scale)
   }
 
-  override def setField(row: MutableRow, ordinal: Int, value: Decimal): Unit = {
+  override def setField(row: InternalRow, ordinal: Int, value: Decimal): Unit = {
     row.setDecimal(ordinal, value, precision)
   }
 
-  override def copyField(from: InternalRow, fromOrdinal: Int, to: MutableRow, toOrdinal: Int) {
+  override def copyField(from: InternalRow, fromOrdinal: Int, to: InternalRow, toOrdinal: Int) {
     setField(to, toOrdinal, getField(from, fromOrdinal))
   }
 }
@@ -497,7 +497,7 @@ private[columnar] object BINARY extends ByteArrayColumnType[Array[Byte]](16) {
 
   def dataType: DataType = BinaryType
 
-  override def setField(row: MutableRow, ordinal: Int, value: Array[Byte]): Unit = {
+  override def setField(row: InternalRow, ordinal: Int, value: Array[Byte]): Unit = {
     row.update(ordinal, value)
   }
 
@@ -522,7 +522,7 @@ private[columnar] case class LARGE_DECIMAL(precision: Int, scale: Int)
     row.getDecimal(ordinal, precision, scale)
   }
 
-  override def setField(row: MutableRow, ordinal: Int, value: Decimal): Unit = {
+  override def setField(row: InternalRow, ordinal: Int, value: Decimal): Unit = {
     row.setDecimal(ordinal, value, precision)
   }
 
@@ -553,7 +553,7 @@ private[columnar] case class STRUCT(dataType: StructType)
 
   override def defaultSize: Int = 20
 
-  override def setField(row: MutableRow, ordinal: Int, value: UnsafeRow): Unit = {
+  override def setField(row: InternalRow, ordinal: Int, value: UnsafeRow): Unit = {
     row.update(ordinal, value)
   }
 
@@ -591,7 +591,7 @@ private[columnar] case class ARRAY(dataType: ArrayType)
 
   override def defaultSize: Int = 28
 
-  override def setField(row: MutableRow, ordinal: Int, value: UnsafeArrayData): Unit = {
+  override def setField(row: InternalRow, ordinal: Int, value: UnsafeArrayData): Unit = {
     row.update(ordinal, value)
   }
 
@@ -630,7 +630,7 @@ private[columnar] case class MAP(dataType: MapType)
 
   override def defaultSize: Int = 68
 
-  override def setField(row: MutableRow, ordinal: Int, value: UnsafeMapData): Unit = {
+  override def setField(row: InternalRow, ordinal: Int, value: UnsafeMapData): Unit = {
     row.update(ordinal, value)
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/97594c29/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/GenerateColumnAccessor.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/GenerateColumnAccessor.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/GenerateColumnAccessor.scala
index 96bd338..14024d6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/GenerateColumnAccessor.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/GenerateColumnAccessor.scala
@@ -36,8 +36,7 @@ abstract class ColumnarIterator extends Iterator[InternalRow] {
  *
  * WARNING: These setter MUST be called in increasing order of ordinals.
  */
-class MutableUnsafeRow(val writer: UnsafeRowWriter) extends GenericMutableRow(null) {
-
+class MutableUnsafeRow(val writer: UnsafeRowWriter) extends BaseGenericInternalRow {
   override def isNullAt(i: Int): Boolean = writer.isNullAt(i)
   override def setNullAt(i: Int): Unit = writer.setNullAt(i)
 
@@ -55,6 +54,9 @@ class MutableUnsafeRow(val writer: UnsafeRowWriter) extends GenericMutableRow(nu
   override def update(i: Int, v: Any): Unit = throw new UnsupportedOperationException
 
   // all other methods inherited from GenericMutableRow are not need
+  override protected def genericGet(ordinal: Int): Any = throw new UnsupportedOperationException
+  override def numFields: Int = throw new UnsupportedOperationException
+  override def copy(): InternalRow = throw new UnsupportedOperationException
 }
 
 /**

http://git-wip-us.apache.org/repos/asf/spark/blob/97594c29/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/NullableColumnAccessor.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/NullableColumnAccessor.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/NullableColumnAccessor.scala
index 2465633..2f09757 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/NullableColumnAccessor.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/NullableColumnAccessor.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.execution.columnar
 
 import java.nio.{ByteBuffer, ByteOrder}
 
-import org.apache.spark.sql.catalyst.expressions.MutableRow
+import org.apache.spark.sql.catalyst.InternalRow
 
 private[columnar] trait NullableColumnAccessor extends ColumnAccessor {
   private var nullsBuffer: ByteBuffer = _
@@ -39,7 +39,7 @@ private[columnar] trait NullableColumnAccessor extends ColumnAccessor {
     super.initialize()
   }
 
-  abstract override def extractTo(row: MutableRow, ordinal: Int): Unit = {
+  abstract override def extractTo(row: InternalRow, ordinal: Int): Unit = {
     if (pos == nextNullIndex) {
       seenNulls += 1
 

http://git-wip-us.apache.org/repos/asf/spark/blob/97594c29/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/compression/CompressibleColumnAccessor.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/compression/CompressibleColumnAccessor.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/compression/CompressibleColumnAccessor.scala
index 6579b50..e1d13ad 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/compression/CompressibleColumnAccessor.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/compression/CompressibleColumnAccessor.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql.execution.columnar.compression
 
-import org.apache.spark.sql.catalyst.expressions.MutableRow
+import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.execution.columnar.{ColumnAccessor, NativeColumnAccessor}
 import org.apache.spark.sql.types.AtomicType
 
@@ -33,7 +33,7 @@ private[columnar] trait CompressibleColumnAccessor[T <: AtomicType] extends Colu
 
   abstract override def hasNext: Boolean = super.hasNext || decoder.hasNext
 
-  override def extractSingle(row: MutableRow, ordinal: Int): Unit = {
+  override def extractSingle(row: InternalRow, ordinal: Int): Unit = {
     decoder.next(row, ordinal)
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/97594c29/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/compression/CompressionScheme.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/compression/CompressionScheme.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/compression/CompressionScheme.scala
index b90d00b..6e4f1c5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/compression/CompressionScheme.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/compression/CompressionScheme.scala
@@ -20,7 +20,6 @@ package org.apache.spark.sql.execution.columnar.compression
 import java.nio.{ByteBuffer, ByteOrder}
 
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.MutableRow
 import org.apache.spark.sql.execution.columnar.{ColumnType, NativeColumnType}
 import org.apache.spark.sql.types.AtomicType
 
@@ -39,7 +38,7 @@ private[columnar] trait Encoder[T <: AtomicType] {
 }
 
 private[columnar] trait Decoder[T <: AtomicType] {
-  def next(row: MutableRow, ordinal: Int): Unit
+  def next(row: InternalRow, ordinal: Int): Unit
 
   def hasNext: Boolean
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/97594c29/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/compression/compressionSchemes.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/compression/compressionSchemes.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/compression/compressionSchemes.scala
index 941f03b..ee99c90 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/compression/compressionSchemes.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/compression/compressionSchemes.scala
@@ -22,7 +22,7 @@ import java.nio.ByteBuffer
 import scala.collection.mutable
 
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.{MutableRow, SpecificMutableRow}
+import org.apache.spark.sql.catalyst.expressions.SpecificInternalRow
 import org.apache.spark.sql.execution.columnar._
 import org.apache.spark.sql.types._
 
@@ -56,7 +56,7 @@ private[columnar] case object PassThrough extends CompressionScheme {
   class Decoder[T <: AtomicType](buffer: ByteBuffer, columnType: NativeColumnType[T])
     extends compression.Decoder[T] {
 
-    override def next(row: MutableRow, ordinal: Int): Unit = {
+    override def next(row: InternalRow, ordinal: Int): Unit = {
       columnType.extract(buffer, row, ordinal)
     }
 
@@ -86,7 +86,7 @@ private[columnar] case object RunLengthEncoding extends CompressionScheme {
     private var _compressedSize = 0
 
     // Using `MutableRow` to store the last value to avoid boxing/unboxing cost.
-    private val lastValue = new SpecificMutableRow(Seq(columnType.dataType))
+    private val lastValue = new SpecificInternalRow(Seq(columnType.dataType))
     private var lastRun = 0
 
     override def uncompressedSize: Int = _uncompressedSize
@@ -117,9 +117,9 @@ private[columnar] case object RunLengthEncoding extends CompressionScheme {
       to.putInt(RunLengthEncoding.typeId)
 
       if (from.hasRemaining) {
-        val currentValue = new SpecificMutableRow(Seq(columnType.dataType))
+        val currentValue = new SpecificInternalRow(Seq(columnType.dataType))
         var currentRun = 1
-        val value = new SpecificMutableRow(Seq(columnType.dataType))
+        val value = new SpecificInternalRow(Seq(columnType.dataType))
 
         columnType.extract(from, currentValue, 0)
 
@@ -156,7 +156,7 @@ private[columnar] case object RunLengthEncoding extends CompressionScheme {
     private var valueCount = 0
     private var currentValue: T#InternalType = _
 
-    override def next(row: MutableRow, ordinal: Int): Unit = {
+    override def next(row: InternalRow, ordinal: Int): Unit = {
       if (valueCount == run) {
         currentValue = columnType.extract(buffer)
         run = ByteBufferHelper.getInt(buffer)
@@ -273,7 +273,7 @@ private[columnar] case object DictionaryEncoding extends CompressionScheme {
       Array.fill[Any](elementNum)(columnType.extract(buffer).asInstanceOf[Any])
     }
 
-    override def next(row: MutableRow, ordinal: Int): Unit = {
+    override def next(row: InternalRow, ordinal: Int): Unit = {
       columnType.setField(row, ordinal, dictionary(buffer.getShort()).asInstanceOf[T#InternalType])
     }
 
@@ -356,7 +356,7 @@ private[columnar] case object BooleanBitSet extends CompressionScheme {
 
     private var visited: Int = 0
 
-    override def next(row: MutableRow, ordinal: Int): Unit = {
+    override def next(row: InternalRow, ordinal: Int): Unit = {
       val bit = visited % BITS_PER_LONG
 
       visited += 1
@@ -443,7 +443,7 @@ private[columnar] case object IntDelta extends CompressionScheme {
 
     override def hasNext: Boolean = buffer.hasRemaining
 
-    override def next(row: MutableRow, ordinal: Int): Unit = {
+    override def next(row: InternalRow, ordinal: Int): Unit = {
       val delta = buffer.get()
       prev = if (delta > Byte.MinValue) prev + delta else ByteBufferHelper.getInt(buffer)
       row.setInt(ordinal, prev)
@@ -523,7 +523,7 @@ private[columnar] case object LongDelta extends CompressionScheme {
 
     override def hasNext: Boolean = buffer.hasRemaining
 
-    override def next(row: MutableRow, ordinal: Int): Unit = {
+    override def next(row: InternalRow, ordinal: Int): Unit = {
       val delta = buffer.get()
       prev = if (delta > Byte.MinValue) prev + delta else ByteBufferHelper.getLong(buffer)
       row.setLong(ordinal, prev)

http://git-wip-us.apache.org/repos/asf/spark/blob/97594c29/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index 693b4c4..6f9ed50 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -273,7 +273,7 @@ object DataSourceStrategy extends Strategy with Logging {
   // Get the bucket ID based on the bucketing values.
   // Restriction: Bucket pruning works iff the bucketing column has one and only one column.
   def getBucketId(bucketColumn: Attribute, numBuckets: Int, value: Any): Int = {
-    val mutableRow = new SpecificMutableRow(Seq(bucketColumn.dataType))
+    val mutableRow = new SpecificInternalRow(Seq(bucketColumn.dataType))
     mutableRow(0) = Cast(Literal(value), bucketColumn.dataType).eval(null)
     val bucketIdGeneration = UnsafeProjection.create(
       HashPartitioning(bucketColumn :: Nil, numBuckets).partitionIdExpression :: Nil,

http://git-wip-us.apache.org/repos/asf/spark/blob/97594c29/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala
index 33b170b..55cb26d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala
@@ -29,7 +29,7 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.GenericMutableRow
+import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.execution.datasources.{OutputWriter, OutputWriterFactory, PartitionedFile, WriterContainer}
 import org.apache.spark.sql.types._
@@ -88,7 +88,7 @@ object CSVRelation extends Logging {
       case (field, index) => safeRequiredIndices(safeRequiredFields.indexOf(field)) = index
     }
     val requiredSize = requiredFields.length
-    val row = new GenericMutableRow(requiredSize)
+    val row = new GenericInternalRow(requiredSize)
 
     (tokens: Array[String], numMalformedRows) => {
       if (params.dropMalformed && schemaFields.length != tokens.length) {

http://git-wip-us.apache.org/repos/asf/spark/blob/97594c29/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
index 66f2bad..4754963 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
@@ -30,7 +30,7 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.sql.{DataFrame, Row}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.encoders.RowEncoder
-import org.apache.spark.sql.catalyst.expressions.{MutableRow, SpecificMutableRow}
+import org.apache.spark.sql.catalyst.expressions.SpecificInternalRow
 import org.apache.spark.sql.catalyst.util.{DateTimeUtils, GenericArrayData}
 import org.apache.spark.sql.jdbc.{JdbcDialect, JdbcDialects, JdbcType}
 import org.apache.spark.sql.types._
@@ -283,7 +283,7 @@ object JdbcUtils extends Logging {
     new NextIterator[InternalRow] {
       private[this] val rs = resultSet
       private[this] val getters: Array[JDBCValueGetter] = makeGetters(schema)
-      private[this] val mutableRow = new SpecificMutableRow(schema.fields.map(x => x.dataType))
+      private[this] val mutableRow = new SpecificInternalRow(schema.fields.map(x => x.dataType))
 
       override protected def close(): Unit = {
         try {
@@ -314,22 +314,22 @@ object JdbcUtils extends Logging {
   // A `JDBCValueGetter` is responsible for getting a value from `ResultSet` into a field
   // for `MutableRow`. The last argument `Int` means the index for the value to be set in
   // the row and also used for the value in `ResultSet`.
-  private type JDBCValueGetter = (ResultSet, MutableRow, Int) => Unit
+  private type JDBCValueGetter = (ResultSet, InternalRow, Int) => Unit
 
   /**
    * Creates `JDBCValueGetter`s according to [[StructType]], which can set
-   * each value from `ResultSet` to each field of [[MutableRow]] correctly.
+   * each value from `ResultSet` to each field of [[InternalRow]] correctly.
    */
   private def makeGetters(schema: StructType): Array[JDBCValueGetter] =
     schema.fields.map(sf => makeGetter(sf.dataType, sf.metadata))
 
   private def makeGetter(dt: DataType, metadata: Metadata): JDBCValueGetter = dt match {
     case BooleanType =>
-      (rs: ResultSet, row: MutableRow, pos: Int) =>
+      (rs: ResultSet, row: InternalRow, pos: Int) =>
         row.setBoolean(pos, rs.getBoolean(pos + 1))
 
     case DateType =>
-      (rs: ResultSet, row: MutableRow, pos: Int) =>
+      (rs: ResultSet, row: InternalRow, pos: Int) =>
         // DateTimeUtils.fromJavaDate does not handle null value, so we need to check it.
         val dateVal = rs.getDate(pos + 1)
         if (dateVal != null) {
@@ -347,25 +347,25 @@ object JdbcUtils extends Logging {
     // retrieve it, you will get wrong result 199.99.
     // So it is needed to set precision and scale for Decimal based on JDBC metadata.
     case DecimalType.Fixed(p, s) =>
-      (rs: ResultSet, row: MutableRow, pos: Int) =>
+      (rs: ResultSet, row: InternalRow, pos: Int) =>
         val decimal =
           nullSafeConvert[java.math.BigDecimal](rs.getBigDecimal(pos + 1), d => Decimal(d, p, s))
         row.update(pos, decimal)
 
     case DoubleType =>
-      (rs: ResultSet, row: MutableRow, pos: Int) =>
+      (rs: ResultSet, row: InternalRow, pos: Int) =>
         row.setDouble(pos, rs.getDouble(pos + 1))
 
     case FloatType =>
-      (rs: ResultSet, row: MutableRow, pos: Int) =>
+      (rs: ResultSet, row: InternalRow, pos: Int) =>
         row.setFloat(pos, rs.getFloat(pos + 1))
 
     case IntegerType =>
-      (rs: ResultSet, row: MutableRow, pos: Int) =>
+      (rs: ResultSet, row: InternalRow, pos: Int) =>
         row.setInt(pos, rs.getInt(pos + 1))
 
     case LongType if metadata.contains("binarylong") =>
-      (rs: ResultSet, row: MutableRow, pos: Int) =>
+      (rs: ResultSet, row: InternalRow, pos: Int) =>
         val bytes = rs.getBytes(pos + 1)
         var ans = 0L
         var j = 0
@@ -376,20 +376,20 @@ object JdbcUtils extends Logging {
         row.setLong(pos, ans)
 
     case LongType =>
-      (rs: ResultSet, row: MutableRow, pos: Int) =>
+      (rs: ResultSet, row: InternalRow, pos: Int) =>
         row.setLong(pos, rs.getLong(pos + 1))
 
     case ShortType =>
-      (rs: ResultSet, row: MutableRow, pos: Int) =>
+      (rs: ResultSet, row: InternalRow, pos: Int) =>
         row.setShort(pos, rs.getShort(pos + 1))
 
     case StringType =>
-      (rs: ResultSet, row: MutableRow, pos: Int) =>
+      (rs: ResultSet, row: InternalRow, pos: Int) =>
         // TODO(davies): use getBytes for better performance, if the encoding is UTF-8
         row.update(pos, UTF8String.fromString(rs.getString(pos + 1)))
 
     case TimestampType =>
-      (rs: ResultSet, row: MutableRow, pos: Int) =>
+      (rs: ResultSet, row: InternalRow, pos: Int) =>
         val t = rs.getTimestamp(pos + 1)
         if (t != null) {
           row.setLong(pos, DateTimeUtils.fromJavaTimestamp(t))
@@ -398,7 +398,7 @@ object JdbcUtils extends Logging {
         }
 
     case BinaryType =>
-      (rs: ResultSet, row: MutableRow, pos: Int) =>
+      (rs: ResultSet, row: InternalRow, pos: Int) =>
         row.update(pos, rs.getBytes(pos + 1))
 
     case ArrayType(et, _) =>
@@ -437,7 +437,7 @@ object JdbcUtils extends Logging {
         case _ => (array: Object) => array.asInstanceOf[Array[Any]]
       }
 
-      (rs: ResultSet, row: MutableRow, pos: Int) =>
+      (rs: ResultSet, row: InternalRow, pos: Int) =>
         val array = nullSafeConvert[Object](
           rs.getArray(pos + 1).getArray,
           array => new GenericArrayData(elementConversion.apply(array)))

http://git-wip-us.apache.org/repos/asf/spark/blob/97594c29/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
index 9ffc2b5..33dcf2f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
@@ -40,7 +40,7 @@ import org.apache.spark.unsafe.types.UTF8String
 /**
  * A [[ParentContainerUpdater]] is used by a Parquet converter to set converted values to some
  * corresponding parent container. For example, a converter for a `StructType` field may set
- * converted values to a [[MutableRow]]; or a converter for array elements may append converted
+ * converted values to a [[InternalRow]]; or a converter for array elements may append converted
  * values to an [[ArrayBuffer]].
  */
 private[parquet] trait ParentContainerUpdater {
@@ -155,7 +155,7 @@ private[parquet] class ParquetRowConverter(
    * Updater used together with field converters within a [[ParquetRowConverter]].  It propagates
    * converted filed values to the `ordinal`-th cell in `currentRow`.
    */
-  private final class RowUpdater(row: MutableRow, ordinal: Int) extends ParentContainerUpdater {
+  private final class RowUpdater(row: InternalRow, ordinal: Int) extends ParentContainerUpdater {
     override def set(value: Any): Unit = row(ordinal) = value
     override def setBoolean(value: Boolean): Unit = row.setBoolean(ordinal, value)
     override def setByte(value: Byte): Unit = row.setByte(ordinal, value)
@@ -166,7 +166,7 @@ private[parquet] class ParquetRowConverter(
     override def setFloat(value: Float): Unit = row.setFloat(ordinal, value)
   }
 
-  private val currentRow = new SpecificMutableRow(catalystType.map(_.dataType))
+  private val currentRow = new SpecificInternalRow(catalystType.map(_.dataType))
 
   private val unsafeProjection = UnsafeProjection.create(catalystType)
 

http://git-wip-us.apache.org/repos/asf/spark/blob/97594c29/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastNestedLoopJoinExec.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastNestedLoopJoinExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastNestedLoopJoinExec.scala
index 43cdce7..bfe7e3d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastNestedLoopJoinExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastNestedLoopJoinExec.scala
@@ -119,7 +119,7 @@ case class BroadcastNestedLoopJoinExec(
     streamed.execute().mapPartitionsInternal { streamedIter =>
       val buildRows = relation.value
       val joinedRow = new JoinedRow
-      val nulls = new GenericMutableRow(broadcast.output.size)
+      val nulls = new GenericInternalRow(broadcast.output.size)
 
       // Returns an iterator to avoid copy the rows.
       new Iterator[InternalRow] {
@@ -205,14 +205,14 @@ case class BroadcastNestedLoopJoinExec(
       val joinedRow = new JoinedRow
 
       if (condition.isDefined) {
-        val resultRow = new GenericMutableRow(Array[Any](null))
+        val resultRow = new GenericInternalRow(Array[Any](null))
         streamedIter.map { row =>
           val result = buildRows.exists(r => boundCondition(joinedRow(row, r)))
           resultRow.setBoolean(0, result)
           joinedRow(row, resultRow)
         }
       } else {
-        val resultRow = new GenericMutableRow(Array[Any](buildRows.nonEmpty))
+        val resultRow = new GenericInternalRow(Array[Any](buildRows.nonEmpty))
         streamedIter.map { row =>
           joinedRow(row, resultRow)
         }
@@ -293,7 +293,7 @@ case class BroadcastNestedLoopJoinExec(
     }
 
     val notMatchedBroadcastRows: Seq[InternalRow] = {
-      val nulls = new GenericMutableRow(streamed.output.size)
+      val nulls = new GenericInternalRow(streamed.output.size)
       val buf: CompactBuffer[InternalRow] = new CompactBuffer()
       val joinedRow = new JoinedRow
       joinedRow.withLeft(nulls)
@@ -311,7 +311,7 @@ case class BroadcastNestedLoopJoinExec(
     val matchedStreamRows = streamRdd.mapPartitionsInternal { streamedIter =>
       val buildRows = relation.value
       val joinedRow = new JoinedRow
-      val nulls = new GenericMutableRow(broadcast.output.size)
+      val nulls = new GenericInternalRow(broadcast.output.size)
 
       streamedIter.flatMap { streamedRow =>
         var i = 0

http://git-wip-us.apache.org/repos/asf/spark/blob/97594c29/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala
index fb6bfa7..8ddac19 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala
@@ -192,7 +192,7 @@ trait HashJoin {
       streamIter: Iterator[InternalRow],
       hashedRelation: HashedRelation): Iterator[InternalRow] = {
     val joinKeys = streamSideKeyGenerator()
-    val result = new GenericMutableRow(Array[Any](null))
+    val result = new GenericInternalRow(Array[Any](null))
     val joinedRow = new JoinedRow
     streamIter.map { current =>
       val key = joinKeys(current)

http://git-wip-us.apache.org/repos/asf/spark/blob/97594c29/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala
index 81b3e1d..ecf7cf2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala
@@ -275,7 +275,7 @@ case class SortMergeJoinExec(
         case j: ExistenceJoin =>
           new RowIterator {
             private[this] var currentLeftRow: InternalRow = _
-            private[this] val result: MutableRow = new GenericMutableRow(Array[Any](null))
+            private[this] val result: InternalRow = new GenericInternalRow(Array[Any](null))
             private[this] val smjScanner = new SortMergeJoinScanner(
               createLeftKeyGenerator(),
               createRightKeyGenerator(),

http://git-wip-us.apache.org/repos/asf/spark/blob/97594c29/sql/core/src/main/scala/org/apache/spark/sql/execution/objects.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/objects.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/objects.scala
index c7e2671..2acc511 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/objects.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/objects.scala
@@ -141,7 +141,7 @@ object ObjectOperator {
   def serializeObjectToRow(serializer: Seq[Expression]): Any => UnsafeRow = {
     val proj = GenerateUnsafeProjection.generate(serializer)
     val objType = serializer.head.collect { case b: BoundReference => b.dataType }.head
-    val objRow = new SpecificMutableRow(objType :: Nil)
+    val objRow = new SpecificInternalRow(objType :: Nil)
     (o: Any) => {
       objRow(0) = o
       proj(objRow)
@@ -149,7 +149,7 @@ object ObjectOperator {
   }
 
   def wrapObjectToRow(objType: DataType): Any => InternalRow = {
-    val outputRow = new SpecificMutableRow(objType :: Nil)
+    val outputRow = new SpecificInternalRow(objType :: Nil)
     (o: Any) => {
       outputRow(0) = o
       outputRow

http://git-wip-us.apache.org/repos/asf/spark/blob/97594c29/sql/core/src/main/scala/org/apache/spark/sql/execution/python/BatchEvalPythonExec.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/BatchEvalPythonExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/BatchEvalPythonExec.scala
index f9d20ad..dcaf2c7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/BatchEvalPythonExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/BatchEvalPythonExec.scala
@@ -147,7 +147,7 @@ case class BatchEvalPythonExec(udfs: Seq[PythonUDF], output: Seq[Attribute], chi
         .compute(inputIterator, context.partitionId(), context)
 
       val unpickle = new Unpickler
-      val mutableRow = new GenericMutableRow(1)
+      val mutableRow = new GenericInternalRow(1)
       val joined = new JoinedRow
       val resultType = if (udfs.length == 1) {
         udfs.head.dataType

http://git-wip-us.apache.org/repos/asf/spark/blob/97594c29/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/StatFunctions.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/StatFunctions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/StatFunctions.scala
index 822f49e..c02b154 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/StatFunctions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/StatFunctions.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.execution.stat
 
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.{Column, DataFrame, Dataset, Row}
-import org.apache.spark.sql.catalyst.expressions.{Cast, GenericMutableRow}
+import org.apache.spark.sql.catalyst.expressions.{Cast, GenericInternalRow}
 import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
 import org.apache.spark.sql.catalyst.util.QuantileSummaries
 import org.apache.spark.sql.functions._
@@ -186,7 +186,7 @@ object StatFunctions extends Logging {
     require(columnSize < 1e4, s"The number of distinct values for $col2, can't " +
       s"exceed 1e4. Currently $columnSize")
     val table = counts.groupBy(_.get(0)).map { case (col1Item, rows) =>
-      val countsRow = new GenericMutableRow(columnSize + 1)
+      val countsRow = new GenericInternalRow(columnSize + 1)
       rows.foreach { (row: Row) =>
         // row.get(0) is column 1
         // row.get(1) is column 2

http://git-wip-us.apache.org/repos/asf/spark/blob/97594c29/sql/core/src/main/scala/org/apache/spark/sql/execution/window/AggregateProcessor.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/window/AggregateProcessor.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/window/AggregateProcessor.scala
index d3a46d0..c9f5d3b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/window/AggregateProcessor.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/window/AggregateProcessor.scala
@@ -123,7 +123,7 @@ private[window] final class AggregateProcessor(
 
   private[this] val join = new JoinedRow
   private[this] val numImperatives = imperatives.length
-  private[this] val buffer = new SpecificMutableRow(bufferSchema.toSeq.map(_.dataType))
+  private[this] val buffer = new SpecificInternalRow(bufferSchema.toSeq.map(_.dataType))
   initialProjection.target(buffer)
   updateProjection.target(buffer)
 
@@ -154,6 +154,6 @@ private[window] final class AggregateProcessor(
   }
 
   /** Evaluate buffer. */
-  def evaluate(target: MutableRow): Unit =
+  def evaluate(target: InternalRow): Unit =
   evaluateProjection.target(target)(buffer)
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/97594c29/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowExec.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowExec.scala
index 7a6a30f..1dd281e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowExec.scala
@@ -204,7 +204,7 @@ case class WindowExec(
         val factory = key match {
           // Offset Frame
           case ("OFFSET", RowFrame, Some(offset), Some(h)) if offset == h =>
-            target: MutableRow =>
+            target: InternalRow =>
               new OffsetWindowFunctionFrame(
                 target,
                 ordinal,
@@ -217,7 +217,7 @@ case class WindowExec(
 
           // Growing Frame.
           case ("AGGREGATE", frameType, None, Some(high)) =>
-            target: MutableRow => {
+            target: InternalRow => {
               new UnboundedPrecedingWindowFunctionFrame(
                 target,
                 processor,
@@ -226,7 +226,7 @@ case class WindowExec(
 
           // Shrinking Frame.
           case ("AGGREGATE", frameType, Some(low), None) =>
-            target: MutableRow => {
+            target: InternalRow => {
               new UnboundedFollowingWindowFunctionFrame(
                 target,
                 processor,
@@ -235,7 +235,7 @@ case class WindowExec(
 
           // Moving Frame.
           case ("AGGREGATE", frameType, Some(low), Some(high)) =>
-            target: MutableRow => {
+            target: InternalRow => {
               new SlidingWindowFunctionFrame(
                 target,
                 processor,
@@ -245,7 +245,7 @@ case class WindowExec(
 
           // Entire Partition Frame.
           case ("AGGREGATE", frameType, None, None) =>
-            target: MutableRow => {
+            target: InternalRow => {
               new UnboundedWindowFunctionFrame(target, processor)
             }
         }
@@ -312,7 +312,7 @@ case class WindowExec(
         val inputFields = child.output.length
         var sorter: UnsafeExternalSorter = null
         var rowBuffer: RowBuffer = null
-        val windowFunctionResult = new SpecificMutableRow(expressions.map(_.dataType))
+        val windowFunctionResult = new SpecificInternalRow(expressions.map(_.dataType))
         val frames = factories.map(_(windowFunctionResult))
         val numFrames = frames.length
         private[this] def fetchNextPartition() {

http://git-wip-us.apache.org/repos/asf/spark/blob/97594c29/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowFunctionFrame.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowFunctionFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowFunctionFrame.scala
index 2ab9faa..70efc0f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowFunctionFrame.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowFunctionFrame.scala
@@ -56,7 +56,7 @@ private[window] abstract class WindowFunctionFrame {
  * @param offset by which rows get moved within a partition.
  */
 private[window] final class OffsetWindowFunctionFrame(
-    target: MutableRow,
+    target: InternalRow,
     ordinal: Int,
     expressions: Array[OffsetWindowFunction],
     inputSchema: Seq[Attribute],
@@ -136,7 +136,7 @@ private[window] final class OffsetWindowFunctionFrame(
  * @param ubound comparator used to identify the upper bound of an output row.
  */
 private[window] final class SlidingWindowFunctionFrame(
-    target: MutableRow,
+    target: InternalRow,
     processor: AggregateProcessor,
     lbound: BoundOrdering,
     ubound: BoundOrdering)
@@ -217,7 +217,7 @@ private[window] final class SlidingWindowFunctionFrame(
  * @param processor to calculate the row values with.
  */
 private[window] final class UnboundedWindowFunctionFrame(
-    target: MutableRow,
+    target: InternalRow,
     processor: AggregateProcessor)
   extends WindowFunctionFrame {
 
@@ -255,7 +255,7 @@ private[window] final class UnboundedWindowFunctionFrame(
  * @param ubound comparator used to identify the upper bound of an output row.
  */
 private[window] final class UnboundedPrecedingWindowFunctionFrame(
-    target: MutableRow,
+    target: InternalRow,
     processor: AggregateProcessor,
     ubound: BoundOrdering)
   extends WindowFunctionFrame {
@@ -317,7 +317,7 @@ private[window] final class UnboundedPrecedingWindowFunctionFrame(
  * @param lbound comparator used to identify the lower bound of an output row.
  */
 private[window] final class UnboundedFollowingWindowFunctionFrame(
-    target: MutableRow,
+    target: InternalRow,
     processor: AggregateProcessor,
     lbound: BoundOrdering)
   extends WindowFunctionFrame {

http://git-wip-us.apache.org/repos/asf/spark/blob/97594c29/sql/core/src/test/scala/org/apache/spark/sql/RowSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/RowSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/RowSuite.scala
index 34936b3..7516be3 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/RowSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/RowSuite.scala
@@ -18,7 +18,7 @@
 package org.apache.spark.sql
 
 import org.apache.spark.SparkFunSuite
-import org.apache.spark.sql.catalyst.expressions.{GenericMutableRow, SpecificMutableRow}
+import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, SpecificInternalRow}
 import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.UTF8String
@@ -27,7 +27,7 @@ class RowSuite extends SparkFunSuite with SharedSQLContext {
   import testImplicits._
 
   test("create row") {
-    val expected = new GenericMutableRow(4)
+    val expected = new GenericInternalRow(4)
     expected.setInt(0, 2147483647)
     expected.update(1, UTF8String.fromString("this is a string"))
     expected.setBoolean(2, false)
@@ -49,7 +49,7 @@ class RowSuite extends SparkFunSuite with SharedSQLContext {
   }
 
   test("SpecificMutableRow.update with null") {
-    val row = new SpecificMutableRow(Seq(IntegerType))
+    val row = new SpecificInternalRow(Seq(IntegerType))
     row(0) = null
     assert(row.isNullAt(0))
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/97594c29/sql/core/src/test/scala/org/apache/spark/sql/TypedImperativeAggregateSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/TypedImperativeAggregateSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/TypedImperativeAggregateSuite.scala
index b5eb16b..ffa26f1 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/TypedImperativeAggregateSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/TypedImperativeAggregateSuite.scala
@@ -21,7 +21,7 @@ import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, Da
 
 import org.apache.spark.sql.TypedImperativeAggregateSuite.TypedMax
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.{BoundReference, Expression, GenericMutableRow, SpecificMutableRow}
+import org.apache.spark.sql.catalyst.expressions.{BoundReference, Expression, GenericInternalRow, SpecificInternalRow}
 import org.apache.spark.sql.catalyst.expressions.aggregate.TypedImperativeAggregate
 import org.apache.spark.sql.execution.aggregate.SortAggregateExec
 import org.apache.spark.sql.expressions.Window
@@ -64,7 +64,7 @@ class TypedImperativeAggregateSuite extends QueryTest with SharedSQLContext {
     assert(agg.eval(mergeBuffer) == data.map(_._1).max)
 
     // Tests low level eval(row: InternalRow) API.
-    val row = new GenericMutableRow(Array(mergeBuffer): Array[Any])
+    val row = new GenericInternalRow(Array(mergeBuffer): Array[Any])
 
     // Evaluates directly on row consist of aggregation buffer object.
     assert(agg.eval(row) == data.map(_._1).max)
@@ -73,7 +73,7 @@ class TypedImperativeAggregateSuite extends QueryTest with SharedSQLContext {
   test("supports SpecificMutableRow as mutable row") {
     val aggregationBufferSchema = Seq(IntegerType, LongType, BinaryType, IntegerType)
     val aggBufferOffset = 2
-    val buffer = new SpecificMutableRow(aggregationBufferSchema)
+    val buffer = new SpecificInternalRow(aggregationBufferSchema)
     val agg = new TypedMax(BoundReference(ordinal = 1, dataType = IntegerType, nullable = false))
       .withNewMutableAggBufferOffset(aggBufferOffset)
 

http://git-wip-us.apache.org/repos/asf/spark/blob/97594c29/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/ColumnTypeSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/ColumnTypeSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/ColumnTypeSuite.scala
index 805b566..8bf9f52 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/ColumnTypeSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/ColumnTypeSuite.scala
@@ -24,7 +24,7 @@ import org.apache.spark.SparkFunSuite
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.catalyst.CatalystTypeConverters
-import org.apache.spark.sql.catalyst.expressions.{GenericMutableRow, UnsafeProjection}
+import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, UnsafeProjection}
 import org.apache.spark.sql.execution.columnar.ColumnarTestUtils._
 import org.apache.spark.sql.types._
 
@@ -54,7 +54,7 @@ class ColumnTypeSuite extends SparkFunSuite with Logging {
         expected: Int): Unit = {
 
       assertResult(expected, s"Wrong actualSize for $columnType") {
-        val row = new GenericMutableRow(1)
+        val row = new GenericInternalRow(1)
         row.update(0, CatalystTypeConverters.convertToCatalyst(value))
         val proj = UnsafeProjection.create(Array[DataType](columnType.dataType))
         columnType.actualSize(proj(row), 0)

http://git-wip-us.apache.org/repos/asf/spark/blob/97594c29/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/ColumnarTestUtils.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/ColumnarTestUtils.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/ColumnarTestUtils.scala
index 1529313..686c8fa 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/ColumnarTestUtils.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/ColumnarTestUtils.scala
@@ -21,14 +21,14 @@ import scala.collection.immutable.HashSet
 import scala.util.Random
 
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, GenericMutableRow}
+import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
 import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, GenericArrayData}
 import org.apache.spark.sql.types.{AtomicType, Decimal}
 import org.apache.spark.unsafe.types.UTF8String
 
 object ColumnarTestUtils {
-  def makeNullRow(length: Int): GenericMutableRow = {
-    val row = new GenericMutableRow(length)
+  def makeNullRow(length: Int): GenericInternalRow = {
+    val row = new GenericInternalRow(length)
     (0 until length).foreach(row.setNullAt)
     row
   }
@@ -86,7 +86,7 @@ object ColumnarTestUtils {
       tail: ColumnType[_]*): InternalRow = makeRandomRow(Seq(head) ++ tail)
 
   def makeRandomRow(columnTypes: Seq[ColumnType[_]]): InternalRow = {
-    val row = new GenericMutableRow(columnTypes.length)
+    val row = new GenericInternalRow(columnTypes.length)
     makeRandomValues(columnTypes).zipWithIndex.foreach { case (value, index) =>
       row(index) = value
     }
@@ -95,11 +95,11 @@ object ColumnarTestUtils {
 
   def makeUniqueValuesAndSingleValueRows[T <: AtomicType](
       columnType: NativeColumnType[T],
-      count: Int): (Seq[T#InternalType], Seq[GenericMutableRow]) = {
+      count: Int): (Seq[T#InternalType], Seq[GenericInternalRow]) = {
 
     val values = makeUniqueRandomValues(columnType, count)
     val rows = values.map { value =>
-      val row = new GenericMutableRow(1)
+      val row = new GenericInternalRow(1)
       row(0) = value
       row
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/97594c29/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/NullableColumnAccessorSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/NullableColumnAccessorSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/NullableColumnAccessorSuite.scala
index dc22d3e..8f4ca3c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/NullableColumnAccessorSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/NullableColumnAccessorSuite.scala
@@ -21,7 +21,7 @@ import java.nio.ByteBuffer
 
 import org.apache.spark.SparkFunSuite
 import org.apache.spark.sql.catalyst.CatalystTypeConverters
-import org.apache.spark.sql.catalyst.expressions.{GenericMutableRow, UnsafeProjection}
+import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, UnsafeProjection}
 import org.apache.spark.sql.types._
 
 class TestNullableColumnAccessor[JvmType](
@@ -72,7 +72,7 @@ class NullableColumnAccessorSuite extends SparkFunSuite {
       }
 
       val accessor = TestNullableColumnAccessor(builder.build(), columnType)
-      val row = new GenericMutableRow(1)
+      val row = new GenericInternalRow(1)
       val converter = CatalystTypeConverters.createToScalaConverter(columnType.dataType)
 
       (0 until 4).foreach { _ =>

http://git-wip-us.apache.org/repos/asf/spark/blob/97594c29/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/NullableColumnBuilderSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/NullableColumnBuilderSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/NullableColumnBuilderSuite.scala
index cdd4551..b2b6e92 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/NullableColumnBuilderSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/NullableColumnBuilderSuite.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.execution.columnar
 
 import org.apache.spark.SparkFunSuite
 import org.apache.spark.sql.catalyst.CatalystTypeConverters
-import org.apache.spark.sql.catalyst.expressions.{GenericMutableRow, UnsafeProjection}
+import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, UnsafeProjection}
 import org.apache.spark.sql.types._
 
 class TestNullableColumnBuilder[JvmType](columnType: ColumnType[JvmType])
@@ -94,7 +94,7 @@ class NullableColumnBuilderSuite extends SparkFunSuite {
       (1 to 7 by 2).foreach(assertResult(_, "Wrong null position")(buffer.getInt()))
 
       // For non-null values
-      val actual = new GenericMutableRow(new Array[Any](1))
+      val actual = new GenericInternalRow(new Array[Any](1))
       (0 until 4).foreach { _ =>
         columnType.extract(buffer, actual, 0)
         assert(converter(actual.get(0, dataType)) === converter(randomRow.get(0, dataType)),

http://git-wip-us.apache.org/repos/asf/spark/blob/97594c29/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/BooleanBitSetSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/BooleanBitSetSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/BooleanBitSetSuite.scala
index f67e9c7..d01bf91 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/BooleanBitSetSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/BooleanBitSetSuite.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.execution.columnar.compression
 
 import org.apache.spark.SparkFunSuite
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.GenericMutableRow
+import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
 import org.apache.spark.sql.execution.columnar.{BOOLEAN, NoopColumnStats}
 import org.apache.spark.sql.execution.columnar.ColumnarTestUtils._
 
@@ -72,7 +72,7 @@ class BooleanBitSetSuite extends SparkFunSuite {
     buffer.rewind().position(headerSize + 4)
 
     val decoder = BooleanBitSet.decoder(buffer, BOOLEAN)
-    val mutableRow = new GenericMutableRow(1)
+    val mutableRow = new GenericInternalRow(1)
     if (values.nonEmpty) {
       values.foreach {
         assert(decoder.hasNext)

http://git-wip-us.apache.org/repos/asf/spark/blob/97594c29/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/CompressionSchemeBenchmark.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/CompressionSchemeBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/CompressionSchemeBenchmark.scala
index babf944..9005ec9 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/CompressionSchemeBenchmark.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/CompressionSchemeBenchmark.scala
@@ -23,7 +23,7 @@ import java.nio.charset.StandardCharsets
 import org.apache.commons.lang3.RandomStringUtils
 import org.apache.commons.math3.distribution.LogNormalDistribution
 
-import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, GenericMutableRow}
+import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
 import org.apache.spark.sql.execution.columnar.{BOOLEAN, INT, LONG, NativeColumnType, SHORT, STRING}
 import org.apache.spark.sql.types.AtomicType
 import org.apache.spark.util.Benchmark
@@ -111,7 +111,7 @@ object CompressionSchemeBenchmark extends AllCompressionSchemes {
       input.rewind()
 
       benchmark.addCase(label)({ i: Int =>
-        val rowBuf = new GenericMutableRow(1)
+        val rowBuf = new GenericInternalRow(1)
 
         for (n <- 0L until iters) {
           compressedBuf.rewind.position(4)

http://git-wip-us.apache.org/repos/asf/spark/blob/97594c29/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/DictionaryEncodingSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/DictionaryEncodingSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/DictionaryEncodingSuite.scala
index 830ca02..67139b1 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/DictionaryEncodingSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/DictionaryEncodingSuite.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.execution.columnar.compression
 import java.nio.ByteBuffer
 
 import org.apache.spark.SparkFunSuite
-import org.apache.spark.sql.catalyst.expressions.GenericMutableRow
+import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
 import org.apache.spark.sql.execution.columnar._
 import org.apache.spark.sql.execution.columnar.ColumnarTestUtils._
 import org.apache.spark.sql.types.AtomicType
@@ -97,7 +97,7 @@ class DictionaryEncodingSuite extends SparkFunSuite {
         buffer.rewind().position(headerSize + 4)
 
         val decoder = DictionaryEncoding.decoder(buffer, columnType)
-        val mutableRow = new GenericMutableRow(1)
+        val mutableRow = new GenericInternalRow(1)
 
         if (inputSeq.nonEmpty) {
           inputSeq.foreach { i =>

http://git-wip-us.apache.org/repos/asf/spark/blob/97594c29/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/IntegralDeltaSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/IntegralDeltaSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/IntegralDeltaSuite.scala
index a530e27..411d31f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/IntegralDeltaSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/IntegralDeltaSuite.scala
@@ -18,7 +18,7 @@
 package org.apache.spark.sql.execution.columnar.compression
 
 import org.apache.spark.SparkFunSuite
-import org.apache.spark.sql.catalyst.expressions.GenericMutableRow
+import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
 import org.apache.spark.sql.execution.columnar._
 import org.apache.spark.sql.execution.columnar.ColumnarTestUtils._
 import org.apache.spark.sql.types.IntegralType
@@ -48,7 +48,7 @@ class IntegralDeltaSuite extends SparkFunSuite {
       }
 
       input.foreach { value =>
-        val row = new GenericMutableRow(1)
+        val row = new GenericInternalRow(1)
         columnType.setField(row, 0, value)
         builder.appendFrom(row, 0)
       }
@@ -95,7 +95,7 @@ class IntegralDeltaSuite extends SparkFunSuite {
       buffer.rewind().position(headerSize + 4)
 
       val decoder = scheme.decoder(buffer, columnType)
-      val mutableRow = new GenericMutableRow(1)
+      val mutableRow = new GenericInternalRow(1)
 
       if (input.nonEmpty) {
         input.foreach{

http://git-wip-us.apache.org/repos/asf/spark/blob/97594c29/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/RunLengthEncodingSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/RunLengthEncodingSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/RunLengthEncodingSuite.scala
index 95642e9..dffa9b3 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/RunLengthEncodingSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/RunLengthEncodingSuite.scala
@@ -18,7 +18,7 @@
 package org.apache.spark.sql.execution.columnar.compression
 
 import org.apache.spark.SparkFunSuite
-import org.apache.spark.sql.catalyst.expressions.GenericMutableRow
+import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
 import org.apache.spark.sql.execution.columnar._
 import org.apache.spark.sql.execution.columnar.ColumnarTestUtils._
 import org.apache.spark.sql.types.AtomicType
@@ -80,7 +80,7 @@ class RunLengthEncodingSuite extends SparkFunSuite {
       buffer.rewind().position(headerSize + 4)
 
       val decoder = RunLengthEncoding.decoder(buffer, columnType)
-      val mutableRow = new GenericMutableRow(1)
+      val mutableRow = new GenericInternalRow(1)
 
       if (inputSeq.nonEmpty) {
         inputSeq.foreach { i =>

http://git-wip-us.apache.org/repos/asf/spark/blob/97594c29/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
index 3161a63..580eade 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
@@ -38,7 +38,7 @@ import org.apache.parquet.schema.{MessageType, MessageTypeParser}
 import org.apache.spark.SparkException
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.{InternalRow, ScalaReflection}
-import org.apache.spark.sql.catalyst.expressions.{GenericMutableRow, UnsafeRow}
+import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, UnsafeRow}
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSQLContext
@@ -716,7 +716,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext {
       dataTypes.zip(constantValues).foreach { case (dt, v) =>
         val schema = StructType(StructField("pcol", dt) :: Nil)
         val vectorizedReader = new VectorizedParquetRecordReader
-        val partitionValues = new GenericMutableRow(Array(v))
+        val partitionValues = new GenericInternalRow(Array(v))
         val file = SpecificParquetRecordReaderBase.listDirectory(dir).get(0)
 
         try {

http://git-wip-us.apache.org/repos/asf/spark/blob/97594c29/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
index 9dd8d9f..4c4a7d8 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
@@ -24,7 +24,7 @@ import org.apache.parquet.hadoop.ParquetOutputFormat
 
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
-import org.apache.spark.sql.catalyst.expressions.SpecificMutableRow
+import org.apache.spark.sql.catalyst.expressions.SpecificInternalRow
 import org.apache.spark.sql.execution.FileSourceScanExec
 import org.apache.spark.sql.execution.datasources.parquet.TestingUDT.{NestedStruct, NestedStructUDT, SingleElement}
 import org.apache.spark.sql.internal.SQLConf
@@ -719,7 +719,7 @@ object TestingUDT {
         .add("c", DoubleType, nullable = false)
 
     override def serialize(n: NestedStruct): Any = {
-      val row = new SpecificMutableRow(sqlType.asInstanceOf[StructType].map(_.dataType))
+      val row = new SpecificInternalRow(sqlType.asInstanceOf[StructType].map(_.dataType))
       row.setInt(0, n.a)
       row.setLong(1, n.b)
       row.setDouble(2, n.c)

http://git-wip-us.apache.org/repos/asf/spark/blob/97594c29/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala
index fe34caa..1625116 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala
@@ -688,25 +688,25 @@ private[hive] trait HiveInspectors {
    * @return A function that performs in-place updating of a MutableRow.
    *         Use the overloaded ObjectInspector version for assignments.
    */
-  def unwrapperFor(field: HiveStructField): (Any, MutableRow, Int) => Unit =
+  def unwrapperFor(field: HiveStructField): (Any, InternalRow, Int) => Unit =
     field.getFieldObjectInspector match {
       case oi: BooleanObjectInspector =>
-        (value: Any, row: MutableRow, ordinal: Int) => row.setBoolean(ordinal, oi.get(value))
+        (value: Any, row: InternalRow, ordinal: Int) => row.setBoolean(ordinal, oi.get(value))
       case oi: ByteObjectInspector =>
-        (value: Any, row: MutableRow, ordinal: Int) => row.setByte(ordinal, oi.get(value))
+        (value: Any, row: InternalRow, ordinal: Int) => row.setByte(ordinal, oi.get(value))
       case oi: ShortObjectInspector =>
-        (value: Any, row: MutableRow, ordinal: Int) => row.setShort(ordinal, oi.get(value))
+        (value: Any, row: InternalRow, ordinal: Int) => row.setShort(ordinal, oi.get(value))
       case oi: IntObjectInspector =>
-        (value: Any, row: MutableRow, ordinal: Int) => row.setInt(ordinal, oi.get(value))
+        (value: Any, row: InternalRow, ordinal: Int) => row.setInt(ordinal, oi.get(value))
       case oi: LongObjectInspector =>
-        (value: Any, row: MutableRow, ordinal: Int) => row.setLong(ordinal, oi.get(value))
+        (value: Any, row: InternalRow, ordinal: Int) => row.setLong(ordinal, oi.get(value))
       case oi: FloatObjectInspector =>
-        (value: Any, row: MutableRow, ordinal: Int) => row.setFloat(ordinal, oi.get(value))
+        (value: Any, row: InternalRow, ordinal: Int) => row.setFloat(ordinal, oi.get(value))
       case oi: DoubleObjectInspector =>
-        (value: Any, row: MutableRow, ordinal: Int) => row.setDouble(ordinal, oi.get(value))
+        (value: Any, row: InternalRow, ordinal: Int) => row.setDouble(ordinal, oi.get(value))
       case oi =>
         val unwrapper = unwrapperFor(oi)
-        (value: Any, row: MutableRow, ordinal: Int) => row(ordinal) = unwrapper(value)
+        (value: Any, row: InternalRow, ordinal: Int) => row(ordinal) = unwrapper(value)
     }
 
   def wrap(a: Any, oi: ObjectInspector, dataType: DataType): AnyRef = {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org