Posted to commits@spark.apache.org by hv...@apache.org on 2016/10/07 21:03:51 UTC

[1/3] spark git commit: [SPARK-17761][SQL] Remove MutableRow

Repository: spark
Updated Branches:
  refs/heads/master 2badb58cd -> 97594c29b


http://git-wip-us.apache.org/repos/asf/spark/blob/97594c29/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
index ec7e53e..2a54163 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
@@ -120,7 +120,7 @@ class HadoopTableReader(
     val hadoopRDD = createHadoopRdd(tableDesc, inputPathStr, ifc)
 
     val attrsWithIndex = attributes.zipWithIndex
-    val mutableRow = new SpecificMutableRow(attributes.map(_.dataType))
+    val mutableRow = new SpecificInternalRow(attributes.map(_.dataType))
 
     val deserializedHadoopRDD = hadoopRDD.mapPartitions { iter =>
       val hconf = broadcastedHadoopConf.value.value
@@ -215,7 +215,7 @@ class HadoopTableReader(
       val tableDesc = relation.tableDesc
       val broadcastedHiveConf = _broadcastedHadoopConf
       val localDeserializer = partDeserializer
-      val mutableRow = new SpecificMutableRow(attributes.map(_.dataType))
+      val mutableRow = new SpecificInternalRow(attributes.map(_.dataType))
 
       // Splits all attributes into two groups, partition key attributes and those that are not.
       // Attached indices indicate the position of each attribute in the output schema.
@@ -224,7 +224,7 @@ class HadoopTableReader(
           relation.partitionKeys.contains(attr)
         }
 
-      def fillPartitionKeys(rawPartValues: Array[String], row: MutableRow): Unit = {
+      def fillPartitionKeys(rawPartValues: Array[String], row: InternalRow): Unit = {
         partitionKeyAttrs.foreach { case (attr, ordinal) =>
           val partOrdinal = relation.partitionKeys.indexOf(attr)
           row(ordinal) = Cast(Literal(rawPartValues(partOrdinal)), attr.dataType).eval(null)
@@ -360,7 +360,7 @@ private[hive] object HadoopTableReader extends HiveInspectors with Logging {
       iterator: Iterator[Writable],
       rawDeser: Deserializer,
       nonPartitionKeyAttrs: Seq[(Attribute, Int)],
-      mutableRow: MutableRow,
+      mutableRow: InternalRow,
       tableDeser: Deserializer): Iterator[InternalRow] = {
 
     val soi = if (rawDeser.getObjectInspector.equals(tableDeser.getObjectInspector)) {
@@ -381,43 +381,43 @@ private[hive] object HadoopTableReader extends HiveInspectors with Logging {
      * Builds specific unwrappers ahead of time according to object inspector
      * types to avoid pattern matching and branching costs per row.
      */
-    val unwrappers: Seq[(Any, MutableRow, Int) => Unit] = fieldRefs.map {
+    val unwrappers: Seq[(Any, InternalRow, Int) => Unit] = fieldRefs.map {
       _.getFieldObjectInspector match {
         case oi: BooleanObjectInspector =>
-          (value: Any, row: MutableRow, ordinal: Int) => row.setBoolean(ordinal, oi.get(value))
+          (value: Any, row: InternalRow, ordinal: Int) => row.setBoolean(ordinal, oi.get(value))
         case oi: ByteObjectInspector =>
-          (value: Any, row: MutableRow, ordinal: Int) => row.setByte(ordinal, oi.get(value))
+          (value: Any, row: InternalRow, ordinal: Int) => row.setByte(ordinal, oi.get(value))
         case oi: ShortObjectInspector =>
-          (value: Any, row: MutableRow, ordinal: Int) => row.setShort(ordinal, oi.get(value))
+          (value: Any, row: InternalRow, ordinal: Int) => row.setShort(ordinal, oi.get(value))
         case oi: IntObjectInspector =>
-          (value: Any, row: MutableRow, ordinal: Int) => row.setInt(ordinal, oi.get(value))
+          (value: Any, row: InternalRow, ordinal: Int) => row.setInt(ordinal, oi.get(value))
         case oi: LongObjectInspector =>
-          (value: Any, row: MutableRow, ordinal: Int) => row.setLong(ordinal, oi.get(value))
+          (value: Any, row: InternalRow, ordinal: Int) => row.setLong(ordinal, oi.get(value))
         case oi: FloatObjectInspector =>
-          (value: Any, row: MutableRow, ordinal: Int) => row.setFloat(ordinal, oi.get(value))
+          (value: Any, row: InternalRow, ordinal: Int) => row.setFloat(ordinal, oi.get(value))
         case oi: DoubleObjectInspector =>
-          (value: Any, row: MutableRow, ordinal: Int) => row.setDouble(ordinal, oi.get(value))
+          (value: Any, row: InternalRow, ordinal: Int) => row.setDouble(ordinal, oi.get(value))
         case oi: HiveVarcharObjectInspector =>
-          (value: Any, row: MutableRow, ordinal: Int) =>
+          (value: Any, row: InternalRow, ordinal: Int) =>
             row.update(ordinal, UTF8String.fromString(oi.getPrimitiveJavaObject(value).getValue))
         case oi: HiveCharObjectInspector =>
-          (value: Any, row: MutableRow, ordinal: Int) =>
+          (value: Any, row: InternalRow, ordinal: Int) =>
             row.update(ordinal, UTF8String.fromString(oi.getPrimitiveJavaObject(value).getValue))
         case oi: HiveDecimalObjectInspector =>
-          (value: Any, row: MutableRow, ordinal: Int) =>
+          (value: Any, row: InternalRow, ordinal: Int) =>
             row.update(ordinal, HiveShim.toCatalystDecimal(oi, value))
         case oi: TimestampObjectInspector =>
-          (value: Any, row: MutableRow, ordinal: Int) =>
+          (value: Any, row: InternalRow, ordinal: Int) =>
             row.setLong(ordinal, DateTimeUtils.fromJavaTimestamp(oi.getPrimitiveJavaObject(value)))
         case oi: DateObjectInspector =>
-          (value: Any, row: MutableRow, ordinal: Int) =>
+          (value: Any, row: InternalRow, ordinal: Int) =>
             row.setInt(ordinal, DateTimeUtils.fromJavaDate(oi.getPrimitiveJavaObject(value)))
         case oi: BinaryObjectInspector =>
-          (value: Any, row: MutableRow, ordinal: Int) =>
+          (value: Any, row: InternalRow, ordinal: Int) =>
             row.update(ordinal, oi.getPrimitiveJavaObject(value))
         case oi =>
           val unwrapper = unwrapperFor(oi)
-          (value: Any, row: MutableRow, ordinal: Int) => row(ordinal) = unwrapper(value)
+          (value: Any, row: InternalRow, ordinal: Int) => row(ordinal) = unwrapper(value)
       }
     }
 

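Illustrative sketch (not from the patch) of the unwrapper pattern above: the per-column setters are resolved once, up front, so the per-row loop is a plain array walk into a SpecificInternalRow using primitive setters, with no type dispatch or boxing. All names below are hypothetical.

    import org.apache.spark.sql.catalyst.InternalRow
    import org.apache.spark.sql.catalyst.expressions.SpecificInternalRow
    import org.apache.spark.sql.types.{IntegerType, LongType}

    val row = new SpecificInternalRow(Seq(IntegerType, LongType))
    // Resolved once (in TableReader, from the object inspectors);
    // one specialized setter per column.
    val setters: Array[(Any, InternalRow, Int) => Unit] = Array(
      (v, r, i) => r.setInt(i, v.asInstanceOf[Int]),
      (v, r, i) => r.setLong(i, v.asInstanceOf[Long]))

    // Per row: no pattern matching, no boxing on the primitive paths.
    def fill(values: Array[Any]): InternalRow = {
      var i = 0
      while (i < setters.length) { setters(i)(values(i), row, i); i += 1 }
      row
    }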
http://git-wip-us.apache.org/repos/asf/spark/blob/97594c29/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala
index c553c03..1025b8f 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala
@@ -124,7 +124,7 @@ case class ScriptTransformation(
         } else {
           null
         }
-        val mutableRow = new SpecificMutableRow(output.map(_.dataType))
+        val mutableRow = new SpecificInternalRow(output.map(_.dataType))
 
         @transient
         lazy val unwrappers = outputSoi.getAllStructFieldRefs.asScala.map(unwrapperFor)

http://git-wip-us.apache.org/repos/asf/spark/blob/97594c29/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala
index d549135..4203308 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala
@@ -329,17 +329,17 @@ private[hive] case class HiveUDAFFunction(
   // buffer for it.
   override def aggBufferSchema: StructType = StructType(Nil)
 
-  override def update(_buffer: MutableRow, input: InternalRow): Unit = {
+  override def update(_buffer: InternalRow, input: InternalRow): Unit = {
     val inputs = inputProjection(input)
     function.iterate(buffer, wrap(inputs, wrappers, cached, inputDataTypes))
   }
 
-  override def merge(buffer1: MutableRow, buffer2: InternalRow): Unit = {
+  override def merge(buffer1: InternalRow, buffer2: InternalRow): Unit = {
     throw new UnsupportedOperationException(
       "Hive UDAF doesn't support partial aggregate")
   }
 
-  override def initialize(_buffer: MutableRow): Unit = {
+  override def initialize(_buffer: InternalRow): Unit = {
     buffer = function.getNewAggregationBuffer
   }
 

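Illustrative sketch (not from the patch): with MutableRow gone, imperative aggregates receive their buffers as plain InternalRow and mutate them through its setters. The skeleton below is hypothetical (it is not the real ImperativeAggregate trait); it only shows the shape of the new signatures.

    import org.apache.spark.sql.catalyst.InternalRow

    class CountNonNull {
      def initialize(buffer: InternalRow): Unit = buffer.setLong(0, 0L)
      def update(buffer: InternalRow, input: InternalRow): Unit =
        if (!input.isNullAt(0)) buffer.setLong(0, buffer.getLong(0) + 1L)
      def merge(buffer1: InternalRow, buffer2: InternalRow): Unit =
        buffer1.setLong(0, buffer1.getLong(0) + buffer2.getLong(0))
    }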
http://git-wip-us.apache.org/repos/asf/spark/blob/97594c29/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala
index 15b72d8..e94f49e 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala
@@ -281,7 +281,7 @@ private[orc] object OrcRelation extends HiveInspectors {
       maybeStructOI: Option[StructObjectInspector],
       iterator: Iterator[Writable]): Iterator[InternalRow] = {
     val deserializer = new OrcSerde
-    val mutableRow = new SpecificMutableRow(dataSchema.map(_.dataType))
+    val mutableRow = new SpecificInternalRow(dataSchema.map(_.dataType))
     val unsafeProjection = UnsafeProjection.create(dataSchema)
 
     def unwrap(oi: StructObjectInspector): Iterator[InternalRow] = {




[2/3] spark git commit: [SPARK-17761][SQL] Remove MutableRow

Posted by hv...@apache.org.
http://git-wip-us.apache.org/repos/asf/spark/blob/97594c29/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala
index 4e072a9..2988161 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala
@@ -118,7 +118,7 @@ class TungstenAggregationIterator(
   private def createNewAggregationBuffer(): UnsafeRow = {
     val bufferSchema = aggregateFunctions.flatMap(_.aggBufferAttributes)
     val buffer: UnsafeRow = UnsafeProjection.create(bufferSchema.map(_.dataType))
-      .apply(new GenericMutableRow(bufferSchema.length))
+      .apply(new GenericInternalRow(bufferSchema.length))
     // Initialize declarative aggregates' buffer values
     expressionAggInitialProjection.target(buffer)(EmptyRow)
     // Initialize imperative aggregates' buffer values
@@ -127,7 +127,7 @@ class TungstenAggregationIterator(
   }
 
   // Creates a function used to generate output rows.
-  override protected def generateResultProjection(): (UnsafeRow, MutableRow) => UnsafeRow = {
+  override protected def generateResultProjection(): (UnsafeRow, InternalRow) => UnsafeRow = {
     val modes = aggregateExpressions.map(_.mode).distinct
     if (modes.nonEmpty && !modes.contains(Final) && !modes.contains(Complete)) {
       // Fast path for partial aggregation, UnsafeRowJoiner is usually faster than projection
@@ -137,7 +137,7 @@ class TungstenAggregationIterator(
       val bufferSchema = StructType.fromAttributes(bufferAttributes)
       val unsafeRowJoiner = GenerateUnsafeRowJoiner.create(groupingKeySchema, bufferSchema)
 
-      (currentGroupingKey: UnsafeRow, currentBuffer: MutableRow) => {
+      (currentGroupingKey: UnsafeRow, currentBuffer: InternalRow) => {
         unsafeRowJoiner.join(currentGroupingKey, currentBuffer.asInstanceOf[UnsafeRow])
       }
     } else {
@@ -300,7 +300,7 @@ class TungstenAggregationIterator(
   private[this] val sortBasedAggregationBuffer: UnsafeRow = createNewAggregationBuffer()
 
   // The function used to process rows in a group
-  private[this] var sortBasedProcessRow: (MutableRow, InternalRow) => Unit = null
+  private[this] var sortBasedProcessRow: (InternalRow, InternalRow) => Unit = null
 
   // Processes rows in the current group. It will stop when it finds a new group.
   private def processCurrentSortedGroup(): Unit = {

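Illustrative sketch (not from the patch) of the partial-aggregation fast path mentioned above: GenerateUnsafeRowJoiner produces a joiner that concatenates the grouping key and the aggregation buffer at the byte level, which is why the buffer only needs a cast to UnsafeRow rather than a projection. The schemas below are hypothetical.

    import org.apache.spark.sql.catalyst.expressions.UnsafeRow
    import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeRowJoiner
    import org.apache.spark.sql.types._

    val keySchema = StructType(Seq(StructField("k", IntegerType)))
    val bufSchema = StructType(Seq(StructField("sum", LongType)))
    val joiner = GenerateUnsafeRowJoiner.create(keySchema, bufSchema)

    // Output row = key bytes ++ buffer bytes, no field-by-field copying.
    def result(key: UnsafeRow, buffer: UnsafeRow): UnsafeRow = joiner.join(key, buffer)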
http://git-wip-us.apache.org/repos/asf/spark/blob/97594c29/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/udaf.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/udaf.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/udaf.scala
index 586e145..67760f3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/udaf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/udaf.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.execution.aggregate
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
-import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression, MutableRow, _}
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression, _}
 import org.apache.spark.sql.catalyst.expressions.aggregate.ImperativeAggregate
 import org.apache.spark.sql.catalyst.expressions.codegen.GenerateMutableProjection
 import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
@@ -96,18 +96,18 @@ sealed trait BufferSetterGetterUtils {
     getters
   }
 
-  def createSetters(schema: StructType): Array[((MutableRow, Int, Any) => Unit)] = {
+  def createSetters(schema: StructType): Array[((InternalRow, Int, Any) => Unit)] = {
     val dataTypes = schema.fields.map(_.dataType)
-    val setters = new Array[(MutableRow, Int, Any) => Unit](dataTypes.length)
+    val setters = new Array[(InternalRow, Int, Any) => Unit](dataTypes.length)
 
     var i = 0
     while (i < setters.length) {
       setters(i) = dataTypes(i) match {
         case NullType =>
-          (row: MutableRow, ordinal: Int, value: Any) => row.setNullAt(ordinal)
+          (row: InternalRow, ordinal: Int, value: Any) => row.setNullAt(ordinal)
 
         case b: BooleanType =>
-          (row: MutableRow, ordinal: Int, value: Any) =>
+          (row: InternalRow, ordinal: Int, value: Any) =>
             if (value != null) {
               row.setBoolean(ordinal, value.asInstanceOf[Boolean])
             } else {
@@ -115,7 +115,7 @@ sealed trait BufferSetterGetterUtils {
             }
 
         case ByteType =>
-          (row: MutableRow, ordinal: Int, value: Any) =>
+          (row: InternalRow, ordinal: Int, value: Any) =>
             if (value != null) {
               row.setByte(ordinal, value.asInstanceOf[Byte])
             } else {
@@ -123,7 +123,7 @@ sealed trait BufferSetterGetterUtils {
             }
 
         case ShortType =>
-          (row: MutableRow, ordinal: Int, value: Any) =>
+          (row: InternalRow, ordinal: Int, value: Any) =>
             if (value != null) {
               row.setShort(ordinal, value.asInstanceOf[Short])
             } else {
@@ -131,7 +131,7 @@ sealed trait BufferSetterGetterUtils {
             }
 
         case IntegerType =>
-          (row: MutableRow, ordinal: Int, value: Any) =>
+          (row: InternalRow, ordinal: Int, value: Any) =>
             if (value != null) {
               row.setInt(ordinal, value.asInstanceOf[Int])
             } else {
@@ -139,7 +139,7 @@ sealed trait BufferSetterGetterUtils {
             }
 
         case LongType =>
-          (row: MutableRow, ordinal: Int, value: Any) =>
+          (row: InternalRow, ordinal: Int, value: Any) =>
             if (value != null) {
               row.setLong(ordinal, value.asInstanceOf[Long])
             } else {
@@ -147,7 +147,7 @@ sealed trait BufferSetterGetterUtils {
             }
 
         case FloatType =>
-          (row: MutableRow, ordinal: Int, value: Any) =>
+          (row: InternalRow, ordinal: Int, value: Any) =>
             if (value != null) {
               row.setFloat(ordinal, value.asInstanceOf[Float])
             } else {
@@ -155,7 +155,7 @@ sealed trait BufferSetterGetterUtils {
             }
 
         case DoubleType =>
-          (row: MutableRow, ordinal: Int, value: Any) =>
+          (row: InternalRow, ordinal: Int, value: Any) =>
             if (value != null) {
               row.setDouble(ordinal, value.asInstanceOf[Double])
             } else {
@@ -164,13 +164,13 @@ sealed trait BufferSetterGetterUtils {
 
         case dt: DecimalType =>
           val precision = dt.precision
-          (row: MutableRow, ordinal: Int, value: Any) =>
+          (row: InternalRow, ordinal: Int, value: Any) =>
             // To make it work with UnsafeRow, we cannot use setNullAt.
             // Please see the comment of UnsafeRow's setDecimal.
             row.setDecimal(ordinal, value.asInstanceOf[Decimal], precision)
 
         case DateType =>
-          (row: MutableRow, ordinal: Int, value: Any) =>
+          (row: InternalRow, ordinal: Int, value: Any) =>
             if (value != null) {
               row.setInt(ordinal, value.asInstanceOf[Int])
             } else {
@@ -178,7 +178,7 @@ sealed trait BufferSetterGetterUtils {
             }
 
         case TimestampType =>
-          (row: MutableRow, ordinal: Int, value: Any) =>
+          (row: InternalRow, ordinal: Int, value: Any) =>
             if (value != null) {
               row.setLong(ordinal, value.asInstanceOf[Long])
             } else {
@@ -186,7 +186,7 @@ sealed trait BufferSetterGetterUtils {
             }
 
         case other =>
-          (row: MutableRow, ordinal: Int, value: Any) =>
+          (row: InternalRow, ordinal: Int, value: Any) =>
             if (value != null) {
               row.update(ordinal, value)
             } else {
@@ -209,7 +209,7 @@ private[aggregate] class MutableAggregationBufferImpl(
     toCatalystConverters: Array[Any => Any],
     toScalaConverters: Array[Any => Any],
     bufferOffset: Int,
-    var underlyingBuffer: MutableRow)
+    var underlyingBuffer: InternalRow)
   extends MutableAggregationBuffer with BufferSetterGetterUtils {
 
   private[this] val offsets: Array[Int] = {
@@ -413,13 +413,13 @@ case class ScalaUDAF(
       null)
   }
 
-  override def initialize(buffer: MutableRow): Unit = {
+  override def initialize(buffer: InternalRow): Unit = {
     mutableAggregateBuffer.underlyingBuffer = buffer
 
     udaf.initialize(mutableAggregateBuffer)
   }
 
-  override def update(buffer: MutableRow, input: InternalRow): Unit = {
+  override def update(buffer: InternalRow, input: InternalRow): Unit = {
     mutableAggregateBuffer.underlyingBuffer = buffer
 
     udaf.update(
@@ -427,7 +427,7 @@ case class ScalaUDAF(
       inputToScalaConverters(inputProjection(input)).asInstanceOf[Row])
   }
 
-  override def merge(buffer1: MutableRow, buffer2: InternalRow): Unit = {
+  override def merge(buffer1: InternalRow, buffer2: InternalRow): Unit = {
     mutableAggregateBuffer.underlyingBuffer = buffer1
     inputAggregateBuffer.underlyingInputBuffer = buffer2
 

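Illustrative sketch (not from the patch) of the setter-selection pattern in createSetters: the type dispatch happens once per schema, and the DecimalType case deliberately routes nulls through setDecimal because, as the comment in the patch notes, setNullAt cannot be used on UnsafeRow decimal slots. setterFor is a hypothetical name.

    import org.apache.spark.sql.catalyst.InternalRow
    import org.apache.spark.sql.types._

    def setterFor(dt: DataType): (InternalRow, Int, Any) => Unit = dt match {
      case IntegerType =>
        (row, i, v) =>
          if (v != null) row.setInt(i, v.asInstanceOf[Int]) else row.setNullAt(i)
      case d: DecimalType =>
        // A null Decimal is passed through setDecimal, never setNullAt.
        (row, i, v) => row.setDecimal(i, v.asInstanceOf[Decimal], d.precision)
      case _ =>
        (row, i, v) =>
          if (v != null) row.update(i, v) else row.setNullAt(i)
    }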
http://git-wip-us.apache.org/repos/asf/spark/blob/97594c29/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/ColumnAccessor.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/ColumnAccessor.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/ColumnAccessor.scala
index 7cde04b..6241b79 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/ColumnAccessor.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/ColumnAccessor.scala
@@ -21,15 +21,16 @@ import java.nio.{ByteBuffer, ByteOrder}
 
 import scala.annotation.tailrec
 
-import org.apache.spark.sql.catalyst.expressions.{MutableRow, UnsafeArrayData, UnsafeMapData, UnsafeRow}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{UnsafeArrayData, UnsafeMapData, UnsafeRow}
 import org.apache.spark.sql.execution.columnar.compression.CompressibleColumnAccessor
 import org.apache.spark.sql.types._
 
 /**
  * An `Iterator` like trait used to extract values from columnar byte buffer. When a value is
  * extracted from the buffer, instead of directly returning it, the value is set into some field of
- * a [[MutableRow]]. In this way, boxing cost can be avoided by leveraging the setter methods
- * for primitive values provided by [[MutableRow]].
+ * an [[InternalRow]]. In this way, boxing costs can be avoided by leveraging the setter methods
+ * for primitive values provided by [[InternalRow]].
  */
 private[columnar] trait ColumnAccessor {
   initialize()
@@ -38,7 +39,7 @@ private[columnar] trait ColumnAccessor {
 
   def hasNext: Boolean
 
-  def extractTo(row: MutableRow, ordinal: Int): Unit
+  def extractTo(row: InternalRow, ordinal: Int): Unit
 
   protected def underlyingBuffer: ByteBuffer
 }
@@ -52,11 +53,11 @@ private[columnar] abstract class BasicColumnAccessor[JvmType](
 
   override def hasNext: Boolean = buffer.hasRemaining
 
-  override def extractTo(row: MutableRow, ordinal: Int): Unit = {
+  override def extractTo(row: InternalRow, ordinal: Int): Unit = {
     extractSingle(row, ordinal)
   }
 
-  def extractSingle(row: MutableRow, ordinal: Int): Unit = {
+  def extractSingle(row: InternalRow, ordinal: Int): Unit = {
     columnType.extract(buffer, row, ordinal)
   }
 

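Illustrative sketch (not from the patch) of how the accessors are driven from within the columnar package: one extractTo call per column per output row, each routing through a primitive setter, so decoding an int column never allocates a boxed Integer. The method name and parameters are hypothetical.

    import org.apache.spark.sql.catalyst.InternalRow

    def nextRow(accessors: Array[ColumnAccessor], row: InternalRow): InternalRow = {
      var i = 0
      while (i < accessors.length) {
        accessors(i).extractTo(row, i)
        i += 1
      }
      row
    }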
http://git-wip-us.apache.org/repos/asf/spark/blob/97594c29/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/ColumnType.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/ColumnType.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/ColumnType.scala
index d27d8c3..703bde2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/ColumnType.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/ColumnType.scala
@@ -92,7 +92,7 @@ private[columnar] sealed abstract class ColumnType[JvmType] {
    * `row(ordinal)`. Subclasses should override this method to avoid boxing/unboxing costs whenever
    * possible.
    */
-  def extract(buffer: ByteBuffer, row: MutableRow, ordinal: Int): Unit = {
+  def extract(buffer: ByteBuffer, row: InternalRow, ordinal: Int): Unit = {
     setField(row, ordinal, extract(buffer))
   }
 
@@ -125,13 +125,13 @@ private[columnar] sealed abstract class ColumnType[JvmType] {
    * Sets `row(ordinal)` to `field`. Subclasses should override this method to avoid boxing/unboxing
    * costs whenever possible.
    */
-  def setField(row: MutableRow, ordinal: Int, value: JvmType): Unit
+  def setField(row: InternalRow, ordinal: Int, value: JvmType): Unit
 
   /**
    * Copies `from(fromOrdinal)` to `to(toOrdinal)`. Subclasses should override this method to avoid
    * boxing/unboxing costs whenever possible.
    */
-  def copyField(from: InternalRow, fromOrdinal: Int, to: MutableRow, toOrdinal: Int): Unit = {
+  def copyField(from: InternalRow, fromOrdinal: Int, to: InternalRow, toOrdinal: Int): Unit = {
     setField(to, toOrdinal, getField(from, fromOrdinal))
   }
 
@@ -149,7 +149,7 @@ private[columnar] object NULL extends ColumnType[Any] {
   override def defaultSize: Int = 0
   override def append(v: Any, buffer: ByteBuffer): Unit = {}
   override def extract(buffer: ByteBuffer): Any = null
-  override def setField(row: MutableRow, ordinal: Int, value: Any): Unit = row.setNullAt(ordinal)
+  override def setField(row: InternalRow, ordinal: Int, value: Any): Unit = row.setNullAt(ordinal)
   override def getField(row: InternalRow, ordinal: Int): Any = null
 }
 
@@ -177,18 +177,18 @@ private[columnar] object INT extends NativeColumnType(IntegerType, 4) {
     ByteBufferHelper.getInt(buffer)
   }
 
-  override def extract(buffer: ByteBuffer, row: MutableRow, ordinal: Int): Unit = {
+  override def extract(buffer: ByteBuffer, row: InternalRow, ordinal: Int): Unit = {
     row.setInt(ordinal, ByteBufferHelper.getInt(buffer))
   }
 
-  override def setField(row: MutableRow, ordinal: Int, value: Int): Unit = {
+  override def setField(row: InternalRow, ordinal: Int, value: Int): Unit = {
     row.setInt(ordinal, value)
   }
 
   override def getField(row: InternalRow, ordinal: Int): Int = row.getInt(ordinal)
 
 
-  override def copyField(from: InternalRow, fromOrdinal: Int, to: MutableRow, toOrdinal: Int) {
+  override def copyField(from: InternalRow, fromOrdinal: Int, to: InternalRow, toOrdinal: Int) {
     to.setInt(toOrdinal, from.getInt(fromOrdinal))
   }
 }
@@ -206,17 +206,17 @@ private[columnar] object LONG extends NativeColumnType(LongType, 8) {
     ByteBufferHelper.getLong(buffer)
   }
 
-  override def extract(buffer: ByteBuffer, row: MutableRow, ordinal: Int): Unit = {
+  override def extract(buffer: ByteBuffer, row: InternalRow, ordinal: Int): Unit = {
     row.setLong(ordinal, ByteBufferHelper.getLong(buffer))
   }
 
-  override def setField(row: MutableRow, ordinal: Int, value: Long): Unit = {
+  override def setField(row: InternalRow, ordinal: Int, value: Long): Unit = {
     row.setLong(ordinal, value)
   }
 
   override def getField(row: InternalRow, ordinal: Int): Long = row.getLong(ordinal)
 
-  override def copyField(from: InternalRow, fromOrdinal: Int, to: MutableRow, toOrdinal: Int) {
+  override def copyField(from: InternalRow, fromOrdinal: Int, to: InternalRow, toOrdinal: Int) {
     to.setLong(toOrdinal, from.getLong(fromOrdinal))
   }
 }
@@ -234,17 +234,17 @@ private[columnar] object FLOAT extends NativeColumnType(FloatType, 4) {
     ByteBufferHelper.getFloat(buffer)
   }
 
-  override def extract(buffer: ByteBuffer, row: MutableRow, ordinal: Int): Unit = {
+  override def extract(buffer: ByteBuffer, row: InternalRow, ordinal: Int): Unit = {
     row.setFloat(ordinal, ByteBufferHelper.getFloat(buffer))
   }
 
-  override def setField(row: MutableRow, ordinal: Int, value: Float): Unit = {
+  override def setField(row: InternalRow, ordinal: Int, value: Float): Unit = {
     row.setFloat(ordinal, value)
   }
 
   override def getField(row: InternalRow, ordinal: Int): Float = row.getFloat(ordinal)
 
-  override def copyField(from: InternalRow, fromOrdinal: Int, to: MutableRow, toOrdinal: Int) {
+  override def copyField(from: InternalRow, fromOrdinal: Int, to: InternalRow, toOrdinal: Int) {
     to.setFloat(toOrdinal, from.getFloat(fromOrdinal))
   }
 }
@@ -262,17 +262,17 @@ private[columnar] object DOUBLE extends NativeColumnType(DoubleType, 8) {
     ByteBufferHelper.getDouble(buffer)
   }
 
-  override def extract(buffer: ByteBuffer, row: MutableRow, ordinal: Int): Unit = {
+  override def extract(buffer: ByteBuffer, row: InternalRow, ordinal: Int): Unit = {
     row.setDouble(ordinal, ByteBufferHelper.getDouble(buffer))
   }
 
-  override def setField(row: MutableRow, ordinal: Int, value: Double): Unit = {
+  override def setField(row: InternalRow, ordinal: Int, value: Double): Unit = {
     row.setDouble(ordinal, value)
   }
 
   override def getField(row: InternalRow, ordinal: Int): Double = row.getDouble(ordinal)
 
-  override def copyField(from: InternalRow, fromOrdinal: Int, to: MutableRow, toOrdinal: Int) {
+  override def copyField(from: InternalRow, fromOrdinal: Int, to: InternalRow, toOrdinal: Int) {
     to.setDouble(toOrdinal, from.getDouble(fromOrdinal))
   }
 }
@@ -288,17 +288,17 @@ private[columnar] object BOOLEAN extends NativeColumnType(BooleanType, 1) {
 
   override def extract(buffer: ByteBuffer): Boolean = buffer.get() == 1
 
-  override def extract(buffer: ByteBuffer, row: MutableRow, ordinal: Int): Unit = {
+  override def extract(buffer: ByteBuffer, row: InternalRow, ordinal: Int): Unit = {
     row.setBoolean(ordinal, buffer.get() == 1)
   }
 
-  override def setField(row: MutableRow, ordinal: Int, value: Boolean): Unit = {
+  override def setField(row: InternalRow, ordinal: Int, value: Boolean): Unit = {
     row.setBoolean(ordinal, value)
   }
 
   override def getField(row: InternalRow, ordinal: Int): Boolean = row.getBoolean(ordinal)
 
-  override def copyField(from: InternalRow, fromOrdinal: Int, to: MutableRow, toOrdinal: Int) {
+  override def copyField(from: InternalRow, fromOrdinal: Int, to: InternalRow, toOrdinal: Int) {
     to.setBoolean(toOrdinal, from.getBoolean(fromOrdinal))
   }
 }
@@ -316,17 +316,17 @@ private[columnar] object BYTE extends NativeColumnType(ByteType, 1) {
     buffer.get()
   }
 
-  override def extract(buffer: ByteBuffer, row: MutableRow, ordinal: Int): Unit = {
+  override def extract(buffer: ByteBuffer, row: InternalRow, ordinal: Int): Unit = {
     row.setByte(ordinal, buffer.get())
   }
 
-  override def setField(row: MutableRow, ordinal: Int, value: Byte): Unit = {
+  override def setField(row: InternalRow, ordinal: Int, value: Byte): Unit = {
     row.setByte(ordinal, value)
   }
 
   override def getField(row: InternalRow, ordinal: Int): Byte = row.getByte(ordinal)
 
-  override def copyField(from: InternalRow, fromOrdinal: Int, to: MutableRow, toOrdinal: Int) {
+  override def copyField(from: InternalRow, fromOrdinal: Int, to: InternalRow, toOrdinal: Int) {
     to.setByte(toOrdinal, from.getByte(fromOrdinal))
   }
 }
@@ -344,17 +344,17 @@ private[columnar] object SHORT extends NativeColumnType(ShortType, 2) {
     buffer.getShort()
   }
 
-  override def extract(buffer: ByteBuffer, row: MutableRow, ordinal: Int): Unit = {
+  override def extract(buffer: ByteBuffer, row: InternalRow, ordinal: Int): Unit = {
     row.setShort(ordinal, buffer.getShort())
   }
 
-  override def setField(row: MutableRow, ordinal: Int, value: Short): Unit = {
+  override def setField(row: InternalRow, ordinal: Int, value: Short): Unit = {
     row.setShort(ordinal, value)
   }
 
   override def getField(row: InternalRow, ordinal: Int): Short = row.getShort(ordinal)
 
-  override def copyField(from: InternalRow, fromOrdinal: Int, to: MutableRow, toOrdinal: Int) {
+  override def copyField(from: InternalRow, fromOrdinal: Int, to: InternalRow, toOrdinal: Int) {
     to.setShort(toOrdinal, from.getShort(fromOrdinal))
   }
 }
@@ -366,7 +366,7 @@ private[columnar] object SHORT extends NativeColumnType(ShortType, 2) {
 private[columnar] trait DirectCopyColumnType[JvmType] extends ColumnType[JvmType] {
 
   // copy the bytes from ByteBuffer to UnsafeRow
-  override def extract(buffer: ByteBuffer, row: MutableRow, ordinal: Int): Unit = {
+  override def extract(buffer: ByteBuffer, row: InternalRow, ordinal: Int): Unit = {
     if (row.isInstanceOf[MutableUnsafeRow]) {
       val numBytes = buffer.getInt
       val cursor = buffer.position()
@@ -407,7 +407,7 @@ private[columnar] object STRING
     UTF8String.fromBytes(buffer.array(), buffer.arrayOffset() + cursor, length)
   }
 
-  override def setField(row: MutableRow, ordinal: Int, value: UTF8String): Unit = {
+  override def setField(row: InternalRow, ordinal: Int, value: UTF8String): Unit = {
     if (row.isInstanceOf[MutableUnsafeRow]) {
       row.asInstanceOf[MutableUnsafeRow].writer.write(ordinal, value)
     } else {
@@ -419,7 +419,7 @@ private[columnar] object STRING
     row.getUTF8String(ordinal)
   }
 
-  override def copyField(from: InternalRow, fromOrdinal: Int, to: MutableRow, toOrdinal: Int) {
+  override def copyField(from: InternalRow, fromOrdinal: Int, to: InternalRow, toOrdinal: Int) {
     setField(to, toOrdinal, getField(from, fromOrdinal))
   }
 
@@ -433,7 +433,7 @@ private[columnar] case class COMPACT_DECIMAL(precision: Int, scale: Int)
     Decimal(ByteBufferHelper.getLong(buffer), precision, scale)
   }
 
-  override def extract(buffer: ByteBuffer, row: MutableRow, ordinal: Int): Unit = {
+  override def extract(buffer: ByteBuffer, row: InternalRow, ordinal: Int): Unit = {
     if (row.isInstanceOf[MutableUnsafeRow]) {
       // copy it as Long
       row.setLong(ordinal, ByteBufferHelper.getLong(buffer))
@@ -459,11 +459,11 @@ private[columnar] case class COMPACT_DECIMAL(precision: Int, scale: Int)
     row.getDecimal(ordinal, precision, scale)
   }
 
-  override def setField(row: MutableRow, ordinal: Int, value: Decimal): Unit = {
+  override def setField(row: InternalRow, ordinal: Int, value: Decimal): Unit = {
     row.setDecimal(ordinal, value, precision)
   }
 
-  override def copyField(from: InternalRow, fromOrdinal: Int, to: MutableRow, toOrdinal: Int) {
+  override def copyField(from: InternalRow, fromOrdinal: Int, to: InternalRow, toOrdinal: Int) {
     setField(to, toOrdinal, getField(from, fromOrdinal))
   }
 }
@@ -497,7 +497,7 @@ private[columnar] object BINARY extends ByteArrayColumnType[Array[Byte]](16) {
 
   def dataType: DataType = BinaryType
 
-  override def setField(row: MutableRow, ordinal: Int, value: Array[Byte]): Unit = {
+  override def setField(row: InternalRow, ordinal: Int, value: Array[Byte]): Unit = {
     row.update(ordinal, value)
   }
 
@@ -522,7 +522,7 @@ private[columnar] case class LARGE_DECIMAL(precision: Int, scale: Int)
     row.getDecimal(ordinal, precision, scale)
   }
 
-  override def setField(row: MutableRow, ordinal: Int, value: Decimal): Unit = {
+  override def setField(row: InternalRow, ordinal: Int, value: Decimal): Unit = {
     row.setDecimal(ordinal, value, precision)
   }
 
@@ -553,7 +553,7 @@ private[columnar] case class STRUCT(dataType: StructType)
 
   override def defaultSize: Int = 20
 
-  override def setField(row: MutableRow, ordinal: Int, value: UnsafeRow): Unit = {
+  override def setField(row: InternalRow, ordinal: Int, value: UnsafeRow): Unit = {
     row.update(ordinal, value)
   }
 
@@ -591,7 +591,7 @@ private[columnar] case class ARRAY(dataType: ArrayType)
 
   override def defaultSize: Int = 28
 
-  override def setField(row: MutableRow, ordinal: Int, value: UnsafeArrayData): Unit = {
+  override def setField(row: InternalRow, ordinal: Int, value: UnsafeArrayData): Unit = {
     row.update(ordinal, value)
   }
 
@@ -630,7 +630,7 @@ private[columnar] case class MAP(dataType: MapType)
 
   override def defaultSize: Int = 68
 
-  override def setField(row: MutableRow, ordinal: Int, value: UnsafeMapData): Unit = {
+  override def setField(row: InternalRow, ordinal: Int, value: UnsafeMapData): Unit = {
     row.update(ordinal, value)
   }
 

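Illustrative sketch (not from the patch), usable from within the columnar package: the specialized INT.extract above reads four bytes from the buffer and writes them with setInt, bypassing the boxed default implementation in ColumnType. Column buffers are assumed to use native byte order, as the column builders set up.

    import java.nio.{ByteBuffer, ByteOrder}
    import org.apache.spark.sql.catalyst.expressions.SpecificInternalRow
    import org.apache.spark.sql.types.IntegerType

    val buffer = ByteBuffer.allocate(4).order(ByteOrder.nativeOrder())
    buffer.putInt(42)
    buffer.flip()
    val row = new SpecificInternalRow(Seq(IntegerType))
    INT.extract(buffer, row, 0)   // writes via row.setInt, no boxing
    assert(row.getInt(0) == 42)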
http://git-wip-us.apache.org/repos/asf/spark/blob/97594c29/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/GenerateColumnAccessor.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/GenerateColumnAccessor.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/GenerateColumnAccessor.scala
index 96bd338..14024d6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/GenerateColumnAccessor.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/GenerateColumnAccessor.scala
@@ -36,8 +36,7 @@ abstract class ColumnarIterator extends Iterator[InternalRow] {
  *
 * WARNING: These setters MUST be called in increasing order of ordinals.
  */
-class MutableUnsafeRow(val writer: UnsafeRowWriter) extends GenericMutableRow(null) {
-
+class MutableUnsafeRow(val writer: UnsafeRowWriter) extends BaseGenericInternalRow {
   override def isNullAt(i: Int): Boolean = writer.isNullAt(i)
   override def setNullAt(i: Int): Unit = writer.setNullAt(i)
 
@@ -55,6 +54,9 @@ class MutableUnsafeRow(val writer: UnsafeRowWriter) extends GenericMutableRow(nu
   override def update(i: Int, v: Any): Unit = throw new UnsupportedOperationException
 
   // all other methods inherited from BaseGenericInternalRow are not needed
+  override protected def genericGet(ordinal: Int): Any = throw new UnsupportedOperationException
+  override def numFields: Int = throw new UnsupportedOperationException
+  override def copy(): InternalRow = throw new UnsupportedOperationException
 }
 
 /**

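Illustrative sketch (not from the patch) of the calling convention the warning above describes: generated code drives MutableUnsafeRow only through its typed setters, in increasing ordinal order, because the wrapped UnsafeRowWriter appends to its buffer sequentially; the genericGet/numFields/copy stubs can safely throw since that path is never taken. This assumes the typed setters delegate to the writer, as in the original class.

    def writeRow(row: MutableUnsafeRow): Unit = {
      row.setInt(0, 42)     // ordinal 0 first
      row.setLong(1, 10L)   // then ordinal 1
      row.setNullAt(2)      // nulls recorded through the writer
    }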
http://git-wip-us.apache.org/repos/asf/spark/blob/97594c29/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/NullableColumnAccessor.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/NullableColumnAccessor.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/NullableColumnAccessor.scala
index 2465633..2f09757 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/NullableColumnAccessor.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/NullableColumnAccessor.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.execution.columnar
 
 import java.nio.{ByteBuffer, ByteOrder}
 
-import org.apache.spark.sql.catalyst.expressions.MutableRow
+import org.apache.spark.sql.catalyst.InternalRow
 
 private[columnar] trait NullableColumnAccessor extends ColumnAccessor {
   private var nullsBuffer: ByteBuffer = _
@@ -39,7 +39,7 @@ private[columnar] trait NullableColumnAccessor extends ColumnAccessor {
     super.initialize()
   }
 
-  abstract override def extractTo(row: MutableRow, ordinal: Int): Unit = {
+  abstract override def extractTo(row: InternalRow, ordinal: Int): Unit = {
     if (pos == nextNullIndex) {
       seenNulls += 1
 

http://git-wip-us.apache.org/repos/asf/spark/blob/97594c29/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/compression/CompressibleColumnAccessor.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/compression/CompressibleColumnAccessor.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/compression/CompressibleColumnAccessor.scala
index 6579b50..e1d13ad 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/compression/CompressibleColumnAccessor.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/compression/CompressibleColumnAccessor.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql.execution.columnar.compression
 
-import org.apache.spark.sql.catalyst.expressions.MutableRow
+import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.execution.columnar.{ColumnAccessor, NativeColumnAccessor}
 import org.apache.spark.sql.types.AtomicType
 
@@ -33,7 +33,7 @@ private[columnar] trait CompressibleColumnAccessor[T <: AtomicType] extends Colu
 
   abstract override def hasNext: Boolean = super.hasNext || decoder.hasNext
 
-  override def extractSingle(row: MutableRow, ordinal: Int): Unit = {
+  override def extractSingle(row: InternalRow, ordinal: Int): Unit = {
     decoder.next(row, ordinal)
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/97594c29/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/compression/CompressionScheme.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/compression/CompressionScheme.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/compression/CompressionScheme.scala
index b90d00b..6e4f1c5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/compression/CompressionScheme.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/compression/CompressionScheme.scala
@@ -20,7 +20,6 @@ package org.apache.spark.sql.execution.columnar.compression
 import java.nio.{ByteBuffer, ByteOrder}
 
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.MutableRow
 import org.apache.spark.sql.execution.columnar.{ColumnType, NativeColumnType}
 import org.apache.spark.sql.types.AtomicType
 
@@ -39,7 +38,7 @@ private[columnar] trait Encoder[T <: AtomicType] {
 }
 
 private[columnar] trait Decoder[T <: AtomicType] {
-  def next(row: MutableRow, ordinal: Int): Unit
+  def next(row: InternalRow, ordinal: Int): Unit
 
   def hasNext: Boolean
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/97594c29/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/compression/compressionSchemes.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/compression/compressionSchemes.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/compression/compressionSchemes.scala
index 941f03b..ee99c90 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/compression/compressionSchemes.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/compression/compressionSchemes.scala
@@ -22,7 +22,7 @@ import java.nio.ByteBuffer
 import scala.collection.mutable
 
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.{MutableRow, SpecificMutableRow}
+import org.apache.spark.sql.catalyst.expressions.SpecificInternalRow
 import org.apache.spark.sql.execution.columnar._
 import org.apache.spark.sql.types._
 
@@ -56,7 +56,7 @@ private[columnar] case object PassThrough extends CompressionScheme {
   class Decoder[T <: AtomicType](buffer: ByteBuffer, columnType: NativeColumnType[T])
     extends compression.Decoder[T] {
 
-    override def next(row: MutableRow, ordinal: Int): Unit = {
+    override def next(row: InternalRow, ordinal: Int): Unit = {
       columnType.extract(buffer, row, ordinal)
     }
 
@@ -86,7 +86,7 @@ private[columnar] case object RunLengthEncoding extends CompressionScheme {
     private var _compressedSize = 0
 
     // Using `SpecificInternalRow` to store the last value to avoid boxing/unboxing cost.
-    private val lastValue = new SpecificMutableRow(Seq(columnType.dataType))
+    private val lastValue = new SpecificInternalRow(Seq(columnType.dataType))
     private var lastRun = 0
 
     override def uncompressedSize: Int = _uncompressedSize
@@ -117,9 +117,9 @@ private[columnar] case object RunLengthEncoding extends CompressionScheme {
       to.putInt(RunLengthEncoding.typeId)
 
       if (from.hasRemaining) {
-        val currentValue = new SpecificMutableRow(Seq(columnType.dataType))
+        val currentValue = new SpecificInternalRow(Seq(columnType.dataType))
         var currentRun = 1
-        val value = new SpecificMutableRow(Seq(columnType.dataType))
+        val value = new SpecificInternalRow(Seq(columnType.dataType))
 
         columnType.extract(from, currentValue, 0)
 
@@ -156,7 +156,7 @@ private[columnar] case object RunLengthEncoding extends CompressionScheme {
     private var valueCount = 0
     private var currentValue: T#InternalType = _
 
-    override def next(row: MutableRow, ordinal: Int): Unit = {
+    override def next(row: InternalRow, ordinal: Int): Unit = {
       if (valueCount == run) {
         currentValue = columnType.extract(buffer)
         run = ByteBufferHelper.getInt(buffer)
@@ -273,7 +273,7 @@ private[columnar] case object DictionaryEncoding extends CompressionScheme {
       Array.fill[Any](elementNum)(columnType.extract(buffer).asInstanceOf[Any])
     }
 
-    override def next(row: MutableRow, ordinal: Int): Unit = {
+    override def next(row: InternalRow, ordinal: Int): Unit = {
       columnType.setField(row, ordinal, dictionary(buffer.getShort()).asInstanceOf[T#InternalType])
     }
 
@@ -356,7 +356,7 @@ private[columnar] case object BooleanBitSet extends CompressionScheme {
 
     private var visited: Int = 0
 
-    override def next(row: MutableRow, ordinal: Int): Unit = {
+    override def next(row: InternalRow, ordinal: Int): Unit = {
       val bit = visited % BITS_PER_LONG
 
       visited += 1
@@ -443,7 +443,7 @@ private[columnar] case object IntDelta extends CompressionScheme {
 
     override def hasNext: Boolean = buffer.hasRemaining
 
-    override def next(row: MutableRow, ordinal: Int): Unit = {
+    override def next(row: InternalRow, ordinal: Int): Unit = {
       val delta = buffer.get()
       prev = if (delta > Byte.MinValue) prev + delta else ByteBufferHelper.getInt(buffer)
       row.setInt(ordinal, prev)
@@ -523,7 +523,7 @@ private[columnar] case object LongDelta extends CompressionScheme {
 
     override def hasNext: Boolean = buffer.hasRemaining
 
-    override def next(row: MutableRow, ordinal: Int): Unit = {
+    override def next(row: InternalRow, ordinal: Int): Unit = {
       val delta = buffer.get()
       prev = if (delta > Byte.MinValue) prev + delta else ByteBufferHelper.getLong(buffer)
       row.setLong(ordinal, prev)

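Illustrative sketch (not from the patch) of how a Decoder is consumed: each next() call decodes one value and writes it straight into the target slot through the column type's primitive setter, so run-length and delta decoding stay box-free. The method name is hypothetical.

    import org.apache.spark.sql.catalyst.InternalRow
    import org.apache.spark.sql.types.AtomicType

    def decodeColumn[T <: AtomicType](
        decoder: Decoder[T], row: InternalRow, ordinal: Int): Unit = {
      while (decoder.hasNext) {
        decoder.next(row, ordinal)
        // hand `row` to the consumer here before decoding the next value
      }
    }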
http://git-wip-us.apache.org/repos/asf/spark/blob/97594c29/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index 693b4c4..6f9ed50 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -273,7 +273,7 @@ object DataSourceStrategy extends Strategy with Logging {
   // Get the bucket ID based on the bucketing values.
   // Restriction: Bucket pruning works iff the table is bucketed by one and only one column.
   def getBucketId(bucketColumn: Attribute, numBuckets: Int, value: Any): Int = {
-    val mutableRow = new SpecificMutableRow(Seq(bucketColumn.dataType))
+    val mutableRow = new SpecificInternalRow(Seq(bucketColumn.dataType))
     mutableRow(0) = Cast(Literal(value), bucketColumn.dataType).eval(null)
     val bucketIdGeneration = UnsafeProjection.create(
       HashPartitioning(bucketColumn :: Nil, numBuckets).partitionIdExpression :: Nil,

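Illustrative sketch (not from the patch) of a hypothetical call site for getBucketId: given the single int bucketing column required by the restriction above, the returned id identifies the only bucket whose files can contain the value.

    import org.apache.spark.sql.catalyst.expressions.AttributeReference
    import org.apache.spark.sql.types.IntegerType

    val bucketCol = AttributeReference("id", IntegerType)()
    val bucketId = DataSourceStrategy.getBucketId(bucketCol, numBuckets = 8, value = 42)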
http://git-wip-us.apache.org/repos/asf/spark/blob/97594c29/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala
index 33b170b..55cb26d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala
@@ -29,7 +29,7 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.GenericMutableRow
+import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.execution.datasources.{OutputWriter, OutputWriterFactory, PartitionedFile, WriterContainer}
 import org.apache.spark.sql.types._
@@ -88,7 +88,7 @@ object CSVRelation extends Logging {
       case (field, index) => safeRequiredIndices(safeRequiredFields.indexOf(field)) = index
     }
     val requiredSize = requiredFields.length
-    val row = new GenericMutableRow(requiredSize)
+    val row = new GenericInternalRow(requiredSize)
 
     (tokens: Array[String], numMalformedRows) => {
       if (params.dropMalformed && schemaFields.length != tokens.length) {

http://git-wip-us.apache.org/repos/asf/spark/blob/97594c29/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
index 66f2bad..4754963 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
@@ -30,7 +30,7 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.sql.{DataFrame, Row}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.encoders.RowEncoder
-import org.apache.spark.sql.catalyst.expressions.{MutableRow, SpecificMutableRow}
+import org.apache.spark.sql.catalyst.expressions.SpecificInternalRow
 import org.apache.spark.sql.catalyst.util.{DateTimeUtils, GenericArrayData}
 import org.apache.spark.sql.jdbc.{JdbcDialect, JdbcDialects, JdbcType}
 import org.apache.spark.sql.types._
@@ -283,7 +283,7 @@ object JdbcUtils extends Logging {
     new NextIterator[InternalRow] {
       private[this] val rs = resultSet
       private[this] val getters: Array[JDBCValueGetter] = makeGetters(schema)
-      private[this] val mutableRow = new SpecificMutableRow(schema.fields.map(x => x.dataType))
+      private[this] val mutableRow = new SpecificInternalRow(schema.fields.map(x => x.dataType))
 
       override protected def close(): Unit = {
         try {
@@ -314,22 +314,22 @@ object JdbcUtils extends Logging {
   // A `JDBCValueGetter` is responsible for getting a value from `ResultSet` into a field
   // for `InternalRow`. The last argument `Int` is the index of the value to be set in
   // the row, and is also used to locate the value in the `ResultSet`.
-  private type JDBCValueGetter = (ResultSet, MutableRow, Int) => Unit
+  private type JDBCValueGetter = (ResultSet, InternalRow, Int) => Unit
 
   /**
    * Creates `JDBCValueGetter`s according to [[StructType]], which can set
-   * each value from `ResultSet` to each field of [[MutableRow]] correctly.
+   * each value from `ResultSet` to each field of [[InternalRow]] correctly.
    */
   private def makeGetters(schema: StructType): Array[JDBCValueGetter] =
     schema.fields.map(sf => makeGetter(sf.dataType, sf.metadata))
 
   private def makeGetter(dt: DataType, metadata: Metadata): JDBCValueGetter = dt match {
     case BooleanType =>
-      (rs: ResultSet, row: MutableRow, pos: Int) =>
+      (rs: ResultSet, row: InternalRow, pos: Int) =>
         row.setBoolean(pos, rs.getBoolean(pos + 1))
 
     case DateType =>
-      (rs: ResultSet, row: MutableRow, pos: Int) =>
+      (rs: ResultSet, row: InternalRow, pos: Int) =>
         // DateTimeUtils.fromJavaDate does not handle null values, so we need to check it.
         val dateVal = rs.getDate(pos + 1)
         if (dateVal != null) {
@@ -347,25 +347,25 @@ object JdbcUtils extends Logging {
     // retrieve it, you will get the wrong result 199.99.
     // So the precision and scale need to be set for Decimal based on the JDBC metadata.
     case DecimalType.Fixed(p, s) =>
-      (rs: ResultSet, row: MutableRow, pos: Int) =>
+      (rs: ResultSet, row: InternalRow, pos: Int) =>
         val decimal =
           nullSafeConvert[java.math.BigDecimal](rs.getBigDecimal(pos + 1), d => Decimal(d, p, s))
         row.update(pos, decimal)
 
     case DoubleType =>
-      (rs: ResultSet, row: MutableRow, pos: Int) =>
+      (rs: ResultSet, row: InternalRow, pos: Int) =>
         row.setDouble(pos, rs.getDouble(pos + 1))
 
     case FloatType =>
-      (rs: ResultSet, row: MutableRow, pos: Int) =>
+      (rs: ResultSet, row: InternalRow, pos: Int) =>
         row.setFloat(pos, rs.getFloat(pos + 1))
 
     case IntegerType =>
-      (rs: ResultSet, row: MutableRow, pos: Int) =>
+      (rs: ResultSet, row: InternalRow, pos: Int) =>
         row.setInt(pos, rs.getInt(pos + 1))
 
     case LongType if metadata.contains("binarylong") =>
-      (rs: ResultSet, row: MutableRow, pos: Int) =>
+      (rs: ResultSet, row: InternalRow, pos: Int) =>
         val bytes = rs.getBytes(pos + 1)
         var ans = 0L
         var j = 0
@@ -376,20 +376,20 @@ object JdbcUtils extends Logging {
         row.setLong(pos, ans)
 
     case LongType =>
-      (rs: ResultSet, row: MutableRow, pos: Int) =>
+      (rs: ResultSet, row: InternalRow, pos: Int) =>
         row.setLong(pos, rs.getLong(pos + 1))
 
     case ShortType =>
-      (rs: ResultSet, row: MutableRow, pos: Int) =>
+      (rs: ResultSet, row: InternalRow, pos: Int) =>
         row.setShort(pos, rs.getShort(pos + 1))
 
     case StringType =>
-      (rs: ResultSet, row: MutableRow, pos: Int) =>
+      (rs: ResultSet, row: InternalRow, pos: Int) =>
         // TODO(davies): use getBytes for better performance, if the encoding is UTF-8
         row.update(pos, UTF8String.fromString(rs.getString(pos + 1)))
 
     case TimestampType =>
-      (rs: ResultSet, row: MutableRow, pos: Int) =>
+      (rs: ResultSet, row: InternalRow, pos: Int) =>
         val t = rs.getTimestamp(pos + 1)
         if (t != null) {
           row.setLong(pos, DateTimeUtils.fromJavaTimestamp(t))
@@ -398,7 +398,7 @@ object JdbcUtils extends Logging {
         }
 
     case BinaryType =>
-      (rs: ResultSet, row: MutableRow, pos: Int) =>
+      (rs: ResultSet, row: InternalRow, pos: Int) =>
         row.update(pos, rs.getBytes(pos + 1))
 
     case ArrayType(et, _) =>
@@ -437,7 +437,7 @@ object JdbcUtils extends Logging {
         case _ => (array: Object) => array.asInstanceOf[Array[Any]]
       }
 
-      (rs: ResultSet, row: MutableRow, pos: Int) =>
+      (rs: ResultSet, row: InternalRow, pos: Int) =>
         val array = nullSafeConvert[Object](
           rs.getArray(pos + 1).getArray,
           array => new GenericArrayData(elementConversion.apply(array)))

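Illustrative sketch (not from the patch) of the read loop the getters above feed: the per-type closures are built once from the schema, then applied per row; the wasNull check mirrors how the surrounding iterator nulls out slots after a getter runs. readRow is a hypothetical name.

    import java.sql.ResultSet
    import org.apache.spark.sql.catalyst.InternalRow

    def readRow(
        rs: ResultSet,
        row: InternalRow,
        getters: Array[(ResultSet, InternalRow, Int) => Unit]): InternalRow = {
      var i = 0
      while (i < getters.length) {
        getters(i)(rs, row, i)
        if (rs.wasNull) row.setNullAt(i)
        i += 1
      }
      row
    }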
http://git-wip-us.apache.org/repos/asf/spark/blob/97594c29/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
index 9ffc2b5..33dcf2f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
@@ -40,7 +40,7 @@ import org.apache.spark.unsafe.types.UTF8String
 /**
  * A [[ParentContainerUpdater]] is used by a Parquet converter to set converted values to some
  * corresponding parent container. For example, a converter for a `StructType` field may set
- * converted values to a [[MutableRow]]; or a converter for array elements may append converted
+ * converted values to an [[InternalRow]]; or a converter for array elements may append converted
  * values to an [[ArrayBuffer]].
  */
 private[parquet] trait ParentContainerUpdater {
@@ -155,7 +155,7 @@ private[parquet] class ParquetRowConverter(
    * Updater used together with field converters within a [[ParquetRowConverter]].  It propagates
   * converted field values to the `ordinal`-th cell in `currentRow`.
    */
-  private final class RowUpdater(row: MutableRow, ordinal: Int) extends ParentContainerUpdater {
+  private final class RowUpdater(row: InternalRow, ordinal: Int) extends ParentContainerUpdater {
     override def set(value: Any): Unit = row(ordinal) = value
     override def setBoolean(value: Boolean): Unit = row.setBoolean(ordinal, value)
     override def setByte(value: Byte): Unit = row.setByte(ordinal, value)
@@ -166,7 +166,7 @@ private[parquet] class ParquetRowConverter(
     override def setFloat(value: Float): Unit = row.setFloat(ordinal, value)
   }
 
-  private val currentRow = new SpecificMutableRow(catalystType.map(_.dataType))
+  private val currentRow = new SpecificInternalRow(catalystType.map(_.dataType))
 
   private val unsafeProjection = UnsafeProjection.create(catalystType)
 

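Illustrative sketch (not from the patch) of the updater pattern RowUpdater implements: converters only see this narrow interface, so the same converter logic can target a row cell or an array buffer, and the primitive set* overloads keep the per-value path box-free. The class name is hypothetical.

    import org.apache.spark.sql.catalyst.InternalRow

    class SketchRowUpdater(row: InternalRow, ordinal: Int) {
      def set(value: Any): Unit = row.update(ordinal, value)
      def setInt(value: Int): Unit = row.setInt(ordinal, value)
      def setLong(value: Long): Unit = row.setLong(ordinal, value)
      def setBoolean(value: Boolean): Unit = row.setBoolean(ordinal, value)
    }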
http://git-wip-us.apache.org/repos/asf/spark/blob/97594c29/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastNestedLoopJoinExec.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastNestedLoopJoinExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastNestedLoopJoinExec.scala
index 43cdce7..bfe7e3d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastNestedLoopJoinExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastNestedLoopJoinExec.scala
@@ -119,7 +119,7 @@ case class BroadcastNestedLoopJoinExec(
     streamed.execute().mapPartitionsInternal { streamedIter =>
       val buildRows = relation.value
       val joinedRow = new JoinedRow
-      val nulls = new GenericMutableRow(broadcast.output.size)
+      val nulls = new GenericInternalRow(broadcast.output.size)
 
       // Returns an iterator to avoid copying the rows.
       new Iterator[InternalRow] {
@@ -205,14 +205,14 @@ case class BroadcastNestedLoopJoinExec(
       val joinedRow = new JoinedRow
 
       if (condition.isDefined) {
-        val resultRow = new GenericMutableRow(Array[Any](null))
+        val resultRow = new GenericInternalRow(Array[Any](null))
         streamedIter.map { row =>
           val result = buildRows.exists(r => boundCondition(joinedRow(row, r)))
           resultRow.setBoolean(0, result)
           joinedRow(row, resultRow)
         }
       } else {
-        val resultRow = new GenericMutableRow(Array[Any](buildRows.nonEmpty))
+        val resultRow = new GenericInternalRow(Array[Any](buildRows.nonEmpty))
         streamedIter.map { row =>
           joinedRow(row, resultRow)
         }
@@ -293,7 +293,7 @@ case class BroadcastNestedLoopJoinExec(
     }
 
     val notMatchedBroadcastRows: Seq[InternalRow] = {
-      val nulls = new GenericMutableRow(streamed.output.size)
+      val nulls = new GenericInternalRow(streamed.output.size)
       val buf: CompactBuffer[InternalRow] = new CompactBuffer()
       val joinedRow = new JoinedRow
       joinedRow.withLeft(nulls)
@@ -311,7 +311,7 @@ case class BroadcastNestedLoopJoinExec(
     val matchedStreamRows = streamRdd.mapPartitionsInternal { streamedIter =>
       val buildRows = relation.value
       val joinedRow = new JoinedRow
-      val nulls = new GenericMutableRow(broadcast.output.size)
+      val nulls = new GenericInternalRow(broadcast.output.size)
 
       streamedIter.flatMap { streamedRow =>
         var i = 0
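
The `nulls` row above is the outer-join padding trick: a single reusable all-null row of the broadcast side's width, joined to each unmatched streamed row. A small sketch of just that trick; the two-column schema and values are invented:

```scala
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, JoinedRow}

object NullPaddingSketch {
  def main(args: Array[String]): Unit = {
    val broadcastWidth = 2
    val nulls = new GenericInternalRow(broadcastWidth) // every cell starts out null
    val joinedRow = new JoinedRow                      // one shared instance, re-pointed per row

    val streamedRow: InternalRow = InternalRow(1, 2L)
    val out = joinedRow(streamedRow, nulls)
    println(out.numFields)   // 4
    println(out.isNullAt(2)) // true: the padded broadcast side
  }
}
```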

http://git-wip-us.apache.org/repos/asf/spark/blob/97594c29/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala
index fb6bfa7..8ddac19 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala
@@ -192,7 +192,7 @@ trait HashJoin {
       streamIter: Iterator[InternalRow],
       hashedRelation: HashedRelation): Iterator[InternalRow] = {
     val joinKeys = streamSideKeyGenerator()
-    val result = new GenericMutableRow(Array[Any](null))
+    val result = new GenericInternalRow(Array[Any](null))
     val joinedRow = new JoinedRow
     streamIter.map { current =>
       val key = joinKeys(current)

http://git-wip-us.apache.org/repos/asf/spark/blob/97594c29/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala
index 81b3e1d..ecf7cf2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala
@@ -275,7 +275,7 @@ case class SortMergeJoinExec(
         case j: ExistenceJoin =>
           new RowIterator {
             private[this] var currentLeftRow: InternalRow = _
-            private[this] val result: MutableRow = new GenericMutableRow(Array[Any](null))
+            private[this] val result: InternalRow = new GenericInternalRow(Array[Any](null))
             private[this] val smjScanner = new SortMergeJoinScanner(
               createLeftKeyGenerator(),
               createRightKeyGenerator(),

http://git-wip-us.apache.org/repos/asf/spark/blob/97594c29/sql/core/src/main/scala/org/apache/spark/sql/execution/objects.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/objects.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/objects.scala
index c7e2671..2acc511 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/objects.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/objects.scala
@@ -141,7 +141,7 @@ object ObjectOperator {
   def serializeObjectToRow(serializer: Seq[Expression]): Any => UnsafeRow = {
     val proj = GenerateUnsafeProjection.generate(serializer)
     val objType = serializer.head.collect { case b: BoundReference => b.dataType }.head
-    val objRow = new SpecificMutableRow(objType :: Nil)
+    val objRow = new SpecificInternalRow(objType :: Nil)
     (o: Any) => {
       objRow(0) = o
       proj(objRow)
@@ -149,7 +149,7 @@ object ObjectOperator {
   }
 
   def wrapObjectToRow(objType: DataType): Any => InternalRow = {
-    val outputRow = new SpecificMutableRow(objType :: Nil)
+    val outputRow = new SpecificInternalRow(objType :: Nil)
     (o: Any) => {
       outputRow(0) = o
       outputRow
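
The `wrapObjectToRow` hunk above is nearly self-contained; a standalone version of the same code, with a hypothetical demo `main` added, looks like this:

```scala
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.SpecificInternalRow
import org.apache.spark.sql.types.{DataType, IntegerType}

object WrapObjectSketch {
  // Same shape as ObjectOperator.wrapObjectToRow: one reused
  // single-column row carries an object into row-based operators.
  def wrapObjectToRow(objType: DataType): Any => InternalRow = {
    val outputRow = new SpecificInternalRow(objType :: Nil)
    (o: Any) => {
      outputRow(0) = o
      outputRow
    }
  }

  def main(args: Array[String]): Unit = {
    val wrap = wrapObjectToRow(IntegerType)
    println(wrap(42).getInt(0)) // 42
  }
}
```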

http://git-wip-us.apache.org/repos/asf/spark/blob/97594c29/sql/core/src/main/scala/org/apache/spark/sql/execution/python/BatchEvalPythonExec.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/BatchEvalPythonExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/BatchEvalPythonExec.scala
index f9d20ad..dcaf2c7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/BatchEvalPythonExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/BatchEvalPythonExec.scala
@@ -147,7 +147,7 @@ case class BatchEvalPythonExec(udfs: Seq[PythonUDF], output: Seq[Attribute], chi
         .compute(inputIterator, context.partitionId(), context)
 
       val unpickle = new Unpickler
-      val mutableRow = new GenericMutableRow(1)
+      val mutableRow = new GenericInternalRow(1)
       val joined = new JoinedRow
       val resultType = if (udfs.length == 1) {
         udfs.head.dataType

http://git-wip-us.apache.org/repos/asf/spark/blob/97594c29/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/StatFunctions.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/StatFunctions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/StatFunctions.scala
index 822f49e..c02b154 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/StatFunctions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/StatFunctions.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.execution.stat
 
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.{Column, DataFrame, Dataset, Row}
-import org.apache.spark.sql.catalyst.expressions.{Cast, GenericMutableRow}
+import org.apache.spark.sql.catalyst.expressions.{Cast, GenericInternalRow}
 import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
 import org.apache.spark.sql.catalyst.util.QuantileSummaries
 import org.apache.spark.sql.functions._
@@ -186,7 +186,7 @@ object StatFunctions extends Logging {
     require(columnSize < 1e4, s"The number of distinct values for $col2 can't " +
       s"exceed 1e4. Currently $columnSize")
     val table = counts.groupBy(_.get(0)).map { case (col1Item, rows) =>
-      val countsRow = new GenericMutableRow(columnSize + 1)
+      val countsRow = new GenericInternalRow(columnSize + 1)
       rows.foreach { (row: Row) =>
         // row.get(0) is column 1
         // row.get(1) is column 2

http://git-wip-us.apache.org/repos/asf/spark/blob/97594c29/sql/core/src/main/scala/org/apache/spark/sql/execution/window/AggregateProcessor.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/window/AggregateProcessor.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/window/AggregateProcessor.scala
index d3a46d0..c9f5d3b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/window/AggregateProcessor.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/window/AggregateProcessor.scala
@@ -123,7 +123,7 @@ private[window] final class AggregateProcessor(
 
   private[this] val join = new JoinedRow
   private[this] val numImperatives = imperatives.length
-  private[this] val buffer = new SpecificMutableRow(bufferSchema.toSeq.map(_.dataType))
+  private[this] val buffer = new SpecificInternalRow(bufferSchema.toSeq.map(_.dataType))
   initialProjection.target(buffer)
   updateProjection.target(buffer)
 
@@ -154,6 +154,6 @@ private[window] final class AggregateProcessor(
   }
 
   /** Evaluate buffer. */
-  def evaluate(target: MutableRow): Unit =
+  def evaluate(target: InternalRow): Unit =
   evaluateProjection.target(target)(buffer)
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/97594c29/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowExec.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowExec.scala
index 7a6a30f..1dd281e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowExec.scala
@@ -204,7 +204,7 @@ case class WindowExec(
         val factory = key match {
           // Offset Frame
           case ("OFFSET", RowFrame, Some(offset), Some(h)) if offset == h =>
-            target: MutableRow =>
+            target: InternalRow =>
               new OffsetWindowFunctionFrame(
                 target,
                 ordinal,
@@ -217,7 +217,7 @@ case class WindowExec(
 
           // Growing Frame.
           case ("AGGREGATE", frameType, None, Some(high)) =>
-            target: MutableRow => {
+            target: InternalRow => {
               new UnboundedPrecedingWindowFunctionFrame(
                 target,
                 processor,
@@ -226,7 +226,7 @@ case class WindowExec(
 
           // Shrinking Frame.
           case ("AGGREGATE", frameType, Some(low), None) =>
-            target: MutableRow => {
+            target: InternalRow => {
               new UnboundedFollowingWindowFunctionFrame(
                 target,
                 processor,
@@ -235,7 +235,7 @@ case class WindowExec(
 
           // Moving Frame.
           case ("AGGREGATE", frameType, Some(low), Some(high)) =>
-            target: MutableRow => {
+            target: InternalRow => {
               new SlidingWindowFunctionFrame(
                 target,
                 processor,
@@ -245,7 +245,7 @@ case class WindowExec(
 
           // Entire Partition Frame.
           case ("AGGREGATE", frameType, None, None) =>
-            target: MutableRow => {
+            target: InternalRow => {
               new UnboundedWindowFunctionFrame(target, processor)
             }
         }
@@ -312,7 +312,7 @@ case class WindowExec(
         val inputFields = child.output.length
         var sorter: UnsafeExternalSorter = null
         var rowBuffer: RowBuffer = null
-        val windowFunctionResult = new SpecificMutableRow(expressions.map(_.dataType))
+        val windowFunctionResult = new SpecificInternalRow(expressions.map(_.dataType))
         val frames = factories.map(_(windowFunctionResult))
         val numFrames = frames.length
         private[this] def fetchNextPartition() {

http://git-wip-us.apache.org/repos/asf/spark/blob/97594c29/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowFunctionFrame.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowFunctionFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowFunctionFrame.scala
index 2ab9faa..70efc0f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowFunctionFrame.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowFunctionFrame.scala
@@ -56,7 +56,7 @@ private[window] abstract class WindowFunctionFrame {
  * @param offset by which rows get moved within a partition.
  */
 private[window] final class OffsetWindowFunctionFrame(
-    target: MutableRow,
+    target: InternalRow,
     ordinal: Int,
     expressions: Array[OffsetWindowFunction],
     inputSchema: Seq[Attribute],
@@ -136,7 +136,7 @@ private[window] final class OffsetWindowFunctionFrame(
  * @param ubound comparator used to identify the upper bound of an output row.
  */
 private[window] final class SlidingWindowFunctionFrame(
-    target: MutableRow,
+    target: InternalRow,
     processor: AggregateProcessor,
     lbound: BoundOrdering,
     ubound: BoundOrdering)
@@ -217,7 +217,7 @@ private[window] final class SlidingWindowFunctionFrame(
  * @param processor to calculate the row values with.
  */
 private[window] final class UnboundedWindowFunctionFrame(
-    target: MutableRow,
+    target: InternalRow,
     processor: AggregateProcessor)
   extends WindowFunctionFrame {
 
@@ -255,7 +255,7 @@ private[window] final class UnboundedWindowFunctionFrame(
  * @param ubound comparator used to identify the upper bound of an output row.
  */
 private[window] final class UnboundedPrecedingWindowFunctionFrame(
-    target: MutableRow,
+    target: InternalRow,
     processor: AggregateProcessor,
     ubound: BoundOrdering)
   extends WindowFunctionFrame {
@@ -317,7 +317,7 @@ private[window] final class UnboundedPrecedingWindowFunctionFrame(
  * @param lbound comparator used to identify the lower bound of an output row.
  */
 private[window] final class UnboundedFollowingWindowFunctionFrame(
-    target: MutableRow,
+    target: InternalRow,
     processor: AggregateProcessor,
     lbound: BoundOrdering)
   extends WindowFunctionFrame {
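
All of the frame classes above share one design: each frame writes its results into a fixed ordinal of a single target row, which the operator then joins to the input row. A sketch of that contract; `CountFrame` is an invented stand-in, not one of Spark's frames:

```scala
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{JoinedRow, SpecificInternalRow}
import org.apache.spark.sql.types.LongType

object WindowTargetSketch {
  // Invented stand-in for a WindowFunctionFrame: writes a running
  // count into its assigned ordinal of the shared target row.
  final class CountFrame(target: InternalRow, ordinal: Int) {
    private var count = 0L
    def write(): Unit = { count += 1; target.setLong(ordinal, count) }
  }

  def main(args: Array[String]): Unit = {
    val windowFunctionResult = new SpecificInternalRow(Seq(LongType))
    val frame = new CountFrame(windowFunctionResult, 0)
    val join = new JoinedRow
    Seq(InternalRow(10), InternalRow(20)).foreach { input =>
      frame.write()
      println(join(input, windowFunctionResult).getLong(1)) // 1, then 2
    }
  }
}
```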

http://git-wip-us.apache.org/repos/asf/spark/blob/97594c29/sql/core/src/test/scala/org/apache/spark/sql/RowSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/RowSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/RowSuite.scala
index 34936b3..7516be3 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/RowSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/RowSuite.scala
@@ -18,7 +18,7 @@
 package org.apache.spark.sql
 
 import org.apache.spark.SparkFunSuite
-import org.apache.spark.sql.catalyst.expressions.{GenericMutableRow, SpecificMutableRow}
+import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, SpecificInternalRow}
 import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.UTF8String
@@ -27,7 +27,7 @@ class RowSuite extends SparkFunSuite with SharedSQLContext {
   import testImplicits._
 
   test("create row") {
-    val expected = new GenericMutableRow(4)
+    val expected = new GenericInternalRow(4)
     expected.setInt(0, 2147483647)
     expected.update(1, UTF8String.fromString("this is a string"))
     expected.setBoolean(2, false)
@@ -49,7 +49,7 @@ class RowSuite extends SparkFunSuite with SharedSQLContext {
   }
 
   test("SpecificMutableRow.update with null") {
-    val row = new SpecificMutableRow(Seq(IntegerType))
+    val row = new SpecificInternalRow(Seq(IntegerType))
     row(0) = null
     assert(row.isNullAt(0))
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/97594c29/sql/core/src/test/scala/org/apache/spark/sql/TypedImperativeAggregateSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/TypedImperativeAggregateSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/TypedImperativeAggregateSuite.scala
index b5eb16b..ffa26f1 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/TypedImperativeAggregateSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/TypedImperativeAggregateSuite.scala
@@ -21,7 +21,7 @@ import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, Da
 
 import org.apache.spark.sql.TypedImperativeAggregateSuite.TypedMax
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.{BoundReference, Expression, GenericMutableRow, SpecificMutableRow}
+import org.apache.spark.sql.catalyst.expressions.{BoundReference, Expression, GenericInternalRow, SpecificInternalRow}
 import org.apache.spark.sql.catalyst.expressions.aggregate.TypedImperativeAggregate
 import org.apache.spark.sql.execution.aggregate.SortAggregateExec
 import org.apache.spark.sql.expressions.Window
@@ -64,7 +64,7 @@ class TypedImperativeAggregateSuite extends QueryTest with SharedSQLContext {
     assert(agg.eval(mergeBuffer) == data.map(_._1).max)
 
     // Tests the low-level eval(row: InternalRow) API.
-    val row = new GenericMutableRow(Array(mergeBuffer): Array[Any])
+    val row = new GenericInternalRow(Array(mergeBuffer): Array[Any])
 
     // Evaluates directly on a row consisting of the aggregation buffer object.
     assert(agg.eval(row) == data.map(_._1).max)
@@ -73,7 +73,7 @@ class TypedImperativeAggregateSuite extends QueryTest with SharedSQLContext {
   test("supports SpecificMutableRow as mutable row") {
     val aggregationBufferSchema = Seq(IntegerType, LongType, BinaryType, IntegerType)
     val aggBufferOffset = 2
-    val buffer = new SpecificMutableRow(aggregationBufferSchema)
+    val buffer = new SpecificInternalRow(aggregationBufferSchema)
     val agg = new TypedMax(BoundReference(ordinal = 1, dataType = IntegerType, nullable = false))
       .withNewMutableAggBufferOffset(aggBufferOffset)
 

http://git-wip-us.apache.org/repos/asf/spark/blob/97594c29/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/ColumnTypeSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/ColumnTypeSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/ColumnTypeSuite.scala
index 805b566..8bf9f52 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/ColumnTypeSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/ColumnTypeSuite.scala
@@ -24,7 +24,7 @@ import org.apache.spark.SparkFunSuite
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.catalyst.CatalystTypeConverters
-import org.apache.spark.sql.catalyst.expressions.{GenericMutableRow, UnsafeProjection}
+import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, UnsafeProjection}
 import org.apache.spark.sql.execution.columnar.ColumnarTestUtils._
 import org.apache.spark.sql.types._
 
@@ -54,7 +54,7 @@ class ColumnTypeSuite extends SparkFunSuite with Logging {
         expected: Int): Unit = {
 
       assertResult(expected, s"Wrong actualSize for $columnType") {
-        val row = new GenericMutableRow(1)
+        val row = new GenericInternalRow(1)
         row.update(0, CatalystTypeConverters.convertToCatalyst(value))
         val proj = UnsafeProjection.create(Array[DataType](columnType.dataType))
         columnType.actualSize(proj(row), 0)

http://git-wip-us.apache.org/repos/asf/spark/blob/97594c29/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/ColumnarTestUtils.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/ColumnarTestUtils.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/ColumnarTestUtils.scala
index 1529313..686c8fa 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/ColumnarTestUtils.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/ColumnarTestUtils.scala
@@ -21,14 +21,14 @@ import scala.collection.immutable.HashSet
 import scala.util.Random
 
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, GenericMutableRow}
+import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
 import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, GenericArrayData}
 import org.apache.spark.sql.types.{AtomicType, Decimal}
 import org.apache.spark.unsafe.types.UTF8String
 
 object ColumnarTestUtils {
-  def makeNullRow(length: Int): GenericMutableRow = {
-    val row = new GenericMutableRow(length)
+  def makeNullRow(length: Int): GenericInternalRow = {
+    val row = new GenericInternalRow(length)
     (0 until length).foreach(row.setNullAt)
     row
   }
@@ -86,7 +86,7 @@ object ColumnarTestUtils {
       tail: ColumnType[_]*): InternalRow = makeRandomRow(Seq(head) ++ tail)
 
   def makeRandomRow(columnTypes: Seq[ColumnType[_]]): InternalRow = {
-    val row = new GenericMutableRow(columnTypes.length)
+    val row = new GenericInternalRow(columnTypes.length)
     makeRandomValues(columnTypes).zipWithIndex.foreach { case (value, index) =>
       row(index) = value
     }
@@ -95,11 +95,11 @@ object ColumnarTestUtils {
 
   def makeUniqueValuesAndSingleValueRows[T <: AtomicType](
       columnType: NativeColumnType[T],
-      count: Int): (Seq[T#InternalType], Seq[GenericMutableRow]) = {
+      count: Int): (Seq[T#InternalType], Seq[GenericInternalRow]) = {
 
     val values = makeUniqueRandomValues(columnType, count)
     val rows = values.map { value =>
-      val row = new GenericMutableRow(1)
+      val row = new GenericInternalRow(1)
       row(0) = value
       row
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/97594c29/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/NullableColumnAccessorSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/NullableColumnAccessorSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/NullableColumnAccessorSuite.scala
index dc22d3e..8f4ca3c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/NullableColumnAccessorSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/NullableColumnAccessorSuite.scala
@@ -21,7 +21,7 @@ import java.nio.ByteBuffer
 
 import org.apache.spark.SparkFunSuite
 import org.apache.spark.sql.catalyst.CatalystTypeConverters
-import org.apache.spark.sql.catalyst.expressions.{GenericMutableRow, UnsafeProjection}
+import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, UnsafeProjection}
 import org.apache.spark.sql.types._
 
 class TestNullableColumnAccessor[JvmType](
@@ -72,7 +72,7 @@ class NullableColumnAccessorSuite extends SparkFunSuite {
       }
 
       val accessor = TestNullableColumnAccessor(builder.build(), columnType)
-      val row = new GenericMutableRow(1)
+      val row = new GenericInternalRow(1)
       val converter = CatalystTypeConverters.createToScalaConverter(columnType.dataType)
 
       (0 until 4).foreach { _ =>

http://git-wip-us.apache.org/repos/asf/spark/blob/97594c29/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/NullableColumnBuilderSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/NullableColumnBuilderSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/NullableColumnBuilderSuite.scala
index cdd4551..b2b6e92 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/NullableColumnBuilderSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/NullableColumnBuilderSuite.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.execution.columnar
 
 import org.apache.spark.SparkFunSuite
 import org.apache.spark.sql.catalyst.CatalystTypeConverters
-import org.apache.spark.sql.catalyst.expressions.{GenericMutableRow, UnsafeProjection}
+import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, UnsafeProjection}
 import org.apache.spark.sql.types._
 
 class TestNullableColumnBuilder[JvmType](columnType: ColumnType[JvmType])
@@ -94,7 +94,7 @@ class NullableColumnBuilderSuite extends SparkFunSuite {
       (1 to 7 by 2).foreach(assertResult(_, "Wrong null position")(buffer.getInt()))
 
       // For non-null values
-      val actual = new GenericMutableRow(new Array[Any](1))
+      val actual = new GenericInternalRow(new Array[Any](1))
       (0 until 4).foreach { _ =>
         columnType.extract(buffer, actual, 0)
         assert(converter(actual.get(0, dataType)) === converter(randomRow.get(0, dataType)),

http://git-wip-us.apache.org/repos/asf/spark/blob/97594c29/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/BooleanBitSetSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/BooleanBitSetSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/BooleanBitSetSuite.scala
index f67e9c7..d01bf91 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/BooleanBitSetSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/BooleanBitSetSuite.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.execution.columnar.compression
 
 import org.apache.spark.SparkFunSuite
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.GenericMutableRow
+import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
 import org.apache.spark.sql.execution.columnar.{BOOLEAN, NoopColumnStats}
 import org.apache.spark.sql.execution.columnar.ColumnarTestUtils._
 
@@ -72,7 +72,7 @@ class BooleanBitSetSuite extends SparkFunSuite {
     buffer.rewind().position(headerSize + 4)
 
     val decoder = BooleanBitSet.decoder(buffer, BOOLEAN)
-    val mutableRow = new GenericMutableRow(1)
+    val mutableRow = new GenericInternalRow(1)
     if (values.nonEmpty) {
       values.foreach {
         assert(decoder.hasNext)

http://git-wip-us.apache.org/repos/asf/spark/blob/97594c29/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/CompressionSchemeBenchmark.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/CompressionSchemeBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/CompressionSchemeBenchmark.scala
index babf944..9005ec9 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/CompressionSchemeBenchmark.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/CompressionSchemeBenchmark.scala
@@ -23,7 +23,7 @@ import java.nio.charset.StandardCharsets
 import org.apache.commons.lang3.RandomStringUtils
 import org.apache.commons.math3.distribution.LogNormalDistribution
 
-import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, GenericMutableRow}
+import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
 import org.apache.spark.sql.execution.columnar.{BOOLEAN, INT, LONG, NativeColumnType, SHORT, STRING}
 import org.apache.spark.sql.types.AtomicType
 import org.apache.spark.util.Benchmark
@@ -111,7 +111,7 @@ object CompressionSchemeBenchmark extends AllCompressionSchemes {
       input.rewind()
 
       benchmark.addCase(label)({ i: Int =>
-        val rowBuf = new GenericMutableRow(1)
+        val rowBuf = new GenericInternalRow(1)
 
         for (n <- 0L until iters) {
           compressedBuf.rewind.position(4)

http://git-wip-us.apache.org/repos/asf/spark/blob/97594c29/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/DictionaryEncodingSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/DictionaryEncodingSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/DictionaryEncodingSuite.scala
index 830ca02..67139b1 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/DictionaryEncodingSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/DictionaryEncodingSuite.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.execution.columnar.compression
 import java.nio.ByteBuffer
 
 import org.apache.spark.SparkFunSuite
-import org.apache.spark.sql.catalyst.expressions.GenericMutableRow
+import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
 import org.apache.spark.sql.execution.columnar._
 import org.apache.spark.sql.execution.columnar.ColumnarTestUtils._
 import org.apache.spark.sql.types.AtomicType
@@ -97,7 +97,7 @@ class DictionaryEncodingSuite extends SparkFunSuite {
         buffer.rewind().position(headerSize + 4)
 
         val decoder = DictionaryEncoding.decoder(buffer, columnType)
-        val mutableRow = new GenericMutableRow(1)
+        val mutableRow = new GenericInternalRow(1)
 
         if (inputSeq.nonEmpty) {
           inputSeq.foreach { i =>

http://git-wip-us.apache.org/repos/asf/spark/blob/97594c29/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/IntegralDeltaSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/IntegralDeltaSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/IntegralDeltaSuite.scala
index a530e27..411d31f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/IntegralDeltaSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/IntegralDeltaSuite.scala
@@ -18,7 +18,7 @@
 package org.apache.spark.sql.execution.columnar.compression
 
 import org.apache.spark.SparkFunSuite
-import org.apache.spark.sql.catalyst.expressions.GenericMutableRow
+import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
 import org.apache.spark.sql.execution.columnar._
 import org.apache.spark.sql.execution.columnar.ColumnarTestUtils._
 import org.apache.spark.sql.types.IntegralType
@@ -48,7 +48,7 @@ class IntegralDeltaSuite extends SparkFunSuite {
       }
 
       input.foreach { value =>
-        val row = new GenericMutableRow(1)
+        val row = new GenericInternalRow(1)
         columnType.setField(row, 0, value)
         builder.appendFrom(row, 0)
       }
@@ -95,7 +95,7 @@ class IntegralDeltaSuite extends SparkFunSuite {
       buffer.rewind().position(headerSize + 4)
 
       val decoder = scheme.decoder(buffer, columnType)
-      val mutableRow = new GenericMutableRow(1)
+      val mutableRow = new GenericInternalRow(1)
 
       if (input.nonEmpty) {
         input.foreach{

http://git-wip-us.apache.org/repos/asf/spark/blob/97594c29/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/RunLengthEncodingSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/RunLengthEncodingSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/RunLengthEncodingSuite.scala
index 95642e9..dffa9b3 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/RunLengthEncodingSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/RunLengthEncodingSuite.scala
@@ -18,7 +18,7 @@
 package org.apache.spark.sql.execution.columnar.compression
 
 import org.apache.spark.SparkFunSuite
-import org.apache.spark.sql.catalyst.expressions.GenericMutableRow
+import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
 import org.apache.spark.sql.execution.columnar._
 import org.apache.spark.sql.execution.columnar.ColumnarTestUtils._
 import org.apache.spark.sql.types.AtomicType
@@ -80,7 +80,7 @@ class RunLengthEncodingSuite extends SparkFunSuite {
       buffer.rewind().position(headerSize + 4)
 
       val decoder = RunLengthEncoding.decoder(buffer, columnType)
-      val mutableRow = new GenericMutableRow(1)
+      val mutableRow = new GenericInternalRow(1)
 
       if (inputSeq.nonEmpty) {
         inputSeq.foreach { i =>

http://git-wip-us.apache.org/repos/asf/spark/blob/97594c29/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
index 3161a63..580eade 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
@@ -38,7 +38,7 @@ import org.apache.parquet.schema.{MessageType, MessageTypeParser}
 import org.apache.spark.SparkException
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.{InternalRow, ScalaReflection}
-import org.apache.spark.sql.catalyst.expressions.{GenericMutableRow, UnsafeRow}
+import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, UnsafeRow}
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSQLContext
@@ -716,7 +716,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext {
       dataTypes.zip(constantValues).foreach { case (dt, v) =>
         val schema = StructType(StructField("pcol", dt) :: Nil)
         val vectorizedReader = new VectorizedParquetRecordReader
-        val partitionValues = new GenericMutableRow(Array(v))
+        val partitionValues = new GenericInternalRow(Array(v))
         val file = SpecificParquetRecordReaderBase.listDirectory(dir).get(0)
 
         try {

http://git-wip-us.apache.org/repos/asf/spark/blob/97594c29/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
index 9dd8d9f..4c4a7d8 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
@@ -24,7 +24,7 @@ import org.apache.parquet.hadoop.ParquetOutputFormat
 
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
-import org.apache.spark.sql.catalyst.expressions.SpecificMutableRow
+import org.apache.spark.sql.catalyst.expressions.SpecificInternalRow
 import org.apache.spark.sql.execution.FileSourceScanExec
 import org.apache.spark.sql.execution.datasources.parquet.TestingUDT.{NestedStruct, NestedStructUDT, SingleElement}
 import org.apache.spark.sql.internal.SQLConf
@@ -719,7 +719,7 @@ object TestingUDT {
         .add("c", DoubleType, nullable = false)
 
     override def serialize(n: NestedStruct): Any = {
-      val row = new SpecificMutableRow(sqlType.asInstanceOf[StructType].map(_.dataType))
+      val row = new SpecificInternalRow(sqlType.asInstanceOf[StructType].map(_.dataType))
       row.setInt(0, n.a)
       row.setLong(1, n.b)
       row.setDouble(2, n.c)

http://git-wip-us.apache.org/repos/asf/spark/blob/97594c29/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala
index fe34caa..1625116 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala
@@ -688,25 +688,25 @@ private[hive] trait HiveInspectors {
   * @return A function that performs in-place updating of an InternalRow.
    *         Use the overloaded ObjectInspector version for assignments.
    */
-  def unwrapperFor(field: HiveStructField): (Any, MutableRow, Int) => Unit =
+  def unwrapperFor(field: HiveStructField): (Any, InternalRow, Int) => Unit =
     field.getFieldObjectInspector match {
       case oi: BooleanObjectInspector =>
-        (value: Any, row: MutableRow, ordinal: Int) => row.setBoolean(ordinal, oi.get(value))
+        (value: Any, row: InternalRow, ordinal: Int) => row.setBoolean(ordinal, oi.get(value))
       case oi: ByteObjectInspector =>
-        (value: Any, row: MutableRow, ordinal: Int) => row.setByte(ordinal, oi.get(value))
+        (value: Any, row: InternalRow, ordinal: Int) => row.setByte(ordinal, oi.get(value))
       case oi: ShortObjectInspector =>
-        (value: Any, row: MutableRow, ordinal: Int) => row.setShort(ordinal, oi.get(value))
+        (value: Any, row: InternalRow, ordinal: Int) => row.setShort(ordinal, oi.get(value))
       case oi: IntObjectInspector =>
-        (value: Any, row: MutableRow, ordinal: Int) => row.setInt(ordinal, oi.get(value))
+        (value: Any, row: InternalRow, ordinal: Int) => row.setInt(ordinal, oi.get(value))
       case oi: LongObjectInspector =>
-        (value: Any, row: MutableRow, ordinal: Int) => row.setLong(ordinal, oi.get(value))
+        (value: Any, row: InternalRow, ordinal: Int) => row.setLong(ordinal, oi.get(value))
       case oi: FloatObjectInspector =>
-        (value: Any, row: MutableRow, ordinal: Int) => row.setFloat(ordinal, oi.get(value))
+        (value: Any, row: InternalRow, ordinal: Int) => row.setFloat(ordinal, oi.get(value))
       case oi: DoubleObjectInspector =>
-        (value: Any, row: MutableRow, ordinal: Int) => row.setDouble(ordinal, oi.get(value))
+        (value: Any, row: InternalRow, ordinal: Int) => row.setDouble(ordinal, oi.get(value))
       case oi =>
         val unwrapper = unwrapperFor(oi)
-        (value: Any, row: MutableRow, ordinal: Int) => row(ordinal) = unwrapper(value)
+        (value: Any, row: InternalRow, ordinal: Int) => row(ordinal) = unwrapper(value)
     }
 
   def wrap(a: Any, oi: ObjectInspector, dataType: DataType): AnyRef = {




[3/3] spark git commit: [SPARK-17761][SQL] Remove MutableRow

Posted by hv...@apache.org.
[SPARK-17761][SQL] Remove MutableRow

## What changes were proposed in this pull request?
In practice we cannot guarantee that an `InternalRow` is immutable. This makes the separate `MutableRow` abstraction almost redundant, so this PR folds `MutableRow` into `InternalRow`.

The code below illustrates the immutability issue with InternalRow:
```scala
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.GenericMutableRow

val struct = new GenericMutableRow(1)
val row = InternalRow(struct, 1)
println(row)  // prints [[null], 1]
struct.setInt(0, 42)
println(row)  // prints [[42], 1]
```

This might be somewhat controversial, so feedback is appreciated.

## How was this patch tested?
Existing tests.

Author: Herman van Hovell <hv...@databricks.com>

Closes #15333 from hvanhovell/SPARK-17761.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/97594c29
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/97594c29
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/97594c29

Branch: refs/heads/master
Commit: 97594c29b723f372a5c4c061760015bd78d01f50
Parents: 2badb58
Author: Herman van Hovell <hv...@databricks.com>
Authored: Fri Oct 7 14:03:45 2016 -0700
Committer: Herman van Hovell <hv...@databricks.com>
Committed: Fri Oct 7 14:03:45 2016 -0700

----------------------------------------------------------------------
 .../org/apache/spark/ml/linalg/MatrixUDT.scala  |   4 +-
 .../org/apache/spark/ml/linalg/VectorUDT.scala  |   6 +-
 .../apache/spark/mllib/linalg/Matrices.scala    |   4 +-
 .../org/apache/spark/mllib/linalg/Vectors.scala |   6 +-
 .../sql/catalyst/expressions/UnsafeRow.java     |   2 +-
 .../apache/spark/sql/catalyst/InternalRow.scala |  23 +-
 .../catalyst/encoders/ExpressionEncoder.scala   |   2 +-
 .../spark/sql/catalyst/expressions/Cast.scala   |   4 +-
 .../sql/catalyst/expressions/JoinedRow.scala    |  16 +
 .../sql/catalyst/expressions/Projection.scala   |   4 +-
 .../expressions/SpecificInternalRow.scala       | 313 ++++++++++++++++++
 .../expressions/SpecificMutableRow.scala        | 314 -------------------
 .../aggregate/HyperLogLogPlusPlus.scala         |   6 +-
 .../expressions/aggregate/PivotFirst.scala      |  10 +-
 .../expressions/aggregate/collect.scala         |   6 +-
 .../expressions/aggregate/interfaces.scala      |  14 +-
 .../expressions/codegen/CodeGenerator.scala     |   3 +-
 .../codegen/GenerateMutableProjection.scala     |   8 +-
 .../codegen/GenerateSafeProjection.scala        |   8 +-
 .../sql/catalyst/expressions/package.scala      |   2 +-
 .../spark/sql/catalyst/expressions/rows.scala   |  44 +--
 .../spark/sql/catalyst/json/JacksonParser.scala |   4 +-
 .../sql/catalyst/ScalaReflectionSuite.scala     |   4 +-
 .../expressions/CodeGenerationSuite.scala       |  16 +-
 .../sql/catalyst/expressions/MapDataSuite.scala |   2 +-
 .../expressions/UnsafeRowConverterSuite.scala   |  26 +-
 .../aggregate/ApproximatePercentileSuite.scala  |   9 +-
 .../aggregate/HyperLogLogPlusPlusSuite.scala    |  13 +-
 .../sql/execution/vectorized/ColumnarBatch.java |   7 +-
 .../spark/sql/execution/ExistingRDD.scala       |   4 +-
 .../aggregate/AggregationIterator.scala         |  26 +-
 .../SortBasedAggregationIterator.scala          |   6 +-
 .../aggregate/TungstenAggregationIterator.scala |   8 +-
 .../spark/sql/execution/aggregate/udaf.scala    |  38 +--
 .../sql/execution/columnar/ColumnAccessor.scala |  13 +-
 .../sql/execution/columnar/ColumnType.scala     |  72 ++---
 .../columnar/GenerateColumnAccessor.scala       |   6 +-
 .../columnar/NullableColumnAccessor.scala       |   4 +-
 .../CompressibleColumnAccessor.scala            |   4 +-
 .../compression/CompressionScheme.scala         |   3 +-
 .../compression/compressionSchemes.scala        |  20 +-
 .../datasources/DataSourceStrategy.scala        |   2 +-
 .../execution/datasources/csv/CSVRelation.scala |   4 +-
 .../execution/datasources/jdbc/JdbcUtils.scala  |  34 +-
 .../parquet/ParquetRowConverter.scala           |   6 +-
 .../joins/BroadcastNestedLoopJoinExec.scala     |  10 +-
 .../spark/sql/execution/joins/HashJoin.scala    |   2 +-
 .../sql/execution/joins/SortMergeJoinExec.scala |   2 +-
 .../apache/spark/sql/execution/objects.scala    |   4 +-
 .../execution/python/BatchEvalPythonExec.scala  |   2 +-
 .../sql/execution/stat/StatFunctions.scala      |   4 +-
 .../execution/window/AggregateProcessor.scala   |   4 +-
 .../spark/sql/execution/window/WindowExec.scala |  12 +-
 .../execution/window/WindowFunctionFrame.scala  |  10 +-
 .../scala/org/apache/spark/sql/RowSuite.scala   |   6 +-
 .../sql/TypedImperativeAggregateSuite.scala     |   6 +-
 .../execution/columnar/ColumnTypeSuite.scala    |   4 +-
 .../execution/columnar/ColumnarTestUtils.scala  |  12 +-
 .../columnar/NullableColumnAccessorSuite.scala  |   4 +-
 .../columnar/NullableColumnBuilderSuite.scala   |   4 +-
 .../compression/BooleanBitSetSuite.scala        |   4 +-
 .../CompressionSchemeBenchmark.scala            |   4 +-
 .../compression/DictionaryEncodingSuite.scala   |   4 +-
 .../compression/IntegralDeltaSuite.scala        |   6 +-
 .../compression/RunLengthEncodingSuite.scala    |   4 +-
 .../datasources/parquet/ParquetIOSuite.scala    |   4 +-
 .../datasources/parquet/ParquetQuerySuite.scala |   4 +-
 .../apache/spark/sql/hive/HiveInspectors.scala  |  18 +-
 .../org/apache/spark/sql/hive/TableReader.scala |  38 +--
 .../hive/execution/ScriptTransformation.scala   |   2 +-
 .../org/apache/spark/sql/hive/hiveUDFs.scala    |   6 +-
 .../spark/sql/hive/orc/OrcFileFormat.scala      |   2 +-
 72 files changed, 654 insertions(+), 658 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/97594c29/mllib/src/main/scala/org/apache/spark/ml/linalg/MatrixUDT.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/ml/linalg/MatrixUDT.scala b/mllib/src/main/scala/org/apache/spark/ml/linalg/MatrixUDT.scala
index a1e5366..f4a8556 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/linalg/MatrixUDT.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/linalg/MatrixUDT.scala
@@ -18,7 +18,7 @@
 package org.apache.spark.ml.linalg
 
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.{GenericMutableRow, UnsafeArrayData}
+import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, UnsafeArrayData}
 import org.apache.spark.sql.types._
 
 /**
@@ -46,7 +46,7 @@ private[spark] class MatrixUDT extends UserDefinedType[Matrix] {
   }
 
   override def serialize(obj: Matrix): InternalRow = {
-    val row = new GenericMutableRow(7)
+    val row = new GenericInternalRow(7)
     obj match {
       case sm: SparseMatrix =>
         row.setByte(0, 0)

http://git-wip-us.apache.org/repos/asf/spark/blob/97594c29/mllib/src/main/scala/org/apache/spark/ml/linalg/VectorUDT.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/ml/linalg/VectorUDT.scala b/mllib/src/main/scala/org/apache/spark/ml/linalg/VectorUDT.scala
index 0b9b2ff..9178613 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/linalg/VectorUDT.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/linalg/VectorUDT.scala
@@ -18,7 +18,7 @@
 package org.apache.spark.ml.linalg
 
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.{GenericMutableRow, UnsafeArrayData}
+import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, UnsafeArrayData}
 import org.apache.spark.sql.types._
 
 /**
@@ -42,14 +42,14 @@ private[spark] class VectorUDT extends UserDefinedType[Vector] {
   override def serialize(obj: Vector): InternalRow = {
     obj match {
       case SparseVector(size, indices, values) =>
-        val row = new GenericMutableRow(4)
+        val row = new GenericInternalRow(4)
         row.setByte(0, 0)
         row.setInt(1, size)
         row.update(2, UnsafeArrayData.fromPrimitiveArray(indices))
         row.update(3, UnsafeArrayData.fromPrimitiveArray(values))
         row
       case DenseVector(values) =>
-        val row = new GenericMutableRow(4)
+        val row = new GenericInternalRow(4)
         row.setByte(0, 1)
         row.setNullAt(1)
         row.setNullAt(2)

http://git-wip-us.apache.org/repos/asf/spark/blob/97594c29/mllib/src/main/scala/org/apache/spark/mllib/linalg/Matrices.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/Matrices.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/Matrices.scala
index 6642999..542a69b 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/Matrices.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/Matrices.scala
@@ -28,7 +28,7 @@ import com.github.fommil.netlib.BLAS.{getInstance => blas}
 import org.apache.spark.annotation.Since
 import org.apache.spark.ml.{linalg => newlinalg}
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.{GenericMutableRow, UnsafeArrayData}
+import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, UnsafeArrayData}
 import org.apache.spark.sql.types._
 
 /**
@@ -189,7 +189,7 @@ private[spark] class MatrixUDT extends UserDefinedType[Matrix] {
   }
 
   override def serialize(obj: Matrix): InternalRow = {
-    val row = new GenericMutableRow(7)
+    val row = new GenericInternalRow(7)
     obj match {
       case sm: SparseMatrix =>
         row.setByte(0, 0)

http://git-wip-us.apache.org/repos/asf/spark/blob/97594c29/mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala
index 91f0658..fbd217a 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala
@@ -34,7 +34,7 @@ import org.apache.spark.annotation.{AlphaComponent, Since}
 import org.apache.spark.ml.{linalg => newlinalg}
 import org.apache.spark.mllib.util.NumericParser
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.{GenericMutableRow, UnsafeArrayData}
+import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, UnsafeArrayData}
 import org.apache.spark.sql.types._
 
 /**
@@ -214,14 +214,14 @@ class VectorUDT extends UserDefinedType[Vector] {
   override def serialize(obj: Vector): InternalRow = {
     obj match {
       case SparseVector(size, indices, values) =>
-        val row = new GenericMutableRow(4)
+        val row = new GenericInternalRow(4)
         row.setByte(0, 0)
         row.setInt(1, size)
         row.update(2, UnsafeArrayData.fromPrimitiveArray(indices))
         row.update(3, UnsafeArrayData.fromPrimitiveArray(values))
         row
       case DenseVector(values) =>
-        val row = new GenericMutableRow(4)
+        val row = new GenericInternalRow(4)
         row.setByte(0, 1)
         row.setNullAt(1)
         row.setNullAt(2)
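
The UDT serializers above use a tagged-row encoding: byte 0 marks the variant (sparse or dense) and the fields the variant does not use are explicitly nulled. A sketch of the dense branch, runnable against a post-commit build; `serializeDense` is an illustrative name:

```scala
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, UnsafeArrayData}

object VectorRowSketch {
  // Dense branch only: byte 0 is the variant tag, the sparse-only
  // fields (size, indices) are explicitly nulled.
  def serializeDense(values: Array[Double]): InternalRow = {
    val row = new GenericInternalRow(4)
    row.setByte(0, 1)
    row.setNullAt(1)
    row.setNullAt(2)
    row.update(3, UnsafeArrayData.fromPrimitiveArray(values))
    row
  }

  def main(args: Array[String]): Unit = {
    val row = serializeDense(Array(1.0, 2.0))
    println(row.getByte(0))                              // 1
    println(row.getArray(3).toDoubleArray.mkString(",")) // 1.0,2.0
  }
}
```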

http://git-wip-us.apache.org/repos/asf/spark/blob/97594c29/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java
index 9027652..c3f0aba 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java
@@ -59,7 +59,7 @@ import static org.apache.spark.unsafe.Platform.BYTE_ARRAY_OFFSET;
  *
  * Instances of `UnsafeRow` act as pointers to row data stored in this format.
  */
-public final class UnsafeRow extends MutableRow implements Externalizable, KryoSerializable {
+public final class UnsafeRow extends InternalRow implements Externalizable, KryoSerializable {
 
   //////////////////////////////////////////////////////////////////////////////
   // Static methods

http://git-wip-us.apache.org/repos/asf/spark/blob/97594c29/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/InternalRow.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/InternalRow.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/InternalRow.scala
index eba95c5..f498e07 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/InternalRow.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/InternalRow.scala
@@ -18,7 +18,7 @@
 package org.apache.spark.sql.catalyst
 
 import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.types.{DataType, StructType}
+import org.apache.spark.sql.types.{DataType, Decimal, StructType}
 
 /**
  * An abstract class for rows used internally in Spark SQL, which only contains the columns as
@@ -31,6 +31,27 @@ abstract class InternalRow extends SpecializedGetters with Serializable {
   // This is only used for tests and will throw a null pointer exception if the position is null.
   def getString(ordinal: Int): String = getUTF8String(ordinal).toString
 
+  def setNullAt(i: Int): Unit
+
+  def update(i: Int, value: Any): Unit
+
+  // default implementation (slow)
+  def setBoolean(i: Int, value: Boolean): Unit = update(i, value)
+  def setByte(i: Int, value: Byte): Unit = update(i, value)
+  def setShort(i: Int, value: Short): Unit = update(i, value)
+  def setInt(i: Int, value: Int): Unit = update(i, value)
+  def setLong(i: Int, value: Long): Unit = update(i, value)
+  def setFloat(i: Int, value: Float): Unit = update(i, value)
+  def setDouble(i: Int, value: Double): Unit = update(i, value)
+
+  /**
+   * Update the decimal column at `i`.
+   *
+   * Note: In order to support updating decimals with precision > 18 in UnsafeRow,
+   * do NOT call setNullAt() for a decimal column on UnsafeRow; call setDecimal(i, null, precision) instead.
+   */
+  def setDecimal(i: Int, value: Decimal, precision: Int) { update(i, value) }
+
   /**
    * Make a copy of the current [[InternalRow]] object.
    */
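
With the mutators moved onto `InternalRow` itself (defaulting to the generic `update`), code can now write through the base type without knowing the concrete row class. A small illustration of that, assuming a post-commit build:

```scala
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, SpecificInternalRow}
import org.apache.spark.sql.types.IntegerType

object InternalRowSetterSketch {
  // Accepts the base type: after this patch the mutators are part of
  // InternalRow, with slow update()-based defaults that concrete
  // classes like SpecificInternalRow override with primitive versions.
  def fill(row: InternalRow): Unit = {
    row.setInt(0, 42)
    row.setNullAt(1)
  }

  def main(args: Array[String]): Unit = {
    val generic = new GenericInternalRow(2)
    val specific = new SpecificInternalRow(Seq(IntegerType, IntegerType))
    fill(generic)
    fill(specific)
    println(generic.getInt(0) == specific.getInt(0)) // true
  }
}
```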

http://git-wip-us.apache.org/repos/asf/spark/blob/97594c29/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala
index b96b744..82e1a8a 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala
@@ -256,7 +256,7 @@ case class ExpressionEncoder[T](
   private lazy val extractProjection = GenerateUnsafeProjection.generate(serializer)
 
   @transient
-  private lazy val inputRow = new GenericMutableRow(1)
+  private lazy val inputRow = new GenericInternalRow(1)
 
   @transient
   private lazy val constructProjection = GenerateSafeProjection.generate(deserializer :: Nil)

http://git-wip-us.apache.org/repos/asf/spark/blob/97594c29/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala
index 70fff51..1314c41 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala
@@ -403,7 +403,7 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w
       case (fromField, toField) => cast(fromField.dataType, toField.dataType)
     }
     // TODO: Could be faster?
-    val newRow = new GenericMutableRow(from.fields.length)
+    val newRow = new GenericInternalRow(from.fields.length)
     buildCast[InternalRow](_, row => {
       var i = 0
       while (i < row.numFields) {
@@ -892,7 +892,7 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w
     val fieldsCasts = from.fields.zip(to.fields).map {
       case (fromField, toField) => nullSafeCastFunction(fromField.dataType, toField.dataType, ctx)
     }
-    val rowClass = classOf[GenericMutableRow].getName
+    val rowClass = classOf[GenericInternalRow].getName
     val result = ctx.freshName("result")
     val tmpRow = ctx.freshName("tmpRow")
 

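The interpreted struct cast above precomputes one cast per field and then fills a fresh GenericInternalRow in a tight while loop. A standalone sketch of that loop, with hypothetical Any => Any closures standing in for the per-field casts:

    // Apply each precomputed field cast and collect the results into a fresh
    // row-like array; a null field stays null, mirroring the null-safe casts.
    def castStruct(casts: Array[Any => Any], fields: Array[Any]): Array[Any] = {
      val out = new Array[Any](fields.length)
      var i = 0
      while (i < fields.length) {
        out(i) = if (fields(i) == null) null else casts(i)(fields(i))
        i += 1
      }
      out
    }
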
http://git-wip-us.apache.org/repos/asf/spark/blob/97594c29/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/JoinedRow.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/JoinedRow.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/JoinedRow.scala
index ed894f6..7770684 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/JoinedRow.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/JoinedRow.scala
@@ -123,6 +123,22 @@ class JoinedRow extends InternalRow {
 
   override def anyNull: Boolean = row1.anyNull || row2.anyNull
 
+  override def setNullAt(i: Int): Unit = {
+    if (i < row1.numFields) {
+      row1.setNullAt(i)
+    } else {
+      row2.setNullAt(i - row1.numFields)
+    }
+  }
+
+  override def update(i: Int, value: Any): Unit = {
+    if (i < row1.numFields) {
+      row1.update(i, value)
+    } else {
+      row2.update(i - row1.numFields, value)
+    }
+  }
+
   override def copy(): InternalRow = {
     val copy1 = row1.copy()
     val copy2 = row2.copy()

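With these two overrides, a JoinedRow is writable straight through to whichever side owns the ordinal. A small usage sketch, assuming the catalyst classes are on the classpath:

    import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, JoinedRow}

    val left = new GenericInternalRow(2)    // ordinals 0 and 1
    val right = new GenericInternalRow(1)   // ordinal 2 once joined
    val joined = new JoinedRow(left, right)

    joined.update(0, 42)     // lands in left(0)
    joined.update(2, 7L)     // 2 - left.numFields == 0, lands in right(0)
    joined.setNullAt(1)      // nulls left(1)
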
http://git-wip-us.apache.org/repos/asf/spark/blob/97594c29/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Projection.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Projection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Projection.scala
index c8d1866..a81fa1c 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Projection.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Projection.scala
@@ -69,10 +69,10 @@ case class InterpretedMutableProjection(expressions: Seq[Expression]) extends Mu
   })
 
   private[this] val exprArray = expressions.toArray
-  private[this] var mutableRow: MutableRow = new GenericMutableRow(exprArray.length)
+  private[this] var mutableRow: InternalRow = new GenericInternalRow(exprArray.length)
   def currentValue: InternalRow = mutableRow
 
-  override def target(row: MutableRow): MutableProjection = {
+  override def target(row: InternalRow): MutableProjection = {
     mutableRow = row
     this
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/97594c29/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SpecificInternalRow.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SpecificInternalRow.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SpecificInternalRow.scala
new file mode 100644
index 0000000..74e0b46
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SpecificInternalRow.scala
@@ -0,0 +1,313 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.types._
+
+/**
+ * A parent class for mutable container objects that are reused when the values are changed,
+ * resulting in less garbage.  These values are held by a [[SpecificInternalRow]].
+ *
+ * The following code was roughly used to generate these objects:
+ * {{{
+ * val types = "Int,Float,Boolean,Double,Short,Long,Byte,Any".split(",")
+ * types.map {tpe =>
+ * s"""
+ * final class Mutable$tpe extends MutableValue {
+ *   var value: $tpe = 0
+ *   def boxed = if (isNull) null else value
+ *   def update(v: Any) = value = {
+ *     isNull = false
+ *     v.asInstanceOf[$tpe]
+ *   }
+ *   def copy() = {
+ *     val newCopy = new Mutable$tpe
+ *     newCopy.isNull = isNull
+ *     newCopy.value = value
+ *     newCopy
+ *   }
+ * }"""
+ * }.foreach(println)
+ *
+ * types.map { tpe =>
+ * s"""
+ *   override def set$tpe(ordinal: Int, value: $tpe): Unit = {
+ *     val currentValue = values(ordinal).asInstanceOf[Mutable$tpe]
+ *     currentValue.isNull = false
+ *     currentValue.value = value
+ *   }
+ *
+ *   override def get$tpe(i: Int): $tpe = {
+ *     values(i).asInstanceOf[Mutable$tpe].value
+ *   }"""
+ * }.foreach(println)
+ * }}}
+ */
+abstract class MutableValue extends Serializable {
+  var isNull: Boolean = true
+  def boxed: Any
+  def update(v: Any): Unit
+  def copy(): MutableValue
+}
+
+final class MutableInt extends MutableValue {
+  var value: Int = 0
+  override def boxed: Any = if (isNull) null else value
+  override def update(v: Any): Unit = {
+    isNull = false
+    value = v.asInstanceOf[Int]
+  }
+  override def copy(): MutableInt = {
+    val newCopy = new MutableInt
+    newCopy.isNull = isNull
+    newCopy.value = value
+    newCopy
+  }
+}
+
+final class MutableFloat extends MutableValue {
+  var value: Float = 0
+  override def boxed: Any = if (isNull) null else value
+  override def update(v: Any): Unit = {
+    isNull = false
+    value = v.asInstanceOf[Float]
+  }
+  override def copy(): MutableFloat = {
+    val newCopy = new MutableFloat
+    newCopy.isNull = isNull
+    newCopy.value = value
+    newCopy
+  }
+}
+
+final class MutableBoolean extends MutableValue {
+  var value: Boolean = false
+  override def boxed: Any = if (isNull) null else value
+  override def update(v: Any): Unit = {
+    isNull = false
+    value = v.asInstanceOf[Boolean]
+  }
+  override def copy(): MutableBoolean = {
+    val newCopy = new MutableBoolean
+    newCopy.isNull = isNull
+    newCopy.value = value
+    newCopy
+  }
+}
+
+final class MutableDouble extends MutableValue {
+  var value: Double = 0
+  override def boxed: Any = if (isNull) null else value
+  override def update(v: Any): Unit = {
+    isNull = false
+    value = v.asInstanceOf[Double]
+  }
+  override def copy(): MutableDouble = {
+    val newCopy = new MutableDouble
+    newCopy.isNull = isNull
+    newCopy.value = value
+    newCopy
+  }
+}
+
+final class MutableShort extends MutableValue {
+  var value: Short = 0
+  override def boxed: Any = if (isNull) null else value
+  override def update(v: Any): Unit = {
+    isNull = false
+    value = v.asInstanceOf[Short]
+  }
+  override def copy(): MutableShort = {
+    val newCopy = new MutableShort
+    newCopy.isNull = isNull
+    newCopy.value = value
+    newCopy
+  }
+}
+
+final class MutableLong extends MutableValue {
+  var value: Long = 0
+  override def boxed: Any = if (isNull) null else value
+  override def update(v: Any): Unit = {
+    isNull = false
+    value = v.asInstanceOf[Long]
+  }
+  override def copy(): MutableLong = {
+    val newCopy = new MutableLong
+    newCopy.isNull = isNull
+    newCopy.value = value
+    newCopy
+  }
+}
+
+final class MutableByte extends MutableValue {
+  var value: Byte = 0
+  override def boxed: Any = if (isNull) null else value
+  override def update(v: Any): Unit = {
+    isNull = false
+    value = v.asInstanceOf[Byte]
+  }
+  override def copy(): MutableByte = {
+    val newCopy = new MutableByte
+    newCopy.isNull = isNull
+    newCopy.value = value
+    newCopy
+  }
+}
+
+final class MutableAny extends MutableValue {
+  var value: Any = _
+  override def boxed: Any = if (isNull) null else value
+  override def update(v: Any): Unit = {
+    isNull = false
+    value = v
+  }
+  override def copy(): MutableAny = {
+    val newCopy = new MutableAny
+    newCopy.isNull = isNull
+    newCopy.value = value
+    newCopy
+  }
+}
+
+/**
+ * A row type that holds an array of specialized container objects, of type [[MutableValue]], chosen
+ * based on the dataTypes of each column.  The intent is to decrease garbage when modifying the
+ * values of primitive columns.
+ */
+final class SpecificInternalRow(val values: Array[MutableValue]) extends BaseGenericInternalRow {
+
+  def this(dataTypes: Seq[DataType]) =
+    this(
+      dataTypes.map {
+        case BooleanType => new MutableBoolean
+        case ByteType => new MutableByte
+        case ShortType => new MutableShort
+        // We use INT for DATE internally
+        case IntegerType | DateType => new MutableInt
+        // We use Long for Timestamp internally
+        case LongType | TimestampType => new MutableLong
+        case FloatType => new MutableFloat
+        case DoubleType => new MutableDouble
+        case _ => new MutableAny
+      }.toArray)
+
+  def this() = this(Seq.empty)
+
+  def this(schema: StructType) = this(schema.fields.map(_.dataType))
+
+  override def numFields: Int = values.length
+
+  override def setNullAt(i: Int): Unit = {
+    values(i).isNull = true
+  }
+
+  override def isNullAt(i: Int): Boolean = values(i).isNull
+
+  override def copy(): InternalRow = {
+    val newValues = new Array[Any](values.length)
+    var i = 0
+    while (i < values.length) {
+      newValues(i) = values(i).boxed
+      i += 1
+    }
+
+    new GenericInternalRow(newValues)
+  }
+
+  override protected def genericGet(i: Int): Any = values(i).boxed
+
+  override def update(ordinal: Int, value: Any): Unit = {
+    if (value == null) {
+      setNullAt(ordinal)
+    } else {
+      values(ordinal).update(value)
+    }
+  }
+
+  override def setInt(ordinal: Int, value: Int): Unit = {
+    val currentValue = values(ordinal).asInstanceOf[MutableInt]
+    currentValue.isNull = false
+    currentValue.value = value
+  }
+
+  override def getInt(i: Int): Int = {
+    values(i).asInstanceOf[MutableInt].value
+  }
+
+  override def setFloat(ordinal: Int, value: Float): Unit = {
+    val currentValue = values(ordinal).asInstanceOf[MutableFloat]
+    currentValue.isNull = false
+    currentValue.value = value
+  }
+
+  override def getFloat(i: Int): Float = {
+    values(i).asInstanceOf[MutableFloat].value
+  }
+
+  override def setBoolean(ordinal: Int, value: Boolean): Unit = {
+    val currentValue = values(ordinal).asInstanceOf[MutableBoolean]
+    currentValue.isNull = false
+    currentValue.value = value
+  }
+
+  override def getBoolean(i: Int): Boolean = {
+    values(i).asInstanceOf[MutableBoolean].value
+  }
+
+  override def setDouble(ordinal: Int, value: Double): Unit = {
+    val currentValue = values(ordinal).asInstanceOf[MutableDouble]
+    currentValue.isNull = false
+    currentValue.value = value
+  }
+
+  override def getDouble(i: Int): Double = {
+    values(i).asInstanceOf[MutableDouble].value
+  }
+
+  override def setShort(ordinal: Int, value: Short): Unit = {
+    val currentValue = values(ordinal).asInstanceOf[MutableShort]
+    currentValue.isNull = false
+    currentValue.value = value
+  }
+
+  override def getShort(i: Int): Short = {
+    values(i).asInstanceOf[MutableShort].value
+  }
+
+  override def setLong(ordinal: Int, value: Long): Unit = {
+    val currentValue = values(ordinal).asInstanceOf[MutableLong]
+    currentValue.isNull = false
+    currentValue.value = value
+  }
+
+  override def getLong(i: Int): Long = {
+    values(i).asInstanceOf[MutableLong].value
+  }
+
+  override def setByte(ordinal: Int, value: Byte): Unit = {
+    val currentValue = values(ordinal).asInstanceOf[MutableByte]
+    currentValue.isNull = false
+    currentValue.value = value
+  }
+
+  override def getByte(i: Int): Byte = {
+    values(i).asInstanceOf[MutableByte].value
+  }
+}

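Typical use of the renamed class, matching how TableReader and the converter suites below use it: allocate one row per partition, then overwrite the primitive slots in place so per-record updates produce no boxing garbage. A short sketch:

    import org.apache.spark.sql.catalyst.expressions.SpecificInternalRow
    import org.apache.spark.sql.types._

    val row = new SpecificInternalRow(Seq(IntegerType, LongType, DoubleType))
    var i = 0
    while (i < 3) {          // reuse the same row across records
      row.setInt(0, i)       // writes into a MutableInt, no boxing
      row.setLong(1, i * 10L)
      row.setNullAt(2)       // flips the MutableDouble's isNull flag
      // hand `row` (or `row.copy()` if it must outlive the loop) downstream
      i += 1
    }
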
http://git-wip-us.apache.org/repos/asf/spark/blob/97594c29/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SpecificMutableRow.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SpecificMutableRow.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SpecificMutableRow.scala
deleted file mode 100644
index 61ca727..0000000
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SpecificMutableRow.scala
+++ /dev/null
@@ -1,314 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.catalyst.expressions
-
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.types._
-
-/**
- * A parent class for mutable container objects that are reused when the values are changed,
- * resulting in less garbage.  These values are held by a [[SpecificMutableRow]].
- *
- * The following code was roughly used to generate these objects:
- * {{{
- * val types = "Int,Float,Boolean,Double,Short,Long,Byte,Any".split(",")
- * types.map {tpe =>
- * s"""
- * final class Mutable$tpe extends MutableValue {
- *   var value: $tpe = 0
- *   def boxed = if (isNull) null else value
- *   def update(v: Any) = value = {
- *     isNull = false
- *     v.asInstanceOf[$tpe]
- *   }
- *   def copy() = {
- *     val newCopy = new Mutable$tpe
- *     newCopy.isNull = isNull
- *     newCopy.value = value
- *     newCopy
- *   }
- * }"""
- * }.foreach(println)
- *
- * types.map { tpe =>
- * s"""
- *   override def set$tpe(ordinal: Int, value: $tpe): Unit = {
- *     val currentValue = values(ordinal).asInstanceOf[Mutable$tpe]
- *     currentValue.isNull = false
- *     currentValue.value = value
- *   }
- *
- *   override def get$tpe(i: Int): $tpe = {
- *     values(i).asInstanceOf[Mutable$tpe].value
- *   }"""
- * }.foreach(println)
- * }}}
- */
-abstract class MutableValue extends Serializable {
-  var isNull: Boolean = true
-  def boxed: Any
-  def update(v: Any): Unit
-  def copy(): MutableValue
-}
-
-final class MutableInt extends MutableValue {
-  var value: Int = 0
-  override def boxed: Any = if (isNull) null else value
-  override def update(v: Any): Unit = {
-    isNull = false
-    value = v.asInstanceOf[Int]
-  }
-  override def copy(): MutableInt = {
-    val newCopy = new MutableInt
-    newCopy.isNull = isNull
-    newCopy.value = value
-    newCopy
-  }
-}
-
-final class MutableFloat extends MutableValue {
-  var value: Float = 0
-  override def boxed: Any = if (isNull) null else value
-  override def update(v: Any): Unit = {
-    isNull = false
-    value = v.asInstanceOf[Float]
-  }
-  override def copy(): MutableFloat = {
-    val newCopy = new MutableFloat
-    newCopy.isNull = isNull
-    newCopy.value = value
-    newCopy
-  }
-}
-
-final class MutableBoolean extends MutableValue {
-  var value: Boolean = false
-  override def boxed: Any = if (isNull) null else value
-  override def update(v: Any): Unit = {
-    isNull = false
-    value = v.asInstanceOf[Boolean]
-  }
-  override def copy(): MutableBoolean = {
-    val newCopy = new MutableBoolean
-    newCopy.isNull = isNull
-    newCopy.value = value
-    newCopy
-  }
-}
-
-final class MutableDouble extends MutableValue {
-  var value: Double = 0
-  override def boxed: Any = if (isNull) null else value
-  override def update(v: Any): Unit = {
-    isNull = false
-    value = v.asInstanceOf[Double]
-  }
-  override def copy(): MutableDouble = {
-    val newCopy = new MutableDouble
-    newCopy.isNull = isNull
-    newCopy.value = value
-    newCopy
-  }
-}
-
-final class MutableShort extends MutableValue {
-  var value: Short = 0
-  override def boxed: Any = if (isNull) null else value
-  override def update(v: Any): Unit = value = {
-    isNull = false
-    v.asInstanceOf[Short]
-  }
-  override def copy(): MutableShort = {
-    val newCopy = new MutableShort
-    newCopy.isNull = isNull
-    newCopy.value = value
-    newCopy
-  }
-}
-
-final class MutableLong extends MutableValue {
-  var value: Long = 0
-  override def boxed: Any = if (isNull) null else value
-  override def update(v: Any): Unit = value = {
-    isNull = false
-    v.asInstanceOf[Long]
-  }
-  override def copy(): MutableLong = {
-    val newCopy = new MutableLong
-    newCopy.isNull = isNull
-    newCopy.value = value
-    newCopy
-  }
-}
-
-final class MutableByte extends MutableValue {
-  var value: Byte = 0
-  override def boxed: Any = if (isNull) null else value
-  override def update(v: Any): Unit = value = {
-    isNull = false
-    v.asInstanceOf[Byte]
-  }
-  override def copy(): MutableByte = {
-    val newCopy = new MutableByte
-    newCopy.isNull = isNull
-    newCopy.value = value
-    newCopy
-  }
-}
-
-final class MutableAny extends MutableValue {
-  var value: Any = _
-  override def boxed: Any = if (isNull) null else value
-  override def update(v: Any): Unit = {
-    isNull = false
-    value = v.asInstanceOf[Any]
-  }
-  override def copy(): MutableAny = {
-    val newCopy = new MutableAny
-    newCopy.isNull = isNull
-    newCopy.value = value
-    newCopy
-  }
-}
-
-/**
- * A row type that holds an array specialized container objects, of type [[MutableValue]], chosen
- * based on the dataTypes of each column.  The intent is to decrease garbage when modifying the
- * values of primitive columns.
- */
-final class SpecificMutableRow(val values: Array[MutableValue])
-  extends MutableRow with BaseGenericInternalRow {
-
-  def this(dataTypes: Seq[DataType]) =
-    this(
-      dataTypes.map {
-        case BooleanType => new MutableBoolean
-        case ByteType => new MutableByte
-        case ShortType => new MutableShort
-        // We use INT for DATE internally
-        case IntegerType | DateType => new MutableInt
-        // We use Long for Timestamp internally
-        case LongType | TimestampType => new MutableLong
-        case FloatType => new MutableFloat
-        case DoubleType => new MutableDouble
-        case _ => new MutableAny
-      }.toArray)
-
-  def this() = this(Seq.empty)
-
-  def this(schema: StructType) = this(schema.fields.map(_.dataType))
-
-  override def numFields: Int = values.length
-
-  override def setNullAt(i: Int): Unit = {
-    values(i).isNull = true
-  }
-
-  override def isNullAt(i: Int): Boolean = values(i).isNull
-
-  override def copy(): InternalRow = {
-    val newValues = new Array[Any](values.length)
-    var i = 0
-    while (i < values.length) {
-      newValues(i) = values(i).boxed
-      i += 1
-    }
-
-    new GenericInternalRow(newValues)
-  }
-
-  override protected def genericGet(i: Int): Any = values(i).boxed
-
-  override def update(ordinal: Int, value: Any) {
-    if (value == null) {
-      setNullAt(ordinal)
-    } else {
-      values(ordinal).update(value)
-    }
-  }
-
-  override def setInt(ordinal: Int, value: Int): Unit = {
-    val currentValue = values(ordinal).asInstanceOf[MutableInt]
-    currentValue.isNull = false
-    currentValue.value = value
-  }
-
-  override def getInt(i: Int): Int = {
-    values(i).asInstanceOf[MutableInt].value
-  }
-
-  override def setFloat(ordinal: Int, value: Float): Unit = {
-    val currentValue = values(ordinal).asInstanceOf[MutableFloat]
-    currentValue.isNull = false
-    currentValue.value = value
-  }
-
-  override def getFloat(i: Int): Float = {
-    values(i).asInstanceOf[MutableFloat].value
-  }
-
-  override def setBoolean(ordinal: Int, value: Boolean): Unit = {
-    val currentValue = values(ordinal).asInstanceOf[MutableBoolean]
-    currentValue.isNull = false
-    currentValue.value = value
-  }
-
-  override def getBoolean(i: Int): Boolean = {
-    values(i).asInstanceOf[MutableBoolean].value
-  }
-
-  override def setDouble(ordinal: Int, value: Double): Unit = {
-    val currentValue = values(ordinal).asInstanceOf[MutableDouble]
-    currentValue.isNull = false
-    currentValue.value = value
-  }
-
-  override def getDouble(i: Int): Double = {
-    values(i).asInstanceOf[MutableDouble].value
-  }
-
-  override def setShort(ordinal: Int, value: Short): Unit = {
-    val currentValue = values(ordinal).asInstanceOf[MutableShort]
-    currentValue.isNull = false
-    currentValue.value = value
-  }
-
-  override def getShort(i: Int): Short = {
-    values(i).asInstanceOf[MutableShort].value
-  }
-
-  override def setLong(ordinal: Int, value: Long): Unit = {
-    val currentValue = values(ordinal).asInstanceOf[MutableLong]
-    currentValue.isNull = false
-    currentValue.value = value
-  }
-
-  override def getLong(i: Int): Long = {
-    values(i).asInstanceOf[MutableLong].value
-  }
-
-  override def setByte(ordinal: Int, value: Byte): Unit = {
-    val currentValue = values(ordinal).asInstanceOf[MutableByte]
-    currentValue.isNull = false
-    currentValue.value = value
-  }
-
-  override def getByte(i: Int): Byte = {
-    values(i).asInstanceOf[MutableByte].value
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/97594c29/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/HyperLogLogPlusPlus.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/HyperLogLogPlusPlus.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/HyperLogLogPlusPlus.scala
index 1d218da..83c8d40 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/HyperLogLogPlusPlus.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/HyperLogLogPlusPlus.scala
@@ -155,7 +155,7 @@ case class HyperLogLogPlusPlus(
     aggBufferAttributes.map(_.newInstance())
 
   /** Fill all words with zeros. */
-  override def initialize(buffer: MutableRow): Unit = {
+  override def initialize(buffer: InternalRow): Unit = {
     var word = 0
     while (word < numWords) {
       buffer.setLong(mutableAggBufferOffset + word, 0)
@@ -168,7 +168,7 @@ case class HyperLogLogPlusPlus(
    *
    * Variable names in the HLL++ paper match variable names in the code.
    */
-  override def update(buffer: MutableRow, input: InternalRow): Unit = {
+  override def update(buffer: InternalRow, input: InternalRow): Unit = {
     val v = child.eval(input)
     if (v != null) {
       // Create the hashed value 'x'.
@@ -200,7 +200,7 @@ case class HyperLogLogPlusPlus(
   * Merge the HLL buffers by iterating through the registers in both buffers and selecting the
    * maximum number of leading zeros for each register.
    */
-  override def merge(buffer1: MutableRow, buffer2: InternalRow): Unit = {
+  override def merge(buffer1: InternalRow, buffer2: InternalRow): Unit = {
     var idx = 0
     var wordOffset = 0
     while (wordOffset < numWords) {

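A simplified, unpacked illustration of the merge rule described in the comment above; the real implementation packs six-bit registers into long words, but the per-register operation is just a max:

    // Each register keeps the maximum leading-zero count observed by either
    // sketch, so merging two HLL++ buffers is an element-wise max.
    def mergeRegisters(a: Array[Int], b: Array[Int]): Array[Int] =
      a.zip(b).map { case (x, y) => math.max(x, y) }
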
http://git-wip-us.apache.org/repos/asf/spark/blob/97594c29/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/PivotFirst.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/PivotFirst.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/PivotFirst.scala
index 16c03c5..0876060 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/PivotFirst.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/PivotFirst.scala
@@ -30,7 +30,7 @@ object PivotFirst {
 
   // Currently UnsafeRow does not support the generic update method (throws
   // UnsupportedOperationException), so we need to explicitly support each DataType.
-  private val updateFunction: PartialFunction[DataType, (MutableRow, Int, Any) => Unit] = {
+  private val updateFunction: PartialFunction[DataType, (InternalRow, Int, Any) => Unit] = {
     case DoubleType =>
       (row, offset, value) => row.setDouble(offset, value.asInstanceOf[Double])
     case IntegerType =>
@@ -89,9 +89,9 @@ case class PivotFirst(
 
   val indexSize = pivotIndex.size
 
-  private val updateRow: (MutableRow, Int, Any) => Unit = PivotFirst.updateFunction(valueDataType)
+  private val updateRow: (InternalRow, Int, Any) => Unit = PivotFirst.updateFunction(valueDataType)
 
-  override def update(mutableAggBuffer: MutableRow, inputRow: InternalRow): Unit = {
+  override def update(mutableAggBuffer: InternalRow, inputRow: InternalRow): Unit = {
     val pivotColValue = pivotColumn.eval(inputRow)
     if (pivotColValue != null) {
       // We ignore rows whose pivot column value is not in the list of pivot column values.
@@ -105,7 +105,7 @@ case class PivotFirst(
     }
   }
 
-  override def merge(mutableAggBuffer: MutableRow, inputAggBuffer: InternalRow): Unit = {
+  override def merge(mutableAggBuffer: InternalRow, inputAggBuffer: InternalRow): Unit = {
     for (i <- 0 until indexSize) {
       if (!inputAggBuffer.isNullAt(inputAggBufferOffset + i)) {
         val value = inputAggBuffer.get(inputAggBufferOffset + i, valueDataType)
@@ -114,7 +114,7 @@ case class PivotFirst(
     }
   }
 
-  override def initialize(mutableAggBuffer: MutableRow): Unit = valueDataType match {
+  override def initialize(mutableAggBuffer: InternalRow): Unit = valueDataType match {
     case d: DecimalType =>
       // Per the setDecimal doc, we need to do this instead of setNullAt for DecimalType.
       for (i <- 0 until indexSize) {

http://git-wip-us.apache.org/repos/asf/spark/blob/97594c29/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/collect.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/collect.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/collect.scala
index 78a388d..89eb864 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/collect.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/collect.scala
@@ -60,11 +60,11 @@ abstract class Collect extends ImperativeAggregate {
 
   protected[this] val buffer: Growable[Any] with Iterable[Any]
 
-  override def initialize(b: MutableRow): Unit = {
+  override def initialize(b: InternalRow): Unit = {
     buffer.clear()
   }
 
-  override def update(b: MutableRow, input: InternalRow): Unit = {
+  override def update(b: InternalRow, input: InternalRow): Unit = {
     // Do not allow null values. We follow the semantics of Hive's collect_list/collect_set here.
     // See: org.apache.hadoop.hive.ql.udf.generic.GenericUDAFMkCollectionEvaluator
     val value = child.eval(input)
@@ -73,7 +73,7 @@ abstract class Collect extends ImperativeAggregate {
     }
   }
 
-  override def merge(buffer: MutableRow, input: InternalRow): Unit = {
+  override def merge(buffer: InternalRow, input: InternalRow): Unit = {
     sys.error("Collect cannot be used in partial aggregations.")
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/97594c29/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/interfaces.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/interfaces.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/interfaces.scala
index b5c0844..f3fd58b 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/interfaces.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/interfaces.scala
@@ -307,14 +307,14 @@ abstract class ImperativeAggregate extends AggregateFunction with CodegenFallbac
    *
    * Use `fieldNumber + mutableAggBufferOffset` to access fields of `mutableAggBuffer`.
    */
-  def initialize(mutableAggBuffer: MutableRow): Unit
+  def initialize(mutableAggBuffer: InternalRow): Unit
 
   /**
    * Updates its aggregation buffer, located in `mutableAggBuffer`, based on the given `inputRow`.
    *
    * Use `fieldNumber + mutableAggBufferOffset` to access fields of `mutableAggBuffer`.
    */
-  def update(mutableAggBuffer: MutableRow, inputRow: InternalRow): Unit
+  def update(mutableAggBuffer: InternalRow, inputRow: InternalRow): Unit
 
   /**
    * Combines new intermediate results from the `inputAggBuffer` with the existing intermediate
@@ -323,7 +323,7 @@ abstract class ImperativeAggregate extends AggregateFunction with CodegenFallbac
    * Use `fieldNumber + mutableAggBufferOffset` to access fields of `mutableAggBuffer`.
    * Use `fieldNumber + inputAggBufferOffset` to access fields of `inputAggBuffer`.
    */
-  def merge(mutableAggBuffer: MutableRow, inputAggBuffer: InternalRow): Unit
+  def merge(mutableAggBuffer: InternalRow, inputAggBuffer: InternalRow): Unit
 }
 
 /**
@@ -504,16 +504,16 @@ abstract class TypedImperativeAggregate[T] extends ImperativeAggregate {
   /** De-serializes the serialized format Array[Byte], and produces the aggregation buffer object T. */
   def deserialize(storageFormat: Array[Byte]): T
 
-  final override def initialize(buffer: MutableRow): Unit = {
+  final override def initialize(buffer: InternalRow): Unit = {
     val bufferObject = createAggregationBuffer()
     buffer.update(mutableAggBufferOffset, bufferObject)
   }
 
-  final override def update(buffer: MutableRow, input: InternalRow): Unit = {
+  final override def update(buffer: InternalRow, input: InternalRow): Unit = {
     update(getBufferObject(buffer), input)
   }
 
-  final override def merge(buffer: MutableRow, inputBuffer: InternalRow): Unit = {
+  final override def merge(buffer: InternalRow, inputBuffer: InternalRow): Unit = {
     val bufferObject = getBufferObject(buffer)
     // The inputBuffer stores the serialized aggregation buffer object produced by the partial aggregate
     val inputObject = deserialize(inputBuffer.getBinary(inputAggBufferOffset))
@@ -547,7 +547,7 @@ abstract class TypedImperativeAggregate[T] extends ImperativeAggregate {
    * This is only called when doing Partial or PartialMerge mode aggregation, before the framework
   * shuffles out the aggregate buffers.
    */
-  final def serializeAggregateBufferInPlace(buffer: MutableRow): Unit = {
+  final def serializeAggregateBufferInPlace(buffer: InternalRow): Unit = {
     buffer(mutableAggBufferOffset) = serialize(getBufferObject(buffer))
   }
 }

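The offset arithmetic in these contracts is easy to get wrong, so here is a hedged sketch of a count-style aggregate's three methods against a shared InternalRow buffer; the offset parameters stand in for mutableAggBufferOffset and inputAggBufferOffset:

    import org.apache.spark.sql.catalyst.InternalRow

    def initialize(buf: InternalRow, off: Int): Unit =
      buf.setLong(off, 0L)   // zero the aggregate's single buffer word

    def update(buf: InternalRow, off: Int, inputNonNull: Boolean): Unit =
      if (inputNonNull) buf.setLong(off, buf.getLong(off) + 1)

    def merge(buf: InternalRow, bufOff: Int, in: InternalRow, inOff: Int): Unit =
      buf.setLong(bufOff, buf.getLong(bufOff) + in.getLong(inOff))
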
http://git-wip-us.apache.org/repos/asf/spark/blob/97594c29/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
index 574943d..6cab50a 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
@@ -819,7 +819,7 @@ class CodeAndComment(val body: String, val comment: collection.Map[String, Strin
  */
 abstract class CodeGenerator[InType <: AnyRef, OutType <: AnyRef] extends Logging {
 
-  protected val genericMutableRowType: String = classOf[GenericMutableRow].getName
+  protected val genericMutableRowType: String = classOf[GenericInternalRow].getName
 
   /**
    * Generates a class for a given input expression.  Called when there is not cached code
@@ -889,7 +889,6 @@ object CodeGenerator extends Logging {
       classOf[UnsafeArrayData].getName,
       classOf[MapData].getName,
       classOf[UnsafeMapData].getName,
-      classOf[MutableRow].getName,
       classOf[Expression].getName
     ))
     evaluator.setExtendedClass(classOf[GeneratedClass])

http://git-wip-us.apache.org/repos/asf/spark/blob/97594c29/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateMutableProjection.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateMutableProjection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateMutableProjection.scala
index 13d61af..5c4b56b 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateMutableProjection.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateMutableProjection.scala
@@ -24,10 +24,10 @@ import org.apache.spark.sql.catalyst.expressions.aggregate.NoOp
 abstract class BaseMutableProjection extends MutableProjection
 
 /**
- * Generates byte code that produces a [[MutableRow]] object that can update itself based on a new
+ * Generates byte code that produces an [[InternalRow]] object that can update itself based on a new
  * input [[InternalRow]] for a fixed set of [[Expression Expressions]].
  * It exposes a `target` method, which is used to set the row that will be updated.
- * The internal [[MutableRow]] object created internally is used only when `target` is not used.
+ * The [[InternalRow]] object created internally is used only when `target` has not been called.
  */
 object GenerateMutableProjection extends CodeGenerator[Seq[Expression], MutableProjection] {
 
@@ -102,7 +102,7 @@ object GenerateMutableProjection extends CodeGenerator[Seq[Expression], MutableP
       class SpecificMutableProjection extends ${classOf[BaseMutableProjection].getName} {
 
         private Object[] references;
-        private MutableRow mutableRow;
+        private InternalRow mutableRow;
         ${ctx.declareMutableStates()}
 
         public SpecificMutableProjection(Object[] references) {
@@ -113,7 +113,7 @@ object GenerateMutableProjection extends CodeGenerator[Seq[Expression], MutableP
 
         ${ctx.declareAddedFunctions()}
 
-        public ${classOf[BaseMutableProjection].getName} target(MutableRow row) {
+        public ${classOf[BaseMutableProjection].getName} target(InternalRow row) {
           mutableRow = row;
           return this;
         }

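How target is used in practice, in a sketch that assumes a Spark 2.x catalyst classpath; the expressions and values are chosen only for illustration:

    import org.apache.spark.sql.catalyst.InternalRow
    import org.apache.spark.sql.catalyst.expressions._
    import org.apache.spark.sql.catalyst.expressions.codegen.GenerateMutableProjection
    import org.apache.spark.sql.types.IntegerType

    val a = BoundReference(0, IntegerType, nullable = false)
    val proj = GenerateMutableProjection.generate(
      Seq(Add(a, Literal(1)), Multiply(a, Literal(2))))

    // Redirect output into a row we control; without target() the projection
    // keeps writing into the row it created internally.
    val out = new SpecificInternalRow(Seq(IntegerType, IntegerType))
    proj.target(out)
    proj(InternalRow(20))    // out now holds [21, 40]
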
http://git-wip-us.apache.org/repos/asf/spark/blob/97594c29/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateSafeProjection.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateSafeProjection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateSafeProjection.scala
index 1c98c9e..2773e1a 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateSafeProjection.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateSafeProjection.scala
@@ -30,7 +30,7 @@ import org.apache.spark.sql.types._
 abstract class BaseProjection extends Projection {}
 
 /**
- * Generates byte code that produces a [[MutableRow]] object (not an [[UnsafeRow]]) that can update
+ * Generates byte code that produces an [[InternalRow]] object (not an [[UnsafeRow]]) that can update
  * itself based on a new input [[InternalRow]] for a fixed set of [[Expression Expressions]].
  */
 object GenerateSafeProjection extends CodeGenerator[Seq[Expression], Projection] {
@@ -164,12 +164,12 @@ object GenerateSafeProjection extends CodeGenerator[Seq[Expression], Projection]
       class SpecificSafeProjection extends ${classOf[BaseProjection].getName} {
 
         private Object[] references;
-        private MutableRow mutableRow;
+        private InternalRow mutableRow;
         ${ctx.declareMutableStates()}
 
         public SpecificSafeProjection(Object[] references) {
           this.references = references;
-          mutableRow = (MutableRow) references[references.length - 1];
+          mutableRow = (InternalRow) references[references.length - 1];
           ${ctx.initMutableStates()}
         }
 
@@ -188,7 +188,7 @@ object GenerateSafeProjection extends CodeGenerator[Seq[Expression], Projection]
     logDebug(s"code for ${expressions.mkString(",")}:\n${CodeFormatter.format(code)}")
 
     val c = CodeGenerator.compile(code)
-    val resultRow = new SpecificMutableRow(expressions.map(_.dataType))
+    val resultRow = new SpecificInternalRow(expressions.map(_.dataType))
     c.generate(ctx.references.toArray :+ resultRow).asInstanceOf[Projection]
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/97594c29/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/package.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/package.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/package.scala
index a6125c6..1510a47 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/package.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/package.scala
@@ -81,7 +81,7 @@ package object expressions  {
     def currentValue: InternalRow
 
     /** Uses the given row to store the output of the projection. */
-    def target(row: MutableRow): MutableProjection
+    def target(row: InternalRow): MutableProjection
   }
 
 

http://git-wip-us.apache.org/repos/asf/spark/blob/97594c29/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/rows.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/rows.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/rows.scala
index 73dceb3..751b821 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/rows.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/rows.scala
@@ -158,33 +158,6 @@ trait BaseGenericInternalRow extends InternalRow {
 }
 
 /**
- * An extended interface to [[InternalRow]] that allows the values for each column to be updated.
- * Setting a value through a primitive function implicitly marks that column as not null.
- */
-abstract class MutableRow extends InternalRow {
-  def setNullAt(i: Int): Unit
-
-  def update(i: Int, value: Any): Unit
-
-  // default implementation (slow)
-  def setBoolean(i: Int, value: Boolean): Unit = { update(i, value) }
-  def setByte(i: Int, value: Byte): Unit = { update(i, value) }
-  def setShort(i: Int, value: Short): Unit = { update(i, value) }
-  def setInt(i: Int, value: Int): Unit = { update(i, value) }
-  def setLong(i: Int, value: Long): Unit = { update(i, value) }
-  def setFloat(i: Int, value: Float): Unit = { update(i, value) }
-  def setDouble(i: Int, value: Double): Unit = { update(i, value) }
-
-  /**
-   * Update the decimal column at `i`.
-   *
-   * Note: In order to support update decimal with precision > 18 in UnsafeRow,
-   * CAN NOT call setNullAt() for decimal column on UnsafeRow, call setDecimal(i, null, precision).
-   */
-  def setDecimal(i: Int, value: Decimal, precision: Int) { update(i, value) }
-}
-
-/**
  * A row implementation that uses an array of objects as the underlying storage.  Note that, while
  * the array is not copied, and thus could technically be mutated after creation, this is not
  * allowed.
@@ -230,24 +203,9 @@ class GenericInternalRow(val values: Array[Any]) extends BaseGenericInternalRow
 
   override def numFields: Int = values.length
 
-  override def copy(): GenericInternalRow = this
-}
-
-class GenericMutableRow(values: Array[Any]) extends MutableRow with BaseGenericInternalRow {
-  /** No-arg constructor for serialization. */
-  protected def this() = this(null)
-
-  def this(size: Int) = this(new Array[Any](size))
-
-  override protected def genericGet(ordinal: Int) = values(ordinal)
-
-  override def toSeq(fieldTypes: Seq[DataType]): Seq[Any] = values
-
-  override def numFields: Int = values.length
-
   override def setNullAt(i: Int): Unit = { values(i) = null }
 
   override def update(i: Int, value: Any): Unit = { values(i) = value }
 
-  override def copy(): InternalRow = new GenericInternalRow(values.clone())
+  override def copy(): GenericInternalRow = this
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/97594c29/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
index f80e637..e476cb1 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
@@ -105,7 +105,7 @@ class JacksonParser(
       }
       emptyRow
     } else {
-      val row = new GenericMutableRow(schema.length)
+      val row = new GenericInternalRow(schema.length)
       for (corruptIndex <- schema.getFieldIndex(columnNameOfCorruptRecord)) {
         require(schema(corruptIndex).dataType == StringType)
         row.update(corruptIndex, UTF8String.fromString(record))
@@ -363,7 +363,7 @@ class JacksonParser(
       parser: JsonParser,
       schema: StructType,
       fieldConverters: Seq[ValueConverter]): InternalRow = {
-    val row = new GenericMutableRow(schema.length)
+    val row = new GenericInternalRow(schema.length)
     while (nextUntil(parser, JsonToken.END_OBJECT)) {
       schema.getFieldIndex(parser.getCurrentName) match {
         case Some(index) =>

http://git-wip-us.apache.org/repos/asf/spark/blob/97594c29/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/ScalaReflectionSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/ScalaReflectionSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/ScalaReflectionSuite.scala
index 85563dd..43b6afd 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/ScalaReflectionSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/ScalaReflectionSuite.scala
@@ -23,7 +23,7 @@ import java.sql.{Date, Timestamp}
 import scala.reflect.runtime.universe.typeOf
 
 import org.apache.spark.SparkFunSuite
-import org.apache.spark.sql.catalyst.expressions.{BoundReference, Literal, SpecificMutableRow}
+import org.apache.spark.sql.catalyst.expressions.{BoundReference, Literal, SpecificInternalRow}
 import org.apache.spark.sql.catalyst.expressions.objects.NewInstance
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.UTF8String
@@ -94,7 +94,7 @@ object TestingUDT {
       .add("c", DoubleType, nullable = false)
 
     override def serialize(n: NestedStruct): Any = {
-      val row = new SpecificMutableRow(sqlType.asInstanceOf[StructType].map(_.dataType))
+      val row = new SpecificInternalRow(sqlType.asInstanceOf[StructType].map(_.dataType))
       row.setInt(0, n.a)
       row.setLong(1, n.b)
       row.setDouble(2, n.c)

http://git-wip-us.apache.org/repos/asf/spark/blob/97594c29/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CodeGenerationSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CodeGenerationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CodeGenerationSuite.scala
index 5588b44..0cb201e 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CodeGenerationSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CodeGenerationSuite.scala
@@ -68,7 +68,7 @@ class CodeGenerationSuite extends SparkFunSuite with ExpressionEvalHelper {
     val length = 5000
     val expressions = List.fill(length)(EqualTo(Literal(1), Literal(1)))
     val plan = GenerateMutableProjection.generate(expressions)
-    val actual = plan(new GenericMutableRow(length)).toSeq(expressions.map(_.dataType))
+    val actual = plan(new GenericInternalRow(length)).toSeq(expressions.map(_.dataType))
     val expected = Seq.fill(length)(true)
 
     if (!checkResult(actual, expected)) {
@@ -91,7 +91,7 @@ class CodeGenerationSuite extends SparkFunSuite with ExpressionEvalHelper {
     val expression = CaseWhen((1 to cases).map(generateCase(_)))
 
     val plan = GenerateMutableProjection.generate(Seq(expression))
-    val input = new GenericMutableRow(Array[Any](UTF8String.fromString(s"${clauses}:${cases}")))
+    val input = new GenericInternalRow(Array[Any](UTF8String.fromString(s"${clauses}:${cases}")))
     val actual = plan(input).toSeq(Seq(expression.dataType))
 
     assert(actual(0) == cases)
@@ -101,7 +101,7 @@ class CodeGenerationSuite extends SparkFunSuite with ExpressionEvalHelper {
     val length = 5000
     val expressions = Seq(CreateArray(List.fill(length)(EqualTo(Literal(1), Literal(1)))))
     val plan = GenerateMutableProjection.generate(expressions)
-    val actual = plan(new GenericMutableRow(length)).toSeq(expressions.map(_.dataType))
+    val actual = plan(new GenericInternalRow(length)).toSeq(expressions.map(_.dataType))
     val expected = Seq(new GenericArrayData(Seq.fill(length)(true)))
 
     if (!checkResult(actual, expected)) {
@@ -116,7 +116,7 @@ class CodeGenerationSuite extends SparkFunSuite with ExpressionEvalHelper {
         case (expr, i) => Seq(Literal(i), expr)
       }))
     val plan = GenerateMutableProjection.generate(expressions)
-    val actual = plan(new GenericMutableRow(length)).toSeq(expressions.map(_.dataType)).map {
+    val actual = plan(new GenericInternalRow(length)).toSeq(expressions.map(_.dataType)).map {
       case m: ArrayBasedMapData => ArrayBasedMapData.toScalaMap(m)
     }
     val expected = (0 until length).map((_, true)).toMap :: Nil
@@ -130,7 +130,7 @@ class CodeGenerationSuite extends SparkFunSuite with ExpressionEvalHelper {
     val length = 5000
     val expressions = Seq(CreateStruct(List.fill(length)(EqualTo(Literal(1), Literal(1)))))
     val plan = GenerateMutableProjection.generate(expressions)
-    val actual = plan(new GenericMutableRow(length)).toSeq(expressions.map(_.dataType))
+    val actual = plan(new GenericInternalRow(length)).toSeq(expressions.map(_.dataType))
     val expected = Seq(InternalRow(Seq.fill(length)(true): _*))
 
     if (!checkResult(actual, expected)) {
@@ -145,7 +145,7 @@ class CodeGenerationSuite extends SparkFunSuite with ExpressionEvalHelper {
         expr => Seq(Literal(expr.toString), expr)
       }))
     val plan = GenerateMutableProjection.generate(expressions)
-    val actual = plan(new GenericMutableRow(length)).toSeq(expressions.map(_.dataType))
+    val actual = plan(new GenericInternalRow(length)).toSeq(expressions.map(_.dataType))
     val expected = Seq(InternalRow(Seq.fill(length)(true): _*))
 
     if (!checkResult(actual, expected)) {
@@ -158,7 +158,7 @@ class CodeGenerationSuite extends SparkFunSuite with ExpressionEvalHelper {
     val schema = StructType(Seq.fill(length)(StructField("int", IntegerType)))
     val expressions = Seq(CreateExternalRow(Seq.fill(length)(Literal(1)), schema))
     val plan = GenerateMutableProjection.generate(expressions)
-    val actual = plan(new GenericMutableRow(length)).toSeq(expressions.map(_.dataType))
+    val actual = plan(new GenericInternalRow(length)).toSeq(expressions.map(_.dataType))
     val expected = Seq(Row.fromSeq(Seq.fill(length)(1)))
 
     if (!checkResult(actual, expected)) {
@@ -174,7 +174,7 @@ class CodeGenerationSuite extends SparkFunSuite with ExpressionEvalHelper {
         Literal.create("PST", StringType))
     }
     val plan = GenerateMutableProjection.generate(expressions)
-    val actual = plan(new GenericMutableRow(length)).toSeq(expressions.map(_.dataType))
+    val actual = plan(new GenericInternalRow(length)).toSeq(expressions.map(_.dataType))
     val expected = Seq.fill(length)(
       DateTimeUtils.fromJavaTimestamp(Timestamp.valueOf("2015-07-24 07:00:00")))
 

http://git-wip-us.apache.org/repos/asf/spark/blob/97594c29/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/MapDataSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/MapDataSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/MapDataSuite.scala
index 0f1264c..25a675a 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/MapDataSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/MapDataSuite.scala
@@ -45,7 +45,7 @@ class MapDataSuite extends SparkFunSuite {
 
     // UnsafeMapData
     val unsafeConverter = UnsafeProjection.create(Array[DataType](MapType(StringType, IntegerType)))
-    val row = new GenericMutableRow(1)
+    val row = new GenericInternalRow(1)
     def toUnsafeMap(map: ArrayBasedMapData): UnsafeMapData = {
       row.update(0, map)
       val unsafeRow = unsafeConverter.apply(row)

http://git-wip-us.apache.org/repos/asf/spark/blob/97594c29/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/UnsafeRowConverterSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/UnsafeRowConverterSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/UnsafeRowConverterSuite.scala
index 90790dd..cf3cbe2 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/UnsafeRowConverterSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/UnsafeRowConverterSuite.scala
@@ -37,7 +37,7 @@ class UnsafeRowConverterSuite extends SparkFunSuite with Matchers {
     val fieldTypes: Array[DataType] = Array(LongType, LongType, IntegerType)
     val converter = UnsafeProjection.create(fieldTypes)
 
-    val row = new SpecificMutableRow(fieldTypes)
+    val row = new SpecificInternalRow(fieldTypes)
     row.setLong(0, 0)
     row.setLong(1, 1)
     row.setInt(2, 2)
@@ -75,7 +75,7 @@ class UnsafeRowConverterSuite extends SparkFunSuite with Matchers {
     val fieldTypes: Array[DataType] = Array(LongType, StringType, BinaryType)
     val converter = UnsafeProjection.create(fieldTypes)
 
-    val row = new SpecificMutableRow(fieldTypes)
+    val row = new SpecificInternalRow(fieldTypes)
     row.setLong(0, 0)
     row.update(1, UTF8String.fromString("Hello"))
     row.update(2, "World".getBytes(StandardCharsets.UTF_8))
@@ -94,7 +94,7 @@ class UnsafeRowConverterSuite extends SparkFunSuite with Matchers {
     val fieldTypes: Array[DataType] = Array(LongType, StringType, DateType, TimestampType)
     val converter = UnsafeProjection.create(fieldTypes)
 
-    val row = new SpecificMutableRow(fieldTypes)
+    val row = new SpecificInternalRow(fieldTypes)
     row.setLong(0, 0)
     row.update(1, UTF8String.fromString("Hello"))
     row.update(2, DateTimeUtils.fromJavaDate(Date.valueOf("1970-01-01")))
@@ -138,7 +138,7 @@ class UnsafeRowConverterSuite extends SparkFunSuite with Matchers {
     val converter = UnsafeProjection.create(fieldTypes)
 
     val rowWithAllNullColumns: InternalRow = {
-      val r = new SpecificMutableRow(fieldTypes)
+      val r = new SpecificInternalRow(fieldTypes)
       for (i <- fieldTypes.indices) {
         r.setNullAt(i)
       }
@@ -167,7 +167,7 @@ class UnsafeRowConverterSuite extends SparkFunSuite with Matchers {
     // columns, then the serialized row representation should be identical to what we would get by
     // creating an entirely null row via the converter
     val rowWithNoNullColumns: InternalRow = {
-      val r = new SpecificMutableRow(fieldTypes)
+      val r = new SpecificInternalRow(fieldTypes)
       r.setNullAt(0)
       r.setBoolean(1, false)
       r.setByte(2, 20)
@@ -243,11 +243,11 @@ class UnsafeRowConverterSuite extends SparkFunSuite with Matchers {
   test("NaN canonicalization") {
     val fieldTypes: Array[DataType] = Array(FloatType, DoubleType)
 
-    val row1 = new SpecificMutableRow(fieldTypes)
+    val row1 = new SpecificInternalRow(fieldTypes)
     row1.setFloat(0, java.lang.Float.intBitsToFloat(0x7f800001))
     row1.setDouble(1, java.lang.Double.longBitsToDouble(0x7ff0000000000001L))
 
-    val row2 = new SpecificMutableRow(fieldTypes)
+    val row2 = new SpecificInternalRow(fieldTypes)
     row2.setFloat(0, java.lang.Float.intBitsToFloat(0x7fffffff))
     row2.setDouble(1, java.lang.Double.longBitsToDouble(0x7fffffffffffffffL))
 
@@ -263,7 +263,7 @@ class UnsafeRowConverterSuite extends SparkFunSuite with Matchers {
 
     val converter = UnsafeProjection.create(fieldTypes)
 
-    val row = new GenericMutableRow(fieldTypes.length)
+    val row = new GenericInternalRow(fieldTypes.length)
     row.update(0, InternalRow(1))
     row.update(1, InternalRow(InternalRow(2L)))
 
@@ -324,7 +324,7 @@ class UnsafeRowConverterSuite extends SparkFunSuite with Matchers {
     )
     val converter = UnsafeProjection.create(fieldTypes)
 
-    val row = new GenericMutableRow(fieldTypes.length)
+    val row = new GenericInternalRow(fieldTypes.length)
     row.update(0, createArray(1, 2))
     row.update(1, createArray(createArray(3, 4)))
 
@@ -359,7 +359,7 @@ class UnsafeRowConverterSuite extends SparkFunSuite with Matchers {
     val innerMap = createMap(5, 6)(7, 8)
     val map2 = createMap(9)(innerMap)
 
-    val row = new GenericMutableRow(fieldTypes.length)
+    val row = new GenericInternalRow(fieldTypes.length)
     row.update(0, map1)
     row.update(1, map2)
 
@@ -400,7 +400,7 @@ class UnsafeRowConverterSuite extends SparkFunSuite with Matchers {
     )
     val converter = UnsafeProjection.create(fieldTypes)
 
-    val row = new GenericMutableRow(fieldTypes.length)
+    val row = new GenericInternalRow(fieldTypes.length)
     row.update(0, InternalRow(createArray(1)))
     row.update(1, createArray(InternalRow(2L)))
 
@@ -439,7 +439,7 @@ class UnsafeRowConverterSuite extends SparkFunSuite with Matchers {
     )
     val converter = UnsafeProjection.create(fieldTypes)
 
-    val row = new GenericMutableRow(fieldTypes.length)
+    val row = new GenericInternalRow(fieldTypes.length)
     row.update(0, InternalRow(createMap(1)(2)))
     row.update(1, createMap(3)(InternalRow(4L)))
 
@@ -485,7 +485,7 @@ class UnsafeRowConverterSuite extends SparkFunSuite with Matchers {
     )
     val converter = UnsafeProjection.create(fieldTypes)
 
-    val row = new GenericMutableRow(fieldTypes.length)
+    val row = new GenericInternalRow(fieldTypes.length)
     row.update(0, createArray(createMap(1)(2)))
     row.update(1, createMap(3)(createArray(4)))
 

http://git-wip-us.apache.org/repos/asf/spark/blob/97594c29/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/aggregate/ApproximatePercentileSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/aggregate/ApproximatePercentileSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/aggregate/ApproximatePercentileSuite.scala
index 61298a1..8456e24 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/aggregate/ApproximatePercentileSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/aggregate/ApproximatePercentileSuite.scala
@@ -23,7 +23,7 @@ import org.apache.spark.sql.catalyst.analysis.{SimpleAnalyzer, UnresolvedAttribu
 import org.apache.spark.sql.catalyst.analysis.TypeCheckResult.TypeCheckFailure
 import org.apache.spark.sql.catalyst.dsl.expressions._
 import org.apache.spark.sql.catalyst.dsl.plans._
-import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, BoundReference, Cast, CreateArray, DecimalLiteral, GenericMutableRow, Literal}
+import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, BoundReference, Cast, CreateArray, DecimalLiteral, GenericInternalRow, Literal}
 import org.apache.spark.sql.catalyst.expressions.aggregate.ApproximatePercentile.{PercentileDigest, PercentileDigestSerializer}
 import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
 import org.apache.spark.sql.catalyst.util.ArrayData
@@ -144,7 +144,8 @@ class ApproximatePercentileSuite extends SparkFunSuite {
       .withNewInputAggBufferOffset(inputAggregationBufferOffset)
       .withNewMutableAggBufferOffset(mutableAggregationBufferOffset)
 
-    val mutableAggBuffer = new GenericMutableRow(new Array[Any](mutableAggregationBufferOffset + 1))
+    val mutableAggBuffer = new GenericInternalRow(
+      new Array[Any](mutableAggregationBufferOffset + 1))
     agg.initialize(mutableAggBuffer)
     val dataCount = 10
     (1 to dataCount).foreach { data =>
@@ -154,7 +155,7 @@ class ApproximatePercentileSuite extends SparkFunSuite {
 
     // Serialize the aggregation buffer
     val serialized = mutableAggBuffer.getBinary(mutableAggregationBufferOffset)
-    val inputAggBuffer = new GenericMutableRow(Array[Any](null, serialized))
+    val inputAggBuffer = new GenericInternalRow(Array[Any](null, serialized))
 
     // Phase 2: final mode aggregation
     // Re-initialize the aggregation buffer
@@ -311,7 +312,7 @@ class ApproximatePercentileSuite extends SparkFunSuite {
   test("class ApproximatePercentile, null handling") {
     val childExpression = Cast(BoundReference(0, IntegerType, nullable = true), DoubleType)
     val agg = new ApproximatePercentile(childExpression, Literal(0.5D))
-    val buffer = new GenericMutableRow(new Array[Any](1))
+    val buffer = new GenericInternalRow(new Array[Any](1))
     agg.initialize(buffer)
     // Empty aggregation buffer
     assert(agg.eval(buffer) == null)

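Condensing the null-handling test above into a runnable sketch of the buffer lifecycle (the constructor and calls mirror the suite; the input values are hypothetical):

    import org.apache.spark.sql.catalyst.InternalRow
    import org.apache.spark.sql.catalyst.expressions.{BoundReference, Cast, GenericInternalRow, Literal}
    import org.apache.spark.sql.catalyst.expressions.aggregate.ApproximatePercentile
    import org.apache.spark.sql.types.{DoubleType, IntegerType}

    val child = Cast(BoundReference(0, IntegerType, nullable = true), DoubleType)
    val agg = new ApproximatePercentile(child, Literal(0.5d))

    // The mutable aggregation buffer is now an ordinary GenericInternalRow.
    val buffer = new GenericInternalRow(new Array[Any](1))
    agg.initialize(buffer)
    assert(agg.eval(buffer) == null)               // empty buffer => null

    (1 to 10).foreach(i => agg.update(buffer, InternalRow(i)))
    val median = agg.eval(buffer)                  // approximate 50th percentile of 1..10
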
http://git-wip-us.apache.org/repos/asf/spark/blob/97594c29/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/aggregate/HyperLogLogPlusPlusSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/aggregate/HyperLogLogPlusPlusSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/aggregate/HyperLogLogPlusPlusSuite.scala
index f537422..17f6b71 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/aggregate/HyperLogLogPlusPlusSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/aggregate/HyperLogLogPlusPlusSuite.scala
@@ -22,28 +22,29 @@ import java.util.Random
 import scala.collection.mutable
 
 import org.apache.spark.SparkFunSuite
-import org.apache.spark.sql.catalyst.expressions.{BoundReference, MutableRow, SpecificMutableRow}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{BoundReference, SpecificInternalRow}
 import org.apache.spark.sql.types.{DataType, IntegerType}
 
 class HyperLogLogPlusPlusSuite extends SparkFunSuite {
 
   /** Create a HLL++ instance and an input and output buffer. */
   def createEstimator(rsd: Double, dt: DataType = IntegerType):
-      (HyperLogLogPlusPlus, MutableRow, MutableRow) = {
-    val input = new SpecificMutableRow(Seq(dt))
+      (HyperLogLogPlusPlus, InternalRow, InternalRow) = {
+    val input = new SpecificInternalRow(Seq(dt))
     val hll = new HyperLogLogPlusPlus(new BoundReference(0, dt, true), rsd)
     val buffer = createBuffer(hll)
     (hll, input, buffer)
   }
 
-  def createBuffer(hll: HyperLogLogPlusPlus): MutableRow = {
-    val buffer = new SpecificMutableRow(hll.aggBufferAttributes.map(_.dataType))
+  def createBuffer(hll: HyperLogLogPlusPlus): InternalRow = {
+    val buffer = new SpecificInternalRow(hll.aggBufferAttributes.map(_.dataType))
     hll.initialize(buffer)
     buffer
   }
 
   /** Evaluate the estimate. It should be within 3*SD's of the given true rsd. */
-  def evaluateEstimate(hll: HyperLogLogPlusPlus, buffer: MutableRow, cardinality: Int): Unit = {
+  def evaluateEstimate(hll: HyperLogLogPlusPlus, buffer: InternalRow, cardinality: Int): Unit = {
     val estimate = hll.eval(buffer).asInstanceOf[Long].toDouble
     val error = math.abs((estimate / cardinality.toDouble) - 1.0d)
     assert(error < hll.trueRsd * 3.0d, "Error should be within 3 std. errors.")

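Putting the three rewritten helpers together (a sketch that only composes the definitions above; the cardinality is arbitrary):

    // input is a SpecificInternalRow behind the InternalRow type, so the
    // typed setter still works and the single row can be reused per update.
    val (hll, input, buffer) = createEstimator(rsd = 0.05)
    (1 to 1000).foreach { i =>
      input.setInt(0, i)
      hll.update(buffer, input)
    }
    evaluateEstimate(hll, buffer, cardinality = 1000)
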
http://git-wip-us.apache.org/repos/asf/spark/blob/97594c29/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarBatch.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarBatch.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarBatch.java
index 62abc2a..a6ce4c2 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarBatch.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarBatch.java
@@ -21,8 +21,7 @@ import java.util.*;
 
 import org.apache.spark.memory.MemoryMode;
 import org.apache.spark.sql.catalyst.InternalRow;
-import org.apache.spark.sql.catalyst.expressions.GenericMutableRow;
-import org.apache.spark.sql.catalyst.expressions.MutableRow;
+import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
 import org.apache.spark.sql.catalyst.expressions.UnsafeRow;
 import org.apache.spark.sql.catalyst.util.ArrayData;
 import org.apache.spark.sql.catalyst.util.MapData;
@@ -91,7 +90,7 @@ public final class ColumnarBatch {
    * Adapter class to interop with existing components that expect internal row. A lot of
    * performance is lost with this translation.
    */
-  public static final class Row extends MutableRow {
+  public static final class Row extends InternalRow {
     protected int rowId;
     private final ColumnarBatch parent;
     private final int fixedLenRowSize;
@@ -129,7 +128,7 @@ public final class ColumnarBatch {
      * Revisit this. This is expensive. This is currently only used in test paths.
      */
     public InternalRow copy() {
-      GenericMutableRow row = new GenericMutableRow(columns.length);
+      GenericInternalRow row = new GenericInternalRow(columns.length);
       for (int i = 0; i < numFields(); i++) {
         if (isNullAt(i)) {
           row.setNullAt(i);

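With ColumnarBatch.Row now extending InternalRow directly, batch rows flow into row-based operators without a separate mutable-row adapter. A small Scala sketch of consuming a batch this way (the batch value is assumed; rowIterator() is the accessor already exposed by ColumnarBatch):

    import org.apache.spark.sql.catalyst.InternalRow
    import org.apache.spark.sql.execution.vectorized.ColumnarBatch

    def sumFirstIntColumn(batch: ColumnarBatch): Long = {
      var sum = 0L
      val it = batch.rowIterator()           // java.util.Iterator[ColumnarBatch.Row]
      while (it.hasNext) {
        val row: InternalRow = it.next()     // Row is-an InternalRow after this change
        if (!row.isNullAt(0)) sum += row.getInt(0)
      }
      sum
    }
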
http://git-wip-us.apache.org/repos/asf/spark/blob/97594c29/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
index 6c4248c..d3a2222 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
@@ -32,7 +32,7 @@ object RDDConversions {
   def productToRowRdd[A <: Product](data: RDD[A], outputTypes: Seq[DataType]): RDD[InternalRow] = {
     data.mapPartitions { iterator =>
       val numColumns = outputTypes.length
-      val mutableRow = new GenericMutableRow(numColumns)
+      val mutableRow = new GenericInternalRow(numColumns)
       val converters = outputTypes.map(CatalystTypeConverters.createToCatalystConverter)
       iterator.map { r =>
         var i = 0
@@ -52,7 +52,7 @@ object RDDConversions {
   def rowToRowRdd(data: RDD[Row], outputTypes: Seq[DataType]): RDD[InternalRow] = {
     data.mapPartitions { iterator =>
       val numColumns = outputTypes.length
-      val mutableRow = new GenericMutableRow(numColumns)
+      val mutableRow = new GenericInternalRow(numColumns)
       val converters = outputTypes.map(CatalystTypeConverters.createToCatalystConverter)
       iterator.map { r =>
         var i = 0

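The conversion loops above rely on a single buffer reused across a whole partition. The same pattern in isolation (a sketch, not the Spark source; callers that retain rows beyond one iterator step must copy()):

    import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
    import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
    import org.apache.spark.sql.types.DataType

    def toInternalRows(input: Iterator[Seq[Any]], outputTypes: Seq[DataType]): Iterator[InternalRow] = {
      val numColumns = outputTypes.length
      val mutableRow = new GenericInternalRow(numColumns)   // one buffer, reused
      val converters = outputTypes.map(CatalystTypeConverters.createToCatalystConverter)
      input.map { r =>
        var i = 0
        while (i < numColumns) {
          mutableRow(i) = converters(i)(r(i))               // update() via Scala sugar
          i += 1
        }
        mutableRow
      }
    }
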
http://git-wip-us.apache.org/repos/asf/spark/blob/97594c29/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/AggregationIterator.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/AggregationIterator.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/AggregationIterator.scala
index f335912..7c11fdb 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/AggregationIterator.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/AggregationIterator.scala
@@ -153,7 +153,7 @@ abstract class AggregationIterator(
   protected def generateProcessRow(
       expressions: Seq[AggregateExpression],
       functions: Seq[AggregateFunction],
-      inputAttributes: Seq[Attribute]): (MutableRow, InternalRow) => Unit = {
+      inputAttributes: Seq[Attribute]): (InternalRow, InternalRow) => Unit = {
     val joinedRow = new JoinedRow
     if (expressions.nonEmpty) {
       val mergeExpressions = functions.zipWithIndex.flatMap {
@@ -168,9 +168,9 @@ abstract class AggregationIterator(
         case (ae: ImperativeAggregate, i) =>
           expressions(i).mode match {
             case Partial | Complete =>
-              (buffer: MutableRow, row: InternalRow) => ae.update(buffer, row)
+              (buffer: InternalRow, row: InternalRow) => ae.update(buffer, row)
             case PartialMerge | Final =>
-              (buffer: MutableRow, row: InternalRow) => ae.merge(buffer, row)
+              (buffer: InternalRow, row: InternalRow) => ae.merge(buffer, row)
           }
       }.toArray
       // This projection is used to merge buffer values for all expression-based aggregates.
@@ -178,7 +178,7 @@ abstract class AggregationIterator(
       val updateProjection =
         newMutableProjection(mergeExpressions, aggregationBufferSchema ++ inputAttributes)
 
-      (currentBuffer: MutableRow, row: InternalRow) => {
+      (currentBuffer: InternalRow, row: InternalRow) => {
         // Process all expression-based aggregate functions.
         updateProjection.target(currentBuffer)(joinedRow(currentBuffer, row))
         // Process all imperative aggregate functions.
@@ -190,11 +190,11 @@ abstract class AggregationIterator(
       }
     } else {
       // Grouping only.
-      (currentBuffer: MutableRow, row: InternalRow) => {}
+      (currentBuffer: InternalRow, row: InternalRow) => {}
     }
   }
 
-  protected val processRow: (MutableRow, InternalRow) => Unit =
+  protected val processRow: (InternalRow, InternalRow) => Unit =
     generateProcessRow(aggregateExpressions, aggregateFunctions, inputAttributes)
 
   protected val groupingProjection: UnsafeProjection =
@@ -202,7 +202,7 @@ abstract class AggregationIterator(
   protected val groupingAttributes = groupingExpressions.map(_.toAttribute)
 
   // Initializing the function used to generate the output row.
-  protected def generateResultProjection(): (UnsafeRow, MutableRow) => UnsafeRow = {
+  protected def generateResultProjection(): (UnsafeRow, InternalRow) => UnsafeRow = {
     val joinedRow = new JoinedRow
     val modes = aggregateExpressions.map(_.mode).distinct
     val bufferAttributes = aggregateFunctions.flatMap(_.aggBufferAttributes)
@@ -211,14 +211,14 @@ abstract class AggregationIterator(
         case ae: DeclarativeAggregate => ae.evaluateExpression
         case agg: AggregateFunction => NoOp
       }
-      val aggregateResult = new SpecificMutableRow(aggregateAttributes.map(_.dataType))
+      val aggregateResult = new SpecificInternalRow(aggregateAttributes.map(_.dataType))
       val expressionAggEvalProjection = newMutableProjection(evalExpressions, bufferAttributes)
       expressionAggEvalProjection.target(aggregateResult)
 
       val resultProjection =
         UnsafeProjection.create(resultExpressions, groupingAttributes ++ aggregateAttributes)
 
-      (currentGroupingKey: UnsafeRow, currentBuffer: MutableRow) => {
+      (currentGroupingKey: UnsafeRow, currentBuffer: InternalRow) => {
         // Generate results for all expression-based aggregate functions.
         expressionAggEvalProjection(currentBuffer)
         // Generate results for all imperative aggregate functions.
@@ -244,7 +244,7 @@ abstract class AggregationIterator(
         }
       }
 
-      (currentGroupingKey: UnsafeRow, currentBuffer: MutableRow) => {
+      (currentGroupingKey: UnsafeRow, currentBuffer: InternalRow) => {
         // Serializes the generic object stored in aggregation buffer
         var i = 0
         while (i < typedImperativeAggregates.length) {
@@ -256,17 +256,17 @@ abstract class AggregationIterator(
     } else {
       // Grouping-only: we only output values based on grouping expressions.
       val resultProjection = UnsafeProjection.create(resultExpressions, groupingAttributes)
-      (currentGroupingKey: UnsafeRow, currentBuffer: MutableRow) => {
+      (currentGroupingKey: UnsafeRow, currentBuffer: InternalRow) => {
         resultProjection(currentGroupingKey)
       }
     }
   }
 
-  protected val generateOutput: (UnsafeRow, MutableRow) => UnsafeRow =
+  protected val generateOutput: (UnsafeRow, InternalRow) => UnsafeRow =
     generateResultProjection()
 
   /** Initializes buffer values for all aggregate functions. */
-  protected def initializeBuffer(buffer: MutableRow): Unit = {
+  protected def initializeBuffer(buffer: InternalRow): Unit = {
     expressionAggInitialProjection.target(buffer)(EmptyRow)
     var i = 0
     while (i < allImperativeAggregateFunctions.length) {

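The signature changes above are mechanical, but the shape is easier to see in miniature. A sketch with a hypothetical stand-in for the imperative aggregate surface (only the closure structure mirrors generateProcessRow; everything else is simplified):

    import org.apache.spark.sql.catalyst.InternalRow

    // Hypothetical stand-in for ImperativeAggregate's update/merge methods.
    trait ImperativeAgg {
      def update(buffer: InternalRow, input: InternalRow): Unit
      def merge(buffer: InternalRow, input: InternalRow): Unit
    }

    def generateProcess(
        aggs: Seq[ImperativeAgg],
        merging: Boolean): (InternalRow, InternalRow) => Unit = {
      if (aggs.nonEmpty) {
        // Bind one closure per function ahead of time, as the real code does,
        // so there is no per-row mode dispatch.
        val fns = aggs.map { agg =>
          if (merging) agg.merge _ else agg.update _
        }.toArray
        (buffer: InternalRow, row: InternalRow) => {
          var i = 0
          while (i < fns.length) { fns(i)(buffer, row); i += 1 }
        }
      } else {
        // Grouping only: nothing to accumulate per row.
        (_: InternalRow, _: InternalRow) => ()
      }
    }
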
http://git-wip-us.apache.org/repos/asf/spark/blob/97594c29/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortBasedAggregationIterator.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortBasedAggregationIterator.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortBasedAggregationIterator.scala
index c2b1ef0..bea2dce 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortBasedAggregationIterator.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortBasedAggregationIterator.scala
@@ -49,11 +49,11 @@ class SortBasedAggregationIterator(
    * Creates a new aggregation buffer and initializes buffer values
    * for all aggregate functions.
    */
-  private def newBuffer: MutableRow = {
+  private def newBuffer: InternalRow = {
     val bufferSchema = aggregateFunctions.flatMap(_.aggBufferAttributes)
     val bufferRowSize: Int = bufferSchema.length
 
-    val genericMutableBuffer = new GenericMutableRow(bufferRowSize)
+    val genericMutableBuffer = new GenericInternalRow(bufferRowSize)
     val useUnsafeBuffer = bufferSchema.map(_.dataType).forall(UnsafeRow.isMutable)
 
     val buffer = if (useUnsafeBuffer) {
@@ -84,7 +84,7 @@ class SortBasedAggregationIterator(
   private[this] var sortedInputHasNewGroup: Boolean = false
 
   // The aggregation buffer used by the sort-based aggregation.
-  private[this] val sortBasedAggregationBuffer: MutableRow = newBuffer
+  private[this] val sortBasedAggregationBuffer: InternalRow = newBuffer
 
   // This safe projection is used to turn the input row into safe row. This is necessary
   // because the input row may be produced by unsafe projection in child operator and all the


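For completeness, the buffer-selection logic in newBuffer, sketched standalone (the UnsafeProjection conversion is what the surrounding lines imply; treat it as an illustration rather than a quote from the patch):

    import org.apache.spark.sql.catalyst.InternalRow
    import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, UnsafeProjection, UnsafeRow}
    import org.apache.spark.sql.types.DataType

    def newBuffer(bufferTypes: Seq[DataType]): InternalRow = {
      val generic = new GenericInternalRow(bufferTypes.length)
      if (bufferTypes.forall(UnsafeRow.isMutable)) {
        // Every field is fixed-width, so an UnsafeRow buffer can be
        // updated in place and is cheaper on the hot aggregation path.
        UnsafeProjection.create(bufferTypes.toArray).apply(generic)
      } else {
        // Variable-width buffer values need the generic representation.
        generic
      }
      // (The real iterator then calls initializeBuffer on the result.)
    }
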