Posted to commits@spark.apache.org by hv...@apache.org on 2016/10/07 21:03:53 UTC

[3/3] spark git commit: [SPARK-17761][SQL] Remove MutableRow

[SPARK-17761][SQL] Remove MutableRow

## What changes were proposed in this pull request?
In practice we cannot guarantee that an `InternalRow` is immutable, which makes the separate `MutableRow` abstraction almost redundant. This PR therefore folds `MutableRow` into `InternalRow`: the `setNullAt`/`update` contract and the typed setters move onto `InternalRow`, and `GenericMutableRow`/`SpecificMutableRow` become `GenericInternalRow`/`SpecificInternalRow`.

The code below illustrates that an `InternalRow` is not actually immutable:
```scala
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.GenericMutableRow

val struct = new GenericMutableRow(1)
val row = InternalRow(struct, 1)
println(row) // [[null], 1]
struct.setInt(0, 42)
println(row) // [[42], 1]
```
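
For reference, a sketch of the same snippet after this change: the setters that used to live on `MutableRow` are now on `InternalRow` itself, so `GenericInternalRow` is mutated directly.
```scala
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.GenericInternalRow

val struct = new GenericInternalRow(1)
val row = InternalRow(struct, 1)
println(row) // [[null], 1]
struct.setInt(0, 42)
println(row) // [[42], 1]
```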

This might be somewhat controversial, so feedback is appreciated.

## How was this patch tested?
Existing tests.

Author: Herman van Hovell <hv...@databricks.com>

Closes #15333 from hvanhovell/SPARK-17761.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/97594c29
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/97594c29
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/97594c29

Branch: refs/heads/master
Commit: 97594c29b723f372a5c4c061760015bd78d01f50
Parents: 2badb58
Author: Herman van Hovell <hv...@databricks.com>
Authored: Fri Oct 7 14:03:45 2016 -0700
Committer: Herman van Hovell <hv...@databricks.com>
Committed: Fri Oct 7 14:03:45 2016 -0700

----------------------------------------------------------------------
 .../org/apache/spark/ml/linalg/MatrixUDT.scala  |   4 +-
 .../org/apache/spark/ml/linalg/VectorUDT.scala  |   6 +-
 .../apache/spark/mllib/linalg/Matrices.scala    |   4 +-
 .../org/apache/spark/mllib/linalg/Vectors.scala |   6 +-
 .../sql/catalyst/expressions/UnsafeRow.java     |   2 +-
 .../apache/spark/sql/catalyst/InternalRow.scala |  23 +-
 .../catalyst/encoders/ExpressionEncoder.scala   |   2 +-
 .../spark/sql/catalyst/expressions/Cast.scala   |   4 +-
 .../sql/catalyst/expressions/JoinedRow.scala    |  16 +
 .../sql/catalyst/expressions/Projection.scala   |   4 +-
 .../expressions/SpecificInternalRow.scala       | 313 ++++++++++++++++++
 .../expressions/SpecificMutableRow.scala        | 314 -------------------
 .../aggregate/HyperLogLogPlusPlus.scala         |   6 +-
 .../expressions/aggregate/PivotFirst.scala      |  10 +-
 .../expressions/aggregate/collect.scala         |   6 +-
 .../expressions/aggregate/interfaces.scala      |  14 +-
 .../expressions/codegen/CodeGenerator.scala     |   3 +-
 .../codegen/GenerateMutableProjection.scala     |   8 +-
 .../codegen/GenerateSafeProjection.scala        |   8 +-
 .../sql/catalyst/expressions/package.scala      |   2 +-
 .../spark/sql/catalyst/expressions/rows.scala   |  44 +--
 .../spark/sql/catalyst/json/JacksonParser.scala |   4 +-
 .../sql/catalyst/ScalaReflectionSuite.scala     |   4 +-
 .../expressions/CodeGenerationSuite.scala       |  16 +-
 .../sql/catalyst/expressions/MapDataSuite.scala |   2 +-
 .../expressions/UnsafeRowConverterSuite.scala   |  26 +-
 .../aggregate/ApproximatePercentileSuite.scala  |   9 +-
 .../aggregate/HyperLogLogPlusPlusSuite.scala    |  13 +-
 .../sql/execution/vectorized/ColumnarBatch.java |   7 +-
 .../spark/sql/execution/ExistingRDD.scala       |   4 +-
 .../aggregate/AggregationIterator.scala         |  26 +-
 .../SortBasedAggregationIterator.scala          |   6 +-
 .../aggregate/TungstenAggregationIterator.scala |   8 +-
 .../spark/sql/execution/aggregate/udaf.scala    |  38 +--
 .../sql/execution/columnar/ColumnAccessor.scala |  13 +-
 .../sql/execution/columnar/ColumnType.scala     |  72 ++---
 .../columnar/GenerateColumnAccessor.scala       |   6 +-
 .../columnar/NullableColumnAccessor.scala       |   4 +-
 .../CompressibleColumnAccessor.scala            |   4 +-
 .../compression/CompressionScheme.scala         |   3 +-
 .../compression/compressionSchemes.scala        |  20 +-
 .../datasources/DataSourceStrategy.scala        |   2 +-
 .../execution/datasources/csv/CSVRelation.scala |   4 +-
 .../execution/datasources/jdbc/JdbcUtils.scala  |  34 +-
 .../parquet/ParquetRowConverter.scala           |   6 +-
 .../joins/BroadcastNestedLoopJoinExec.scala     |  10 +-
 .../spark/sql/execution/joins/HashJoin.scala    |   2 +-
 .../sql/execution/joins/SortMergeJoinExec.scala |   2 +-
 .../apache/spark/sql/execution/objects.scala    |   4 +-
 .../execution/python/BatchEvalPythonExec.scala  |   2 +-
 .../sql/execution/stat/StatFunctions.scala      |   4 +-
 .../execution/window/AggregateProcessor.scala   |   4 +-
 .../spark/sql/execution/window/WindowExec.scala |  12 +-
 .../execution/window/WindowFunctionFrame.scala  |  10 +-
 .../scala/org/apache/spark/sql/RowSuite.scala   |   6 +-
 .../sql/TypedImperativeAggregateSuite.scala     |   6 +-
 .../execution/columnar/ColumnTypeSuite.scala    |   4 +-
 .../execution/columnar/ColumnarTestUtils.scala  |  12 +-
 .../columnar/NullableColumnAccessorSuite.scala  |   4 +-
 .../columnar/NullableColumnBuilderSuite.scala   |   4 +-
 .../compression/BooleanBitSetSuite.scala        |   4 +-
 .../CompressionSchemeBenchmark.scala            |   4 +-
 .../compression/DictionaryEncodingSuite.scala   |   4 +-
 .../compression/IntegralDeltaSuite.scala        |   6 +-
 .../compression/RunLengthEncodingSuite.scala    |   4 +-
 .../datasources/parquet/ParquetIOSuite.scala    |   4 +-
 .../datasources/parquet/ParquetQuerySuite.scala |   4 +-
 .../apache/spark/sql/hive/HiveInspectors.scala  |  18 +-
 .../org/apache/spark/sql/hive/TableReader.scala |  38 +--
 .../hive/execution/ScriptTransformation.scala   |   2 +-
 .../org/apache/spark/sql/hive/hiveUDFs.scala    |   6 +-
 .../spark/sql/hive/orc/OrcFileFormat.scala      |   2 +-
 72 files changed, 654 insertions(+), 658 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/97594c29/mllib/src/main/scala/org/apache/spark/ml/linalg/MatrixUDT.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/ml/linalg/MatrixUDT.scala b/mllib/src/main/scala/org/apache/spark/ml/linalg/MatrixUDT.scala
index a1e5366..f4a8556 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/linalg/MatrixUDT.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/linalg/MatrixUDT.scala
@@ -18,7 +18,7 @@
 package org.apache.spark.ml.linalg
 
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.{GenericMutableRow, UnsafeArrayData}
+import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, UnsafeArrayData}
 import org.apache.spark.sql.types._
 
 /**
@@ -46,7 +46,7 @@ private[spark] class MatrixUDT extends UserDefinedType[Matrix] {
   }
 
   override def serialize(obj: Matrix): InternalRow = {
-    val row = new GenericMutableRow(7)
+    val row = new GenericInternalRow(7)
     obj match {
       case sm: SparseMatrix =>
         row.setByte(0, 0)

http://git-wip-us.apache.org/repos/asf/spark/blob/97594c29/mllib/src/main/scala/org/apache/spark/ml/linalg/VectorUDT.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/ml/linalg/VectorUDT.scala b/mllib/src/main/scala/org/apache/spark/ml/linalg/VectorUDT.scala
index 0b9b2ff..9178613 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/linalg/VectorUDT.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/linalg/VectorUDT.scala
@@ -18,7 +18,7 @@
 package org.apache.spark.ml.linalg
 
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.{GenericMutableRow, UnsafeArrayData}
+import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, UnsafeArrayData}
 import org.apache.spark.sql.types._
 
 /**
@@ -42,14 +42,14 @@ private[spark] class VectorUDT extends UserDefinedType[Vector] {
   override def serialize(obj: Vector): InternalRow = {
     obj match {
       case SparseVector(size, indices, values) =>
-        val row = new GenericMutableRow(4)
+        val row = new GenericInternalRow(4)
         row.setByte(0, 0)
         row.setInt(1, size)
         row.update(2, UnsafeArrayData.fromPrimitiveArray(indices))
         row.update(3, UnsafeArrayData.fromPrimitiveArray(values))
         row
       case DenseVector(values) =>
-        val row = new GenericMutableRow(4)
+        val row = new GenericInternalRow(4)
         row.setByte(0, 1)
         row.setNullAt(1)
         row.setNullAt(2)

http://git-wip-us.apache.org/repos/asf/spark/blob/97594c29/mllib/src/main/scala/org/apache/spark/mllib/linalg/Matrices.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/Matrices.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/Matrices.scala
index 6642999..542a69b 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/Matrices.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/Matrices.scala
@@ -28,7 +28,7 @@ import com.github.fommil.netlib.BLAS.{getInstance => blas}
 import org.apache.spark.annotation.Since
 import org.apache.spark.ml.{linalg => newlinalg}
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.{GenericMutableRow, UnsafeArrayData}
+import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, UnsafeArrayData}
 import org.apache.spark.sql.types._
 
 /**
@@ -189,7 +189,7 @@ private[spark] class MatrixUDT extends UserDefinedType[Matrix] {
   }
 
   override def serialize(obj: Matrix): InternalRow = {
-    val row = new GenericMutableRow(7)
+    val row = new GenericInternalRow(7)
     obj match {
       case sm: SparseMatrix =>
         row.setByte(0, 0)

http://git-wip-us.apache.org/repos/asf/spark/blob/97594c29/mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala
index 91f0658..fbd217a 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala
@@ -34,7 +34,7 @@ import org.apache.spark.annotation.{AlphaComponent, Since}
 import org.apache.spark.ml.{linalg => newlinalg}
 import org.apache.spark.mllib.util.NumericParser
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.{GenericMutableRow, UnsafeArrayData}
+import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, UnsafeArrayData}
 import org.apache.spark.sql.types._
 
 /**
@@ -214,14 +214,14 @@ class VectorUDT extends UserDefinedType[Vector] {
   override def serialize(obj: Vector): InternalRow = {
     obj match {
       case SparseVector(size, indices, values) =>
-        val row = new GenericMutableRow(4)
+        val row = new GenericInternalRow(4)
         row.setByte(0, 0)
         row.setInt(1, size)
         row.update(2, UnsafeArrayData.fromPrimitiveArray(indices))
         row.update(3, UnsafeArrayData.fromPrimitiveArray(values))
         row
       case DenseVector(values) =>
-        val row = new GenericMutableRow(4)
+        val row = new GenericInternalRow(4)
         row.setByte(0, 1)
         row.setNullAt(1)
         row.setNullAt(2)

http://git-wip-us.apache.org/repos/asf/spark/blob/97594c29/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java
index 9027652..c3f0aba 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java
@@ -59,7 +59,7 @@ import static org.apache.spark.unsafe.Platform.BYTE_ARRAY_OFFSET;
  *
  * Instances of `UnsafeRow` act as pointers to row data stored in this format.
  */
-public final class UnsafeRow extends MutableRow implements Externalizable, KryoSerializable {
+public final class UnsafeRow extends InternalRow implements Externalizable, KryoSerializable {
 
   //////////////////////////////////////////////////////////////////////////////
   // Static methods

http://git-wip-us.apache.org/repos/asf/spark/blob/97594c29/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/InternalRow.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/InternalRow.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/InternalRow.scala
index eba95c5..f498e07 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/InternalRow.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/InternalRow.scala
@@ -18,7 +18,7 @@
 package org.apache.spark.sql.catalyst
 
 import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.types.{DataType, StructType}
+import org.apache.spark.sql.types.{DataType, Decimal, StructType}
 
 /**
  * An abstract class for row used internal in Spark SQL, which only contain the columns as
@@ -31,6 +31,27 @@ abstract class InternalRow extends SpecializedGetters with Serializable {
   // This is only use for test and will throw a null pointer exception if the position is null.
   def getString(ordinal: Int): String = getUTF8String(ordinal).toString
 
+  def setNullAt(i: Int): Unit
+
+  def update(i: Int, value: Any): Unit
+
+  // default implementation (slow)
+  def setBoolean(i: Int, value: Boolean): Unit = update(i, value)
+  def setByte(i: Int, value: Byte): Unit = update(i, value)
+  def setShort(i: Int, value: Short): Unit = update(i, value)
+  def setInt(i: Int, value: Int): Unit = update(i, value)
+  def setLong(i: Int, value: Long): Unit = update(i, value)
+  def setFloat(i: Int, value: Float): Unit = update(i, value)
+  def setDouble(i: Int, value: Double): Unit = update(i, value)
+
+  /**
+   * Update the decimal column at `i`.
+   *
+   * Note: In order to support update decimal with precision > 18 in UnsafeRow,
+   * CAN NOT call setNullAt() for decimal column on UnsafeRow, call setDecimal(i, null, precision).
+   */
+  def setDecimal(i: Int, value: Decimal, precision: Int) { update(i, value) }
+
   /**
    * Make a copy of the current [[InternalRow]] object.
    */
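
A hypothetical sketch (`BoxedRow` is not part of this patch) of what the new contract asks of implementors: provide `update` and `setNullAt`, and the typed setters above fall back to the boxed `update` path.
```scala
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow
import org.apache.spark.sql.types.DataType

class BoxedRow(values: Array[Any]) extends BaseGenericInternalRow {
  override protected def genericGet(i: Int): Any = values(i)
  override def toSeq(fieldTypes: Seq[DataType]): Seq[Any] = values
  override def numFields: Int = values.length
  override def setNullAt(i: Int): Unit = values(i) = null
  override def update(i: Int, value: Any): Unit = values(i) = value
  override def copy(): InternalRow = new BoxedRow(values.clone())
}

val r = new BoxedRow(new Array[Any](2))
r.setInt(0, 1)   // no override, so this is routed through update(0, 1)
r.setLong(1, 2L) // likewise
```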

http://git-wip-us.apache.org/repos/asf/spark/blob/97594c29/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala
index b96b744..82e1a8a 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala
@@ -256,7 +256,7 @@ case class ExpressionEncoder[T](
   private lazy val extractProjection = GenerateUnsafeProjection.generate(serializer)
 
   @transient
-  private lazy val inputRow = new GenericMutableRow(1)
+  private lazy val inputRow = new GenericInternalRow(1)
 
   @transient
   private lazy val constructProjection = GenerateSafeProjection.generate(deserializer :: Nil)

http://git-wip-us.apache.org/repos/asf/spark/blob/97594c29/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala
index 70fff51..1314c41 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala
@@ -403,7 +403,7 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w
       case (fromField, toField) => cast(fromField.dataType, toField.dataType)
     }
     // TODO: Could be faster?
-    val newRow = new GenericMutableRow(from.fields.length)
+    val newRow = new GenericInternalRow(from.fields.length)
     buildCast[InternalRow](_, row => {
       var i = 0
       while (i < row.numFields) {
@@ -892,7 +892,7 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w
     val fieldsCasts = from.fields.zip(to.fields).map {
       case (fromField, toField) => nullSafeCastFunction(fromField.dataType, toField.dataType, ctx)
     }
-    val rowClass = classOf[GenericMutableRow].getName
+    val rowClass = classOf[GenericInternalRow].getName
     val result = ctx.freshName("result")
     val tmpRow = ctx.freshName("tmpRow")
 

http://git-wip-us.apache.org/repos/asf/spark/blob/97594c29/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/JoinedRow.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/JoinedRow.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/JoinedRow.scala
index ed894f6..7770684 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/JoinedRow.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/JoinedRow.scala
@@ -123,6 +123,22 @@ class JoinedRow extends InternalRow {
 
   override def anyNull: Boolean = row1.anyNull || row2.anyNull
 
+  override def setNullAt(i: Int): Unit = {
+    if (i < row1.numFields) {
+      row1.setNullAt(i)
+    } else {
+      row2.setNullAt(i - row1.numFields)
+    }
+  }
+
+  override def update(i: Int, value: Any): Unit = {
+    if (i < row1.numFields) {
+      row1.update(i, value)
+    } else {
+      row2.update(i - row1.numFields, value)
+    }
+  }
+
   override def copy(): InternalRow = {
     val copy1 = row1.copy()
     val copy2 = row2.copy()
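
The routing is by ordinal: indices below `row1.numFields` go to the left row, the rest to the right one. A quick sketch:
```scala
import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, JoinedRow}

val left = new GenericInternalRow(2)
val right = new GenericInternalRow(1)
val joined = new JoinedRow(left, right)
joined.update(2, 42)  // past row1.numFields, lands in `right` at ordinal 0
assert(right.getInt(0) == 42)
joined.setNullAt(0)   // within row1, nulls `left` at ordinal 0
assert(left.isNullAt(0))
```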

http://git-wip-us.apache.org/repos/asf/spark/blob/97594c29/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Projection.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Projection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Projection.scala
index c8d1866..a81fa1c 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Projection.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Projection.scala
@@ -69,10 +69,10 @@ case class InterpretedMutableProjection(expressions: Seq[Expression]) extends Mu
   })
 
   private[this] val exprArray = expressions.toArray
-  private[this] var mutableRow: MutableRow = new GenericMutableRow(exprArray.length)
+  private[this] var mutableRow: InternalRow = new GenericInternalRow(exprArray.length)
   def currentValue: InternalRow = mutableRow
 
-  override def target(row: MutableRow): MutableProjection = {
+  override def target(row: InternalRow): MutableProjection = {
     mutableRow = row
     this
   }
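
With this change `target` accepts any `InternalRow`. A minimal sketch using the interpreted projection:
```scala
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, InterpretedMutableProjection, Literal}

val proj = new InterpretedMutableProjection(Seq(Literal(42)))
val out = new GenericInternalRow(1)
proj.target(out)        // retarget the projection at our own row
proj(InternalRow.empty) // evaluate the expressions into `out`
assert(out.getInt(0) == 42)
```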

http://git-wip-us.apache.org/repos/asf/spark/blob/97594c29/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SpecificInternalRow.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SpecificInternalRow.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SpecificInternalRow.scala
new file mode 100644
index 0000000..74e0b46
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SpecificInternalRow.scala
@@ -0,0 +1,313 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.types._
+
+/**
+ * A parent class for mutable container objects that are reused when the values are changed,
+ * resulting in less garbage.  These values are held by a [[SpecificInternalRow]].
+ *
+ * The following code was roughly used to generate these objects:
+ * {{{
+ * val types = "Int,Float,Boolean,Double,Short,Long,Byte,Any".split(",")
+ * types.map {tpe =>
+ * s"""
+ * final class Mutable$tpe extends MutableValue {
+ *   var value: $tpe = 0
+ *   def boxed = if (isNull) null else value
+ *   def update(v: Any) = value = {
+ *     isNull = false
+ *     v.asInstanceOf[$tpe]
+ *   }
+ *   def copy() = {
+ *     val newCopy = new Mutable$tpe
+ *     newCopy.isNull = isNull
+ *     newCopy.value = value
+ *     newCopy
+ *   }
+ * }"""
+ * }.foreach(println)
+ *
+ * types.map { tpe =>
+ * s"""
+ *   override def set$tpe(ordinal: Int, value: $tpe): Unit = {
+ *     val currentValue = values(ordinal).asInstanceOf[Mutable$tpe]
+ *     currentValue.isNull = false
+ *     currentValue.value = value
+ *   }
+ *
+ *   override def get$tpe(i: Int): $tpe = {
+ *     values(i).asInstanceOf[Mutable$tpe].value
+ *   }"""
+ * }.foreach(println)
+ * }}}
+ */
+abstract class MutableValue extends Serializable {
+  var isNull: Boolean = true
+  def boxed: Any
+  def update(v: Any): Unit
+  def copy(): MutableValue
+}
+
+final class MutableInt extends MutableValue {
+  var value: Int = 0
+  override def boxed: Any = if (isNull) null else value
+  override def update(v: Any): Unit = {
+    isNull = false
+    value = v.asInstanceOf[Int]
+  }
+  override def copy(): MutableInt = {
+    val newCopy = new MutableInt
+    newCopy.isNull = isNull
+    newCopy.value = value
+    newCopy
+  }
+}
+
+final class MutableFloat extends MutableValue {
+  var value: Float = 0
+  override def boxed: Any = if (isNull) null else value
+  override def update(v: Any): Unit = {
+    isNull = false
+    value = v.asInstanceOf[Float]
+  }
+  override def copy(): MutableFloat = {
+    val newCopy = new MutableFloat
+    newCopy.isNull = isNull
+    newCopy.value = value
+    newCopy
+  }
+}
+
+final class MutableBoolean extends MutableValue {
+  var value: Boolean = false
+  override def boxed: Any = if (isNull) null else value
+  override def update(v: Any): Unit = {
+    isNull = false
+    value = v.asInstanceOf[Boolean]
+  }
+  override def copy(): MutableBoolean = {
+    val newCopy = new MutableBoolean
+    newCopy.isNull = isNull
+    newCopy.value = value
+    newCopy
+  }
+}
+
+final class MutableDouble extends MutableValue {
+  var value: Double = 0
+  override def boxed: Any = if (isNull) null else value
+  override def update(v: Any): Unit = {
+    isNull = false
+    value = v.asInstanceOf[Double]
+  }
+  override def copy(): MutableDouble = {
+    val newCopy = new MutableDouble
+    newCopy.isNull = isNull
+    newCopy.value = value
+    newCopy
+  }
+}
+
+final class MutableShort extends MutableValue {
+  var value: Short = 0
+  override def boxed: Any = if (isNull) null else value
+  override def update(v: Any): Unit = value = {
+    isNull = false
+    v.asInstanceOf[Short]
+  }
+  override def copy(): MutableShort = {
+    val newCopy = new MutableShort
+    newCopy.isNull = isNull
+    newCopy.value = value
+    newCopy
+  }
+}
+
+final class MutableLong extends MutableValue {
+  var value: Long = 0
+  override def boxed: Any = if (isNull) null else value
+  override def update(v: Any): Unit = value = {
+    isNull = false
+    v.asInstanceOf[Long]
+  }
+  override def copy(): MutableLong = {
+    val newCopy = new MutableLong
+    newCopy.isNull = isNull
+    newCopy.value = value
+    newCopy
+  }
+}
+
+final class MutableByte extends MutableValue {
+  var value: Byte = 0
+  override def boxed: Any = if (isNull) null else value
+  override def update(v: Any): Unit = value = {
+    isNull = false
+    v.asInstanceOf[Byte]
+  }
+  override def copy(): MutableByte = {
+    val newCopy = new MutableByte
+    newCopy.isNull = isNull
+    newCopy.value = value
+    newCopy
+  }
+}
+
+final class MutableAny extends MutableValue {
+  var value: Any = _
+  override def boxed: Any = if (isNull) null else value
+  override def update(v: Any): Unit = {
+    isNull = false
+    value = v.asInstanceOf[Any]
+  }
+  override def copy(): MutableAny = {
+    val newCopy = new MutableAny
+    newCopy.isNull = isNull
+    newCopy.value = value
+    newCopy
+  }
+}
+
+/**
+ * A row type that holds an array specialized container objects, of type [[MutableValue]], chosen
+ * based on the dataTypes of each column.  The intent is to decrease garbage when modifying the
+ * values of primitive columns.
+ */
+final class SpecificInternalRow(val values: Array[MutableValue]) extends BaseGenericInternalRow {
+
+  def this(dataTypes: Seq[DataType]) =
+    this(
+      dataTypes.map {
+        case BooleanType => new MutableBoolean
+        case ByteType => new MutableByte
+        case ShortType => new MutableShort
+        // We use INT for DATE internally
+        case IntegerType | DateType => new MutableInt
+        // We use Long for Timestamp internally
+        case LongType | TimestampType => new MutableLong
+        case FloatType => new MutableFloat
+        case DoubleType => new MutableDouble
+        case _ => new MutableAny
+      }.toArray)
+
+  def this() = this(Seq.empty)
+
+  def this(schema: StructType) = this(schema.fields.map(_.dataType))
+
+  override def numFields: Int = values.length
+
+  override def setNullAt(i: Int): Unit = {
+    values(i).isNull = true
+  }
+
+  override def isNullAt(i: Int): Boolean = values(i).isNull
+
+  override def copy(): InternalRow = {
+    val newValues = new Array[Any](values.length)
+    var i = 0
+    while (i < values.length) {
+      newValues(i) = values(i).boxed
+      i += 1
+    }
+
+    new GenericInternalRow(newValues)
+  }
+
+  override protected def genericGet(i: Int): Any = values(i).boxed
+
+  override def update(ordinal: Int, value: Any) {
+    if (value == null) {
+      setNullAt(ordinal)
+    } else {
+      values(ordinal).update(value)
+    }
+  }
+
+  override def setInt(ordinal: Int, value: Int): Unit = {
+    val currentValue = values(ordinal).asInstanceOf[MutableInt]
+    currentValue.isNull = false
+    currentValue.value = value
+  }
+
+  override def getInt(i: Int): Int = {
+    values(i).asInstanceOf[MutableInt].value
+  }
+
+  override def setFloat(ordinal: Int, value: Float): Unit = {
+    val currentValue = values(ordinal).asInstanceOf[MutableFloat]
+    currentValue.isNull = false
+    currentValue.value = value
+  }
+
+  override def getFloat(i: Int): Float = {
+    values(i).asInstanceOf[MutableFloat].value
+  }
+
+  override def setBoolean(ordinal: Int, value: Boolean): Unit = {
+    val currentValue = values(ordinal).asInstanceOf[MutableBoolean]
+    currentValue.isNull = false
+    currentValue.value = value
+  }
+
+  override def getBoolean(i: Int): Boolean = {
+    values(i).asInstanceOf[MutableBoolean].value
+  }
+
+  override def setDouble(ordinal: Int, value: Double): Unit = {
+    val currentValue = values(ordinal).asInstanceOf[MutableDouble]
+    currentValue.isNull = false
+    currentValue.value = value
+  }
+
+  override def getDouble(i: Int): Double = {
+    values(i).asInstanceOf[MutableDouble].value
+  }
+
+  override def setShort(ordinal: Int, value: Short): Unit = {
+    val currentValue = values(ordinal).asInstanceOf[MutableShort]
+    currentValue.isNull = false
+    currentValue.value = value
+  }
+
+  override def getShort(i: Int): Short = {
+    values(i).asInstanceOf[MutableShort].value
+  }
+
+  override def setLong(ordinal: Int, value: Long): Unit = {
+    val currentValue = values(ordinal).asInstanceOf[MutableLong]
+    currentValue.isNull = false
+    currentValue.value = value
+  }
+
+  override def getLong(i: Int): Long = {
+    values(i).asInstanceOf[MutableLong].value
+  }
+
+  override def setByte(ordinal: Int, value: Byte): Unit = {
+    val currentValue = values(ordinal).asInstanceOf[MutableByte]
+    currentValue.isNull = false
+    currentValue.value = value
+  }
+
+  override def getByte(i: Int): Byte = {
+    values(i).asInstanceOf[MutableByte].value
+  }
+}
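
A quick sketch of the renamed buffer in action; primitive columns write through their specialized `MutableValue` container instead of boxing:
```scala
import org.apache.spark.sql.catalyst.expressions.SpecificInternalRow
import org.apache.spark.sql.types._

val row = new SpecificInternalRow(Seq(IntegerType, DoubleType, StringType))
row.setInt(0, 7)        // MutableInt: primitive write, no boxing
row.setDouble(1, 3.14)  // MutableDouble
row.setNullAt(2)        // flags the MutableAny container as null
assert(row.getInt(0) == 7 && row.isNullAt(2))
```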

http://git-wip-us.apache.org/repos/asf/spark/blob/97594c29/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SpecificMutableRow.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SpecificMutableRow.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SpecificMutableRow.scala
deleted file mode 100644
index 61ca727..0000000
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SpecificMutableRow.scala
+++ /dev/null
@@ -1,314 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.catalyst.expressions
-
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.types._
-
-/**
- * A parent class for mutable container objects that are reused when the values are changed,
- * resulting in less garbage.  These values are held by a [[SpecificMutableRow]].
- *
- * The following code was roughly used to generate these objects:
- * {{{
- * val types = "Int,Float,Boolean,Double,Short,Long,Byte,Any".split(",")
- * types.map {tpe =>
- * s"""
- * final class Mutable$tpe extends MutableValue {
- *   var value: $tpe = 0
- *   def boxed = if (isNull) null else value
- *   def update(v: Any) = value = {
- *     isNull = false
- *     v.asInstanceOf[$tpe]
- *   }
- *   def copy() = {
- *     val newCopy = new Mutable$tpe
- *     newCopy.isNull = isNull
- *     newCopy.value = value
- *     newCopy
- *   }
- * }"""
- * }.foreach(println)
- *
- * types.map { tpe =>
- * s"""
- *   override def set$tpe(ordinal: Int, value: $tpe): Unit = {
- *     val currentValue = values(ordinal).asInstanceOf[Mutable$tpe]
- *     currentValue.isNull = false
- *     currentValue.value = value
- *   }
- *
- *   override def get$tpe(i: Int): $tpe = {
- *     values(i).asInstanceOf[Mutable$tpe].value
- *   }"""
- * }.foreach(println)
- * }}}
- */
-abstract class MutableValue extends Serializable {
-  var isNull: Boolean = true
-  def boxed: Any
-  def update(v: Any): Unit
-  def copy(): MutableValue
-}
-
-final class MutableInt extends MutableValue {
-  var value: Int = 0
-  override def boxed: Any = if (isNull) null else value
-  override def update(v: Any): Unit = {
-    isNull = false
-    value = v.asInstanceOf[Int]
-  }
-  override def copy(): MutableInt = {
-    val newCopy = new MutableInt
-    newCopy.isNull = isNull
-    newCopy.value = value
-    newCopy
-  }
-}
-
-final class MutableFloat extends MutableValue {
-  var value: Float = 0
-  override def boxed: Any = if (isNull) null else value
-  override def update(v: Any): Unit = {
-    isNull = false
-    value = v.asInstanceOf[Float]
-  }
-  override def copy(): MutableFloat = {
-    val newCopy = new MutableFloat
-    newCopy.isNull = isNull
-    newCopy.value = value
-    newCopy
-  }
-}
-
-final class MutableBoolean extends MutableValue {
-  var value: Boolean = false
-  override def boxed: Any = if (isNull) null else value
-  override def update(v: Any): Unit = {
-    isNull = false
-    value = v.asInstanceOf[Boolean]
-  }
-  override def copy(): MutableBoolean = {
-    val newCopy = new MutableBoolean
-    newCopy.isNull = isNull
-    newCopy.value = value
-    newCopy
-  }
-}
-
-final class MutableDouble extends MutableValue {
-  var value: Double = 0
-  override def boxed: Any = if (isNull) null else value
-  override def update(v: Any): Unit = {
-    isNull = false
-    value = v.asInstanceOf[Double]
-  }
-  override def copy(): MutableDouble = {
-    val newCopy = new MutableDouble
-    newCopy.isNull = isNull
-    newCopy.value = value
-    newCopy
-  }
-}
-
-final class MutableShort extends MutableValue {
-  var value: Short = 0
-  override def boxed: Any = if (isNull) null else value
-  override def update(v: Any): Unit = value = {
-    isNull = false
-    v.asInstanceOf[Short]
-  }
-  override def copy(): MutableShort = {
-    val newCopy = new MutableShort
-    newCopy.isNull = isNull
-    newCopy.value = value
-    newCopy
-  }
-}
-
-final class MutableLong extends MutableValue {
-  var value: Long = 0
-  override def boxed: Any = if (isNull) null else value
-  override def update(v: Any): Unit = value = {
-    isNull = false
-    v.asInstanceOf[Long]
-  }
-  override def copy(): MutableLong = {
-    val newCopy = new MutableLong
-    newCopy.isNull = isNull
-    newCopy.value = value
-    newCopy
-  }
-}
-
-final class MutableByte extends MutableValue {
-  var value: Byte = 0
-  override def boxed: Any = if (isNull) null else value
-  override def update(v: Any): Unit = value = {
-    isNull = false
-    v.asInstanceOf[Byte]
-  }
-  override def copy(): MutableByte = {
-    val newCopy = new MutableByte
-    newCopy.isNull = isNull
-    newCopy.value = value
-    newCopy
-  }
-}
-
-final class MutableAny extends MutableValue {
-  var value: Any = _
-  override def boxed: Any = if (isNull) null else value
-  override def update(v: Any): Unit = {
-    isNull = false
-    value = v.asInstanceOf[Any]
-  }
-  override def copy(): MutableAny = {
-    val newCopy = new MutableAny
-    newCopy.isNull = isNull
-    newCopy.value = value
-    newCopy
-  }
-}
-
-/**
- * A row type that holds an array specialized container objects, of type [[MutableValue]], chosen
- * based on the dataTypes of each column.  The intent is to decrease garbage when modifying the
- * values of primitive columns.
- */
-final class SpecificMutableRow(val values: Array[MutableValue])
-  extends MutableRow with BaseGenericInternalRow {
-
-  def this(dataTypes: Seq[DataType]) =
-    this(
-      dataTypes.map {
-        case BooleanType => new MutableBoolean
-        case ByteType => new MutableByte
-        case ShortType => new MutableShort
-        // We use INT for DATE internally
-        case IntegerType | DateType => new MutableInt
-        // We use Long for Timestamp internally
-        case LongType | TimestampType => new MutableLong
-        case FloatType => new MutableFloat
-        case DoubleType => new MutableDouble
-        case _ => new MutableAny
-      }.toArray)
-
-  def this() = this(Seq.empty)
-
-  def this(schema: StructType) = this(schema.fields.map(_.dataType))
-
-  override def numFields: Int = values.length
-
-  override def setNullAt(i: Int): Unit = {
-    values(i).isNull = true
-  }
-
-  override def isNullAt(i: Int): Boolean = values(i).isNull
-
-  override def copy(): InternalRow = {
-    val newValues = new Array[Any](values.length)
-    var i = 0
-    while (i < values.length) {
-      newValues(i) = values(i).boxed
-      i += 1
-    }
-
-    new GenericInternalRow(newValues)
-  }
-
-  override protected def genericGet(i: Int): Any = values(i).boxed
-
-  override def update(ordinal: Int, value: Any) {
-    if (value == null) {
-      setNullAt(ordinal)
-    } else {
-      values(ordinal).update(value)
-    }
-  }
-
-  override def setInt(ordinal: Int, value: Int): Unit = {
-    val currentValue = values(ordinal).asInstanceOf[MutableInt]
-    currentValue.isNull = false
-    currentValue.value = value
-  }
-
-  override def getInt(i: Int): Int = {
-    values(i).asInstanceOf[MutableInt].value
-  }
-
-  override def setFloat(ordinal: Int, value: Float): Unit = {
-    val currentValue = values(ordinal).asInstanceOf[MutableFloat]
-    currentValue.isNull = false
-    currentValue.value = value
-  }
-
-  override def getFloat(i: Int): Float = {
-    values(i).asInstanceOf[MutableFloat].value
-  }
-
-  override def setBoolean(ordinal: Int, value: Boolean): Unit = {
-    val currentValue = values(ordinal).asInstanceOf[MutableBoolean]
-    currentValue.isNull = false
-    currentValue.value = value
-  }
-
-  override def getBoolean(i: Int): Boolean = {
-    values(i).asInstanceOf[MutableBoolean].value
-  }
-
-  override def setDouble(ordinal: Int, value: Double): Unit = {
-    val currentValue = values(ordinal).asInstanceOf[MutableDouble]
-    currentValue.isNull = false
-    currentValue.value = value
-  }
-
-  override def getDouble(i: Int): Double = {
-    values(i).asInstanceOf[MutableDouble].value
-  }
-
-  override def setShort(ordinal: Int, value: Short): Unit = {
-    val currentValue = values(ordinal).asInstanceOf[MutableShort]
-    currentValue.isNull = false
-    currentValue.value = value
-  }
-
-  override def getShort(i: Int): Short = {
-    values(i).asInstanceOf[MutableShort].value
-  }
-
-  override def setLong(ordinal: Int, value: Long): Unit = {
-    val currentValue = values(ordinal).asInstanceOf[MutableLong]
-    currentValue.isNull = false
-    currentValue.value = value
-  }
-
-  override def getLong(i: Int): Long = {
-    values(i).asInstanceOf[MutableLong].value
-  }
-
-  override def setByte(ordinal: Int, value: Byte): Unit = {
-    val currentValue = values(ordinal).asInstanceOf[MutableByte]
-    currentValue.isNull = false
-    currentValue.value = value
-  }
-
-  override def getByte(i: Int): Byte = {
-    values(i).asInstanceOf[MutableByte].value
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/97594c29/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/HyperLogLogPlusPlus.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/HyperLogLogPlusPlus.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/HyperLogLogPlusPlus.scala
index 1d218da..83c8d40 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/HyperLogLogPlusPlus.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/HyperLogLogPlusPlus.scala
@@ -155,7 +155,7 @@ case class HyperLogLogPlusPlus(
     aggBufferAttributes.map(_.newInstance())
 
   /** Fill all words with zeros. */
-  override def initialize(buffer: MutableRow): Unit = {
+  override def initialize(buffer: InternalRow): Unit = {
     var word = 0
     while (word < numWords) {
       buffer.setLong(mutableAggBufferOffset + word, 0)
@@ -168,7 +168,7 @@ case class HyperLogLogPlusPlus(
    *
    * Variable names in the HLL++ paper match variable names in the code.
    */
-  override def update(buffer: MutableRow, input: InternalRow): Unit = {
+  override def update(buffer: InternalRow, input: InternalRow): Unit = {
     val v = child.eval(input)
     if (v != null) {
       // Create the hashed value 'x'.
@@ -200,7 +200,7 @@ case class HyperLogLogPlusPlus(
    * Merge the HLL buffers by iterating through the registers in both buffers and select the
    * maximum number of leading zeros for each register.
    */
-  override def merge(buffer1: MutableRow, buffer2: InternalRow): Unit = {
+  override def merge(buffer1: InternalRow, buffer2: InternalRow): Unit = {
     var idx = 0
     var wordOffset = 0
     while (wordOffset < numWords) {
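
These callbacks can be driven by hand, much like HyperLogLogPlusPlusSuite does; a sketch with the renamed buffer class:
```scala
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{BoundReference, SpecificInternalRow}
import org.apache.spark.sql.catalyst.expressions.aggregate.HyperLogLogPlusPlus
import org.apache.spark.sql.types.IntegerType

val hll = HyperLogLogPlusPlus(BoundReference(0, IntegerType, nullable = true))
val buffer = new SpecificInternalRow(hll.aggBufferAttributes.map(_.dataType))
hll.initialize(buffer) // fill all words with zeros
(1 to 100).foreach(i => hll.update(buffer, InternalRow(i)))
println(hll.eval(buffer)) // estimated distinct count, close to 100
```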

http://git-wip-us.apache.org/repos/asf/spark/blob/97594c29/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/PivotFirst.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/PivotFirst.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/PivotFirst.scala
index 16c03c5..0876060 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/PivotFirst.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/PivotFirst.scala
@@ -30,7 +30,7 @@ object PivotFirst {
 
   // Currently UnsafeRow does not support the generic update method (throws
   // UnsupportedOperationException), so we need to explicitly support each DataType.
-  private val updateFunction: PartialFunction[DataType, (MutableRow, Int, Any) => Unit] = {
+  private val updateFunction: PartialFunction[DataType, (InternalRow, Int, Any) => Unit] = {
     case DoubleType =>
       (row, offset, value) => row.setDouble(offset, value.asInstanceOf[Double])
     case IntegerType =>
@@ -89,9 +89,9 @@ case class PivotFirst(
 
   val indexSize = pivotIndex.size
 
-  private val updateRow: (MutableRow, Int, Any) => Unit = PivotFirst.updateFunction(valueDataType)
+  private val updateRow: (InternalRow, Int, Any) => Unit = PivotFirst.updateFunction(valueDataType)
 
-  override def update(mutableAggBuffer: MutableRow, inputRow: InternalRow): Unit = {
+  override def update(mutableAggBuffer: InternalRow, inputRow: InternalRow): Unit = {
     val pivotColValue = pivotColumn.eval(inputRow)
     if (pivotColValue != null) {
       // We ignore rows whose pivot column value is not in the list of pivot column values.
@@ -105,7 +105,7 @@ case class PivotFirst(
     }
   }
 
-  override def merge(mutableAggBuffer: MutableRow, inputAggBuffer: InternalRow): Unit = {
+  override def merge(mutableAggBuffer: InternalRow, inputAggBuffer: InternalRow): Unit = {
     for (i <- 0 until indexSize) {
       if (!inputAggBuffer.isNullAt(inputAggBufferOffset + i)) {
         val value = inputAggBuffer.get(inputAggBufferOffset + i, valueDataType)
@@ -114,7 +114,7 @@ case class PivotFirst(
     }
   }
 
-  override def initialize(mutableAggBuffer: MutableRow): Unit = valueDataType match {
+  override def initialize(mutableAggBuffer: InternalRow): Unit = valueDataType match {
     case d: DecimalType =>
       // Per doc of setDecimal we need to do this instead of setNullAt for DecimalType.
       for (i <- 0 until indexSize) {

http://git-wip-us.apache.org/repos/asf/spark/blob/97594c29/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/collect.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/collect.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/collect.scala
index 78a388d..89eb864 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/collect.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/collect.scala
@@ -60,11 +60,11 @@ abstract class Collect extends ImperativeAggregate {
 
   protected[this] val buffer: Growable[Any] with Iterable[Any]
 
-  override def initialize(b: MutableRow): Unit = {
+  override def initialize(b: InternalRow): Unit = {
     buffer.clear()
   }
 
-  override def update(b: MutableRow, input: InternalRow): Unit = {
+  override def update(b: InternalRow, input: InternalRow): Unit = {
     // Do not allow null values. We follow the semantics of Hive's collect_list/collect_set here.
     // See: org.apache.hadoop.hive.ql.udf.generic.GenericUDAFMkCollectionEvaluator
     val value = child.eval(input)
@@ -73,7 +73,7 @@ abstract class Collect extends ImperativeAggregate {
     }
   }
 
-  override def merge(buffer: MutableRow, input: InternalRow): Unit = {
+  override def merge(buffer: InternalRow, input: InternalRow): Unit = {
     sys.error("Collect cannot be used in partial aggregations.")
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/97594c29/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/interfaces.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/interfaces.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/interfaces.scala
index b5c0844..f3fd58b 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/interfaces.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/interfaces.scala
@@ -307,14 +307,14 @@ abstract class ImperativeAggregate extends AggregateFunction with CodegenFallbac
    *
    * Use `fieldNumber + mutableAggBufferOffset` to access fields of `mutableAggBuffer`.
    */
-  def initialize(mutableAggBuffer: MutableRow): Unit
+  def initialize(mutableAggBuffer: InternalRow): Unit
 
   /**
    * Updates its aggregation buffer, located in `mutableAggBuffer`, based on the given `inputRow`.
    *
    * Use `fieldNumber + mutableAggBufferOffset` to access fields of `mutableAggBuffer`.
    */
-  def update(mutableAggBuffer: MutableRow, inputRow: InternalRow): Unit
+  def update(mutableAggBuffer: InternalRow, inputRow: InternalRow): Unit
 
   /**
    * Combines new intermediate results from the `inputAggBuffer` with the existing intermediate
@@ -323,7 +323,7 @@ abstract class ImperativeAggregate extends AggregateFunction with CodegenFallbac
    * Use `fieldNumber + mutableAggBufferOffset` to access fields of `mutableAggBuffer`.
    * Use `fieldNumber + inputAggBufferOffset` to access fields of `inputAggBuffer`.
    */
-  def merge(mutableAggBuffer: MutableRow, inputAggBuffer: InternalRow): Unit
+  def merge(mutableAggBuffer: InternalRow, inputAggBuffer: InternalRow): Unit
 }
 
 /**
@@ -504,16 +504,16 @@ abstract class TypedImperativeAggregate[T] extends ImperativeAggregate {
   /** De-serializes the serialized format Array[Byte], and produces aggregation buffer object T */
   def deserialize(storageFormat: Array[Byte]): T
 
-  final override def initialize(buffer: MutableRow): Unit = {
+  final override def initialize(buffer: InternalRow): Unit = {
     val bufferObject = createAggregationBuffer()
     buffer.update(mutableAggBufferOffset, bufferObject)
   }
 
-  final override def update(buffer: MutableRow, input: InternalRow): Unit = {
+  final override def update(buffer: InternalRow, input: InternalRow): Unit = {
     update(getBufferObject(buffer), input)
   }
 
-  final override def merge(buffer: MutableRow, inputBuffer: InternalRow): Unit = {
+  final override def merge(buffer: InternalRow, inputBuffer: InternalRow): Unit = {
     val bufferObject = getBufferObject(buffer)
     // The inputBuffer stores serialized aggregation buffer object produced by partial aggregate
     val inputObject = deserialize(inputBuffer.getBinary(inputAggBufferOffset))
@@ -547,7 +547,7 @@ abstract class TypedImperativeAggregate[T] extends ImperativeAggregate {
    * This is only called when doing Partial or PartialMerge mode aggregation, before the framework
    * shuffle out aggregate buffers.
    */
-  final def serializeAggregateBufferInPlace(buffer: MutableRow): Unit = {
+  final def serializeAggregateBufferInPlace(buffer: InternalRow): Unit = {
     buffer(mutableAggBufferOffset) = serialize(getBufferObject(buffer))
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/97594c29/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
index 574943d..6cab50a 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
@@ -819,7 +819,7 @@ class CodeAndComment(val body: String, val comment: collection.Map[String, Strin
  */
 abstract class CodeGenerator[InType <: AnyRef, OutType <: AnyRef] extends Logging {
 
-  protected val genericMutableRowType: String = classOf[GenericMutableRow].getName
+  protected val genericMutableRowType: String = classOf[GenericInternalRow].getName
 
   /**
    * Generates a class for a given input expression.  Called when there is not cached code
@@ -889,7 +889,6 @@ object CodeGenerator extends Logging {
       classOf[UnsafeArrayData].getName,
       classOf[MapData].getName,
       classOf[UnsafeMapData].getName,
-      classOf[MutableRow].getName,
       classOf[Expression].getName
     ))
     evaluator.setExtendedClass(classOf[GeneratedClass])

http://git-wip-us.apache.org/repos/asf/spark/blob/97594c29/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateMutableProjection.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateMutableProjection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateMutableProjection.scala
index 13d61af..5c4b56b 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateMutableProjection.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateMutableProjection.scala
@@ -24,10 +24,10 @@ import org.apache.spark.sql.catalyst.expressions.aggregate.NoOp
 abstract class BaseMutableProjection extends MutableProjection
 
 /**
- * Generates byte code that produces a [[MutableRow]] object that can update itself based on a new
+ * Generates byte code that produces a [[InternalRow]] object that can update itself based on a new
  * input [[InternalRow]] for a fixed set of [[Expression Expressions]].
  * It exposes a `target` method, which is used to set the row that will be updated.
- * The internal [[MutableRow]] object created internally is used only when `target` is not used.
+ * The internal [[InternalRow]] object created internally is used only when `target` is not used.
  */
 object GenerateMutableProjection extends CodeGenerator[Seq[Expression], MutableProjection] {
 
@@ -102,7 +102,7 @@ object GenerateMutableProjection extends CodeGenerator[Seq[Expression], MutableP
       class SpecificMutableProjection extends ${classOf[BaseMutableProjection].getName} {
 
         private Object[] references;
-        private MutableRow mutableRow;
+        private InternalRow mutableRow;
         ${ctx.declareMutableStates()}
 
         public SpecificMutableProjection(Object[] references) {
@@ -113,7 +113,7 @@ object GenerateMutableProjection extends CodeGenerator[Seq[Expression], MutableP
 
         ${ctx.declareAddedFunctions()}
 
-        public ${classOf[BaseMutableProjection].getName} target(MutableRow row) {
+        public ${classOf[BaseMutableProjection].getName} target(InternalRow row) {
           mutableRow = row;
           return this;
         }

http://git-wip-us.apache.org/repos/asf/spark/blob/97594c29/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateSafeProjection.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateSafeProjection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateSafeProjection.scala
index 1c98c9e..2773e1a 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateSafeProjection.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateSafeProjection.scala
@@ -30,7 +30,7 @@ import org.apache.spark.sql.types._
 abstract class BaseProjection extends Projection {}
 
 /**
- * Generates byte code that produces a [[MutableRow]] object (not an [[UnsafeRow]]) that can update
+ * Generates byte code that produces a [[InternalRow]] object (not an [[UnsafeRow]]) that can update
  * itself based on a new input [[InternalRow]] for a fixed set of [[Expression Expressions]].
  */
 object GenerateSafeProjection extends CodeGenerator[Seq[Expression], Projection] {
@@ -164,12 +164,12 @@ object GenerateSafeProjection extends CodeGenerator[Seq[Expression], Projection]
       class SpecificSafeProjection extends ${classOf[BaseProjection].getName} {
 
         private Object[] references;
-        private MutableRow mutableRow;
+        private InternalRow mutableRow;
         ${ctx.declareMutableStates()}
 
         public SpecificSafeProjection(Object[] references) {
           this.references = references;
-          mutableRow = (MutableRow) references[references.length - 1];
+          mutableRow = (InternalRow) references[references.length - 1];
           ${ctx.initMutableStates()}
         }
 
@@ -188,7 +188,7 @@ object GenerateSafeProjection extends CodeGenerator[Seq[Expression], Projection]
     logDebug(s"code for ${expressions.mkString(",")}:\n${CodeFormatter.format(code)}")
 
     val c = CodeGenerator.compile(code)
-    val resultRow = new SpecificMutableRow(expressions.map(_.dataType))
+    val resultRow = new SpecificInternalRow(expressions.map(_.dataType))
     c.generate(ctx.references.toArray :+ resultRow).asInstanceOf[Projection]
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/97594c29/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/package.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/package.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/package.scala
index a6125c6..1510a47 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/package.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/package.scala
@@ -81,7 +81,7 @@ package object expressions  {
     def currentValue: InternalRow
 
     /** Uses the given row to store the output of the projection. */
-    def target(row: MutableRow): MutableProjection
+    def target(row: InternalRow): MutableProjection
   }
 
 

http://git-wip-us.apache.org/repos/asf/spark/blob/97594c29/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/rows.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/rows.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/rows.scala
index 73dceb3..751b821 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/rows.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/rows.scala
@@ -158,33 +158,6 @@ trait BaseGenericInternalRow extends InternalRow {
 }
 
 /**
- * An extended interface to [[InternalRow]] that allows the values for each column to be updated.
- * Setting a value through a primitive function implicitly marks that column as not null.
- */
-abstract class MutableRow extends InternalRow {
-  def setNullAt(i: Int): Unit
-
-  def update(i: Int, value: Any): Unit
-
-  // default implementation (slow)
-  def setBoolean(i: Int, value: Boolean): Unit = { update(i, value) }
-  def setByte(i: Int, value: Byte): Unit = { update(i, value) }
-  def setShort(i: Int, value: Short): Unit = { update(i, value) }
-  def setInt(i: Int, value: Int): Unit = { update(i, value) }
-  def setLong(i: Int, value: Long): Unit = { update(i, value) }
-  def setFloat(i: Int, value: Float): Unit = { update(i, value) }
-  def setDouble(i: Int, value: Double): Unit = { update(i, value) }
-
-  /**
-   * Update the decimal column at `i`.
-   *
-   * Note: In order to support update decimal with precision > 18 in UnsafeRow,
-   * CAN NOT call setNullAt() for decimal column on UnsafeRow, call setDecimal(i, null, precision).
-   */
-  def setDecimal(i: Int, value: Decimal, precision: Int) { update(i, value) }
-}
-
-/**
  * A row implementation that uses an array of objects as the underlying storage.  Note that, while
  * the array is not copied, and thus could technically be mutated after creation, this is not
  * allowed.
@@ -230,24 +203,9 @@ class GenericInternalRow(val values: Array[Any]) extends BaseGenericInternalRow
 
   override def numFields: Int = values.length
 
-  override def copy(): GenericInternalRow = this
-}
-
-class GenericMutableRow(values: Array[Any]) extends MutableRow with BaseGenericInternalRow {
-  /** No-arg constructor for serialization. */
-  protected def this() = this(null)
-
-  def this(size: Int) = this(new Array[Any](size))
-
-  override protected def genericGet(ordinal: Int) = values(ordinal)
-
-  override def toSeq(fieldTypes: Seq[DataType]): Seq[Any] = values
-
-  override def numFields: Int = values.length
-
   override def setNullAt(i: Int): Unit = { values(i) = null}
 
   override def update(i: Int, value: Any): Unit = { values(i) = value }
 
-  override def copy(): InternalRow = new GenericInternalRow(values.clone())
+  override def copy(): GenericInternalRow = this
 }
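
Note what the merged class now is: `GenericInternalRow` keeps its old `copy(): GenericInternalRow = this` while gaining `setNullAt` and `update` from the deleted `GenericMutableRow`. One consequence, spelled out as a sketch (not part of the patch):

```scala
import org.apache.spark.sql.catalyst.expressions.GenericInternalRow

val row = new GenericInternalRow(1)
val copied = row.copy()        // returns `this`, not a clone
row.setInt(0, 42)              // default setter delegates to update()
assert(copied.getInt(0) == 42) // the "copy" sees the later write
```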

http://git-wip-us.apache.org/repos/asf/spark/blob/97594c29/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
index f80e637..e476cb1 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
@@ -105,7 +105,7 @@ class JacksonParser(
       }
       emptyRow
     } else {
-      val row = new GenericMutableRow(schema.length)
+      val row = new GenericInternalRow(schema.length)
       for (corruptIndex <- schema.getFieldIndex(columnNameOfCorruptRecord)) {
         require(schema(corruptIndex).dataType == StringType)
         row.update(corruptIndex, UTF8String.fromString(record))
@@ -363,7 +363,7 @@ class JacksonParser(
       parser: JsonParser,
       schema: StructType,
       fieldConverters: Seq[ValueConverter]): InternalRow = {
-    val row = new GenericMutableRow(schema.length)
+    val row = new GenericInternalRow(schema.length)
     while (nextUntil(parser, JsonToken.END_OBJECT)) {
       schema.getFieldIndex(parser.getCurrentName) match {
         case Some(index) =>
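
The parser's pattern, reduced to a sketch with hypothetical values: string columns must be written as `UTF8String`, and fields never touched stay null.

```scala
import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
import org.apache.spark.unsafe.types.UTF8String

val row = new GenericInternalRow(2)
row.update(0, UTF8String.fromString("""{"bad": json"""))  // corrupt-record column
assert(row.isNullAt(1))                                   // unparsed field stays null
```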

http://git-wip-us.apache.org/repos/asf/spark/blob/97594c29/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/ScalaReflectionSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/ScalaReflectionSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/ScalaReflectionSuite.scala
index 85563dd..43b6afd 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/ScalaReflectionSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/ScalaReflectionSuite.scala
@@ -23,7 +23,7 @@ import java.sql.{Date, Timestamp}
 import scala.reflect.runtime.universe.typeOf
 
 import org.apache.spark.SparkFunSuite
-import org.apache.spark.sql.catalyst.expressions.{BoundReference, Literal, SpecificMutableRow}
+import org.apache.spark.sql.catalyst.expressions.{BoundReference, Literal, SpecificInternalRow}
 import org.apache.spark.sql.catalyst.expressions.objects.NewInstance
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.UTF8String
@@ -94,7 +94,7 @@ object TestingUDT {
       .add("c", DoubleType, nullable = false)
 
     override def serialize(n: NestedStruct): Any = {
-      val row = new SpecificMutableRow(sqlType.asInstanceOf[StructType].map(_.dataType))
+      val row = new SpecificInternalRow(sqlType.asInstanceOf[StructType].map(_.dataType))
       row.setInt(0, n.a)
       row.setLong(1, n.b)
       row.setDouble(2, n.c)
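
`SpecificInternalRow` (the renamed `SpecificMutableRow`) allocates a type-specialized mutable slot per field, so the primitive setters used here avoid boxing. The same serialize pattern in isolation, as a sketch:

```scala
import org.apache.spark.sql.catalyst.expressions.SpecificInternalRow
import org.apache.spark.sql.types._

val row = new SpecificInternalRow(Seq(IntegerType, LongType, DoubleType))
row.setInt(0, 1)
row.setLong(1, 2L)
row.setDouble(2, 3.0)
assert(row.getLong(1) == 2L)
```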

http://git-wip-us.apache.org/repos/asf/spark/blob/97594c29/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CodeGenerationSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CodeGenerationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CodeGenerationSuite.scala
index 5588b44..0cb201e 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CodeGenerationSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CodeGenerationSuite.scala
@@ -68,7 +68,7 @@ class CodeGenerationSuite extends SparkFunSuite with ExpressionEvalHelper {
     val length = 5000
     val expressions = List.fill(length)(EqualTo(Literal(1), Literal(1)))
     val plan = GenerateMutableProjection.generate(expressions)
-    val actual = plan(new GenericMutableRow(length)).toSeq(expressions.map(_.dataType))
+    val actual = plan(new GenericInternalRow(length)).toSeq(expressions.map(_.dataType))
     val expected = Seq.fill(length)(true)
 
     if (!checkResult(actual, expected)) {
@@ -91,7 +91,7 @@ class CodeGenerationSuite extends SparkFunSuite with ExpressionEvalHelper {
     val expression = CaseWhen((1 to cases).map(generateCase(_)))
 
     val plan = GenerateMutableProjection.generate(Seq(expression))
-    val input = new GenericMutableRow(Array[Any](UTF8String.fromString(s"${clauses}:${cases}")))
+    val input = new GenericInternalRow(Array[Any](UTF8String.fromString(s"${clauses}:${cases}")))
     val actual = plan(input).toSeq(Seq(expression.dataType))
 
     assert(actual(0) == cases)
@@ -101,7 +101,7 @@ class CodeGenerationSuite extends SparkFunSuite with ExpressionEvalHelper {
     val length = 5000
     val expressions = Seq(CreateArray(List.fill(length)(EqualTo(Literal(1), Literal(1)))))
     val plan = GenerateMutableProjection.generate(expressions)
-    val actual = plan(new GenericMutableRow(length)).toSeq(expressions.map(_.dataType))
+    val actual = plan(new GenericInternalRow(length)).toSeq(expressions.map(_.dataType))
     val expected = Seq(new GenericArrayData(Seq.fill(length)(true)))
 
     if (!checkResult(actual, expected)) {
@@ -116,7 +116,7 @@ class CodeGenerationSuite extends SparkFunSuite with ExpressionEvalHelper {
         case (expr, i) => Seq(Literal(i), expr)
       }))
     val plan = GenerateMutableProjection.generate(expressions)
-    val actual = plan(new GenericMutableRow(length)).toSeq(expressions.map(_.dataType)).map {
+    val actual = plan(new GenericInternalRow(length)).toSeq(expressions.map(_.dataType)).map {
       case m: ArrayBasedMapData => ArrayBasedMapData.toScalaMap(m)
     }
     val expected = (0 until length).map((_, true)).toMap :: Nil
@@ -130,7 +130,7 @@ class CodeGenerationSuite extends SparkFunSuite with ExpressionEvalHelper {
     val length = 5000
     val expressions = Seq(CreateStruct(List.fill(length)(EqualTo(Literal(1), Literal(1)))))
     val plan = GenerateMutableProjection.generate(expressions)
-    val actual = plan(new GenericMutableRow(length)).toSeq(expressions.map(_.dataType))
+    val actual = plan(new GenericInternalRow(length)).toSeq(expressions.map(_.dataType))
     val expected = Seq(InternalRow(Seq.fill(length)(true): _*))
 
     if (!checkResult(actual, expected)) {
@@ -145,7 +145,7 @@ class CodeGenerationSuite extends SparkFunSuite with ExpressionEvalHelper {
         expr => Seq(Literal(expr.toString), expr)
       }))
     val plan = GenerateMutableProjection.generate(expressions)
-    val actual = plan(new GenericMutableRow(length)).toSeq(expressions.map(_.dataType))
+    val actual = plan(new GenericInternalRow(length)).toSeq(expressions.map(_.dataType))
     val expected = Seq(InternalRow(Seq.fill(length)(true): _*))
 
     if (!checkResult(actual, expected)) {
@@ -158,7 +158,7 @@ class CodeGenerationSuite extends SparkFunSuite with ExpressionEvalHelper {
     val schema = StructType(Seq.fill(length)(StructField("int", IntegerType)))
     val expressions = Seq(CreateExternalRow(Seq.fill(length)(Literal(1)), schema))
     val plan = GenerateMutableProjection.generate(expressions)
-    val actual = plan(new GenericMutableRow(length)).toSeq(expressions.map(_.dataType))
+    val actual = plan(new GenericInternalRow(length)).toSeq(expressions.map(_.dataType))
     val expected = Seq(Row.fromSeq(Seq.fill(length)(1)))
 
     if (!checkResult(actual, expected)) {
@@ -174,7 +174,7 @@ class CodeGenerationSuite extends SparkFunSuite with ExpressionEvalHelper {
         Literal.create("PST", StringType))
     }
     val plan = GenerateMutableProjection.generate(expressions)
-    val actual = plan(new GenericMutableRow(length)).toSeq(expressions.map(_.dataType))
+    val actual = plan(new GenericInternalRow(length)).toSeq(expressions.map(_.dataType))
     val expected = Seq.fill(length)(
       DateTimeUtils.fromJavaTimestamp(Timestamp.valueOf("2015-07-24 07:00:00")))
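
One detail these tests rely on, shown as a standalone sketch: internal rows carry no schema, so reading one back generically means supplying the field types to `toSeq`.

```scala
import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
import org.apache.spark.sql.types._

val row = new GenericInternalRow(Array[Any](1, 2L))
assert(row.toSeq(Seq(IntegerType, LongType)) == Seq(1, 2L))
```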
 

http://git-wip-us.apache.org/repos/asf/spark/blob/97594c29/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/MapDataSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/MapDataSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/MapDataSuite.scala
index 0f1264c..25a675a 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/MapDataSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/MapDataSuite.scala
@@ -45,7 +45,7 @@ class MapDataSuite extends SparkFunSuite {
 
     // UnsafeMapData
     val unsafeConverter = UnsafeProjection.create(Array[DataType](MapType(StringType, IntegerType)))
-    val row = new GenericMutableRow(1)
+    val row = new GenericInternalRow(1)
     def toUnsafeMap(map: ArrayBasedMapData): UnsafeMapData = {
       row.update(0, map)
       val unsafeRow = unsafeConverter.apply(row)
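
The wrapping trick used here, as a self-contained sketch (hypothetical map contents): `UnsafeProjection` converts whole rows, so a map is round-tripped to `UnsafeMapData` by parking it in a one-column row first.

```scala
import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, UnsafeProjection}
import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, GenericArrayData}
import org.apache.spark.sql.types._

val convert = UnsafeProjection.create(Array[DataType](MapType(IntegerType, IntegerType)))
val row = new GenericInternalRow(1)
row.update(0, new ArrayBasedMapData(
  new GenericArrayData(Array[Any](1)), new GenericArrayData(Array[Any](2))))
val unsafeMap = convert(row).getMap(0)
assert(unsafeMap.numElements() == 1)
```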

http://git-wip-us.apache.org/repos/asf/spark/blob/97594c29/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/UnsafeRowConverterSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/UnsafeRowConverterSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/UnsafeRowConverterSuite.scala
index 90790dd..cf3cbe2 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/UnsafeRowConverterSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/UnsafeRowConverterSuite.scala
@@ -37,7 +37,7 @@ class UnsafeRowConverterSuite extends SparkFunSuite with Matchers {
     val fieldTypes: Array[DataType] = Array(LongType, LongType, IntegerType)
     val converter = UnsafeProjection.create(fieldTypes)
 
-    val row = new SpecificMutableRow(fieldTypes)
+    val row = new SpecificInternalRow(fieldTypes)
     row.setLong(0, 0)
     row.setLong(1, 1)
     row.setInt(2, 2)
@@ -75,7 +75,7 @@ class UnsafeRowConverterSuite extends SparkFunSuite with Matchers {
     val fieldTypes: Array[DataType] = Array(LongType, StringType, BinaryType)
     val converter = UnsafeProjection.create(fieldTypes)
 
-    val row = new SpecificMutableRow(fieldTypes)
+    val row = new SpecificInternalRow(fieldTypes)
     row.setLong(0, 0)
     row.update(1, UTF8String.fromString("Hello"))
     row.update(2, "World".getBytes(StandardCharsets.UTF_8))
@@ -94,7 +94,7 @@ class UnsafeRowConverterSuite extends SparkFunSuite with Matchers {
     val fieldTypes: Array[DataType] = Array(LongType, StringType, DateType, TimestampType)
     val converter = UnsafeProjection.create(fieldTypes)
 
-    val row = new SpecificMutableRow(fieldTypes)
+    val row = new SpecificInternalRow(fieldTypes)
     row.setLong(0, 0)
     row.update(1, UTF8String.fromString("Hello"))
     row.update(2, DateTimeUtils.fromJavaDate(Date.valueOf("1970-01-01")))
@@ -138,7 +138,7 @@ class UnsafeRowConverterSuite extends SparkFunSuite with Matchers {
     val converter = UnsafeProjection.create(fieldTypes)
 
     val rowWithAllNullColumns: InternalRow = {
-      val r = new SpecificMutableRow(fieldTypes)
+      val r = new SpecificInternalRow(fieldTypes)
       for (i <- fieldTypes.indices) {
         r.setNullAt(i)
       }
@@ -167,7 +167,7 @@ class UnsafeRowConverterSuite extends SparkFunSuite with Matchers {
     // columns, then the serialized row representation should be identical to what we would get by
     // creating an entirely null row via the converter
     val rowWithNoNullColumns: InternalRow = {
-      val r = new SpecificMutableRow(fieldTypes)
+      val r = new SpecificInternalRow(fieldTypes)
       r.setNullAt(0)
       r.setBoolean(1, false)
       r.setByte(2, 20)
@@ -243,11 +243,11 @@ class UnsafeRowConverterSuite extends SparkFunSuite with Matchers {
   test("NaN canonicalization") {
     val fieldTypes: Array[DataType] = Array(FloatType, DoubleType)
 
-    val row1 = new SpecificMutableRow(fieldTypes)
+    val row1 = new SpecificInternalRow(fieldTypes)
     row1.setFloat(0, java.lang.Float.intBitsToFloat(0x7f800001))
     row1.setDouble(1, java.lang.Double.longBitsToDouble(0x7ff0000000000001L))
 
-    val row2 = new SpecificMutableRow(fieldTypes)
+    val row2 = new SpecificInternalRow(fieldTypes)
     row2.setFloat(0, java.lang.Float.intBitsToFloat(0x7fffffff))
     row2.setDouble(1, java.lang.Double.longBitsToDouble(0x7fffffffffffffffL))
 
@@ -263,7 +263,7 @@ class UnsafeRowConverterSuite extends SparkFunSuite with Matchers {
 
     val converter = UnsafeProjection.create(fieldTypes)
 
-    val row = new GenericMutableRow(fieldTypes.length)
+    val row = new GenericInternalRow(fieldTypes.length)
     row.update(0, InternalRow(1))
     row.update(1, InternalRow(InternalRow(2L)))
 
@@ -324,7 +324,7 @@ class UnsafeRowConverterSuite extends SparkFunSuite with Matchers {
     )
     val converter = UnsafeProjection.create(fieldTypes)
 
-    val row = new GenericMutableRow(fieldTypes.length)
+    val row = new GenericInternalRow(fieldTypes.length)
     row.update(0, createArray(1, 2))
     row.update(1, createArray(createArray(3, 4)))
 
@@ -359,7 +359,7 @@ class UnsafeRowConverterSuite extends SparkFunSuite with Matchers {
     val innerMap = createMap(5, 6)(7, 8)
     val map2 = createMap(9)(innerMap)
 
-    val row = new GenericMutableRow(fieldTypes.length)
+    val row = new GenericInternalRow(fieldTypes.length)
     row.update(0, map1)
     row.update(1, map2)
 
@@ -400,7 +400,7 @@ class UnsafeRowConverterSuite extends SparkFunSuite with Matchers {
     )
     val converter = UnsafeProjection.create(fieldTypes)
 
-    val row = new GenericMutableRow(fieldTypes.length)
+    val row = new GenericInternalRow(fieldTypes.length)
     row.update(0, InternalRow(createArray(1)))
     row.update(1, createArray(InternalRow(2L)))
 
@@ -439,7 +439,7 @@ class UnsafeRowConverterSuite extends SparkFunSuite with Matchers {
     )
     val converter = UnsafeProjection.create(fieldTypes)
 
-    val row = new GenericMutableRow(fieldTypes.length)
+    val row = new GenericInternalRow(fieldTypes.length)
     row.update(0, InternalRow(createMap(1)(2)))
     row.update(1, createMap(3)(InternalRow(4L)))
 
@@ -485,7 +485,7 @@ class UnsafeRowConverterSuite extends SparkFunSuite with Matchers {
     )
     val converter = UnsafeProjection.create(fieldTypes)
 
-    val row = new GenericMutableRow(fieldTypes.length)
+    val row = new GenericInternalRow(fieldTypes.length)
     row.update(0, createArray(createMap(1)(2)))
     row.update(1, createMap(3)(createArray(4)))
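
Why the NaN test above matters, as a sketch (assuming `UnsafeRow` equality compares the backing bytes): different NaN bit patterns must serialize identically, or binary row comparison would tell "equal" values apart.

```scala
import org.apache.spark.sql.catalyst.expressions.{SpecificInternalRow, UnsafeProjection}
import org.apache.spark.sql.types._

val types: Array[DataType] = Array(FloatType, DoubleType)
val convert = UnsafeProjection.create(types)
val row1 = new SpecificInternalRow(types)
row1.setFloat(0, java.lang.Float.intBitsToFloat(0x7f800001))
row1.setDouble(1, java.lang.Double.longBitsToDouble(0x7ff0000000000001L))
val canonical = convert(row1).copy()  // copy out: the converter reuses its buffer
val row2 = new SpecificInternalRow(types)
row2.setFloat(0, Float.NaN)
row2.setDouble(1, Double.NaN)
assert(canonical == convert(row2))    // byte-for-byte equal after canonicalization
```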
 

http://git-wip-us.apache.org/repos/asf/spark/blob/97594c29/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/aggregate/ApproximatePercentileSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/aggregate/ApproximatePercentileSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/aggregate/ApproximatePercentileSuite.scala
index 61298a1..8456e24 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/aggregate/ApproximatePercentileSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/aggregate/ApproximatePercentileSuite.scala
@@ -23,7 +23,7 @@ import org.apache.spark.sql.catalyst.analysis.{SimpleAnalyzer, UnresolvedAttribu
 import org.apache.spark.sql.catalyst.analysis.TypeCheckResult.TypeCheckFailure
 import org.apache.spark.sql.catalyst.dsl.expressions._
 import org.apache.spark.sql.catalyst.dsl.plans._
-import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, BoundReference, Cast, CreateArray, DecimalLiteral, GenericMutableRow, Literal}
+import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, BoundReference, Cast, CreateArray, DecimalLiteral, GenericInternalRow, Literal}
 import org.apache.spark.sql.catalyst.expressions.aggregate.ApproximatePercentile.{PercentileDigest, PercentileDigestSerializer}
 import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
 import org.apache.spark.sql.catalyst.util.ArrayData
@@ -144,7 +144,8 @@ class ApproximatePercentileSuite extends SparkFunSuite {
       .withNewInputAggBufferOffset(inputAggregationBufferOffset)
       .withNewMutableAggBufferOffset(mutableAggregationBufferOffset)
 
-    val mutableAggBuffer = new GenericMutableRow(new Array[Any](mutableAggregationBufferOffset + 1))
+    val mutableAggBuffer = new GenericInternalRow(
+      new Array[Any](mutableAggregationBufferOffset + 1))
     agg.initialize(mutableAggBuffer)
     val dataCount = 10
     (1 to dataCount).foreach { data =>
@@ -154,7 +155,7 @@ class ApproximatePercentileSuite extends SparkFunSuite {
 
     // Serialize the aggregation buffer
     val serialized = mutableAggBuffer.getBinary(mutableAggregationBufferOffset)
-    val inputAggBuffer = new GenericMutableRow(Array[Any](null, serialized))
+    val inputAggBuffer = new GenericInternalRow(Array[Any](null, serialized))
 
     // Phase 2: final mode aggregation
     // Re-initialize the aggregation buffer
@@ -311,7 +312,7 @@ class ApproximatePercentileSuite extends SparkFunSuite {
   test("class ApproximatePercentile, null handling") {
     val childExpression = Cast(BoundReference(0, IntegerType, nullable = true), DoubleType)
     val agg = new ApproximatePercentile(childExpression, Literal(0.5D))
-    val buffer = new GenericMutableRow(new Array[Any](1))
+    val buffer = new GenericInternalRow(new Array[Any](1))
     agg.initialize(buffer)
     // Empty aggregation buffer
     assert(agg.eval(buffer) == null)
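
The full lifecycle these tests exercise, condensed into a sketch (offsets left at their defaults, so a one-slot buffer suffices; input values are hypothetical):

```scala
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{BoundReference, GenericInternalRow, Literal}
import org.apache.spark.sql.catalyst.expressions.aggregate.ApproximatePercentile
import org.apache.spark.sql.types.DoubleType

val agg = new ApproximatePercentile(
  BoundReference(0, DoubleType, nullable = true), Literal(0.5))
val buffer = new GenericInternalRow(1)
agg.initialize(buffer)                 // plants the digest in slot 0
(1 to 100).foreach(i => agg.update(buffer, InternalRow(i.toDouble)))
val median = agg.eval(buffer)          // approximately 50.0 for this input
```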

http://git-wip-us.apache.org/repos/asf/spark/blob/97594c29/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/aggregate/HyperLogLogPlusPlusSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/aggregate/HyperLogLogPlusPlusSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/aggregate/HyperLogLogPlusPlusSuite.scala
index f537422..17f6b71 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/aggregate/HyperLogLogPlusPlusSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/aggregate/HyperLogLogPlusPlusSuite.scala
@@ -22,28 +22,29 @@ import java.util.Random
 import scala.collection.mutable
 
 import org.apache.spark.SparkFunSuite
-import org.apache.spark.sql.catalyst.expressions.{BoundReference, MutableRow, SpecificMutableRow}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{BoundReference, SpecificInternalRow}
 import org.apache.spark.sql.types.{DataType, IntegerType}
 
 class HyperLogLogPlusPlusSuite extends SparkFunSuite {
 
   /** Create a HLL++ instance and an input and output buffer. */
   def createEstimator(rsd: Double, dt: DataType = IntegerType):
-      (HyperLogLogPlusPlus, MutableRow, MutableRow) = {
-    val input = new SpecificMutableRow(Seq(dt))
+      (HyperLogLogPlusPlus, InternalRow, InternalRow) = {
+    val input = new SpecificInternalRow(Seq(dt))
     val hll = new HyperLogLogPlusPlus(new BoundReference(0, dt, true), rsd)
     val buffer = createBuffer(hll)
     (hll, input, buffer)
   }
 
-  def createBuffer(hll: HyperLogLogPlusPlus): MutableRow = {
-    val buffer = new SpecificMutableRow(hll.aggBufferAttributes.map(_.dataType))
+  def createBuffer(hll: HyperLogLogPlusPlus): InternalRow = {
+    val buffer = new SpecificInternalRow(hll.aggBufferAttributes.map(_.dataType))
     hll.initialize(buffer)
     buffer
   }
 
   /** Evaluate the estimate. It should be within 3*SD's of the given true rsd. */
-  def evaluateEstimate(hll: HyperLogLogPlusPlus, buffer: MutableRow, cardinality: Int): Unit = {
+  def evaluateEstimate(hll: HyperLogLogPlusPlus, buffer: InternalRow, cardinality: Int): Unit = {
     val estimate = hll.eval(buffer).asInstanceOf[Long].toDouble
     val error = math.abs((estimate / cardinality.toDouble) - 1.0d)
     assert(error < hll.trueRsd * 3.0d, "Error should be within 3 std. errors.")
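
A sketch of the two-buffer (partial aggregation) path on top of the helpers above, with all buffers now plain `InternalRow`s:

```scala
import org.apache.spark.sql.catalyst.expressions.{BoundReference, SpecificInternalRow}
import org.apache.spark.sql.catalyst.expressions.aggregate.HyperLogLogPlusPlus
import org.apache.spark.sql.types.IntegerType

val hll = new HyperLogLogPlusPlus(BoundReference(0, IntegerType, nullable = true), 0.05)
val input = new SpecificInternalRow(Seq(IntegerType))
val b1 = new SpecificInternalRow(hll.aggBufferAttributes.map(_.dataType))
val b2 = new SpecificInternalRow(hll.aggBufferAttributes.map(_.dataType))
hll.initialize(b1); hll.initialize(b2)
(0 until 500).foreach { i => input.setInt(0, i); hll.update(b1, input) }
(500 until 1000).foreach { i => input.setInt(0, i); hll.update(b2, input) }
hll.merge(b1, b2)                                // fold the second partial into the first
val estimate = hll.eval(b1).asInstanceOf[Long]   // roughly 1000, within the rsd bound
```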

http://git-wip-us.apache.org/repos/asf/spark/blob/97594c29/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarBatch.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarBatch.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarBatch.java
index 62abc2a..a6ce4c2 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarBatch.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarBatch.java
@@ -21,8 +21,7 @@ import java.util.*;
 
 import org.apache.spark.memory.MemoryMode;
 import org.apache.spark.sql.catalyst.InternalRow;
-import org.apache.spark.sql.catalyst.expressions.GenericMutableRow;
-import org.apache.spark.sql.catalyst.expressions.MutableRow;
+import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
 import org.apache.spark.sql.catalyst.expressions.UnsafeRow;
 import org.apache.spark.sql.catalyst.util.ArrayData;
 import org.apache.spark.sql.catalyst.util.MapData;
@@ -91,7 +90,7 @@ public final class ColumnarBatch {
    * Adapter class to interop with existing components that expect internal row. A lot of
    * performance is lost with this translation.
    */
-  public static final class Row extends MutableRow {
+  public static final class Row extends InternalRow {
     protected int rowId;
     private final ColumnarBatch parent;
     private final int fixedLenRowSize;
@@ -129,7 +128,7 @@ public final class ColumnarBatch {
      * Revisit this. This is expensive. This is currently only used in test paths.
      */
     public InternalRow copy() {
-      GenericMutableRow row = new GenericMutableRow(columns.length);
+      GenericInternalRow row = new GenericInternalRow(columns.length);
       for (int i = 0; i < numFields(); i++) {
         if (isNullAt(i)) {
           row.setNullAt(i);
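
The reuse hazard behind `copy()` here, sketched from the caller's side (hypothetical driver; batch rows are views over the column vectors):

```scala
import scala.collection.mutable.ArrayBuffer
import org.apache.spark.sql.catalyst.InternalRow

// Retaining rows served by a columnar batch requires detaching them
// from the shared view; copy() materializes into a GenericInternalRow.
def collectRows(it: java.util.Iterator[InternalRow]): Seq[InternalRow] = {
  val out = ArrayBuffer.empty[InternalRow]
  while (it.hasNext) out += it.next().copy()
  out
}
```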

http://git-wip-us.apache.org/repos/asf/spark/blob/97594c29/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
index 6c4248c..d3a2222 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
@@ -32,7 +32,7 @@ object RDDConversions {
   def productToRowRdd[A <: Product](data: RDD[A], outputTypes: Seq[DataType]): RDD[InternalRow] = {
     data.mapPartitions { iterator =>
       val numColumns = outputTypes.length
-      val mutableRow = new GenericMutableRow(numColumns)
+      val mutableRow = new GenericInternalRow(numColumns)
       val converters = outputTypes.map(CatalystTypeConverters.createToCatalystConverter)
       iterator.map { r =>
         var i = 0
@@ -52,7 +52,7 @@ object RDDConversions {
   def rowToRowRdd(data: RDD[Row], outputTypes: Seq[DataType]): RDD[InternalRow] = {
     data.mapPartitions { iterator =>
       val numColumns = outputTypes.length
-      val mutableRow = new GenericMutableRow(numColumns)
+      val mutableRow = new GenericInternalRow(numColumns)
       val converters = outputTypes.map(CatalystTypeConverters.createToCatalystConverter)
       iterator.map { r =>
         var i = 0
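
The same reuse pattern for a concrete pair type, as a sketch (a hypothetical helper, not part of the patch): one row per partition, overwritten per record, so consumers must `copy()` anything they retain.

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
import org.apache.spark.unsafe.types.UTF8String

def pairsToRows(data: RDD[(Int, String)]): RDD[InternalRow] = {
  data.mapPartitions { iter =>
    val mutableRow = new GenericInternalRow(2)  // reused for every record
    iter.map { case (i, s) =>
      mutableRow.setInt(0, i)
      mutableRow.update(1, UTF8String.fromString(s))
      mutableRow
    }
  }
}
```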

http://git-wip-us.apache.org/repos/asf/spark/blob/97594c29/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/AggregationIterator.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/AggregationIterator.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/AggregationIterator.scala
index f335912..7c11fdb 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/AggregationIterator.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/AggregationIterator.scala
@@ -153,7 +153,7 @@ abstract class AggregationIterator(
   protected def generateProcessRow(
       expressions: Seq[AggregateExpression],
       functions: Seq[AggregateFunction],
-      inputAttributes: Seq[Attribute]): (MutableRow, InternalRow) => Unit = {
+      inputAttributes: Seq[Attribute]): (InternalRow, InternalRow) => Unit = {
     val joinedRow = new JoinedRow
     if (expressions.nonEmpty) {
       val mergeExpressions = functions.zipWithIndex.flatMap {
@@ -168,9 +168,9 @@ abstract class AggregationIterator(
         case (ae: ImperativeAggregate, i) =>
           expressions(i).mode match {
             case Partial | Complete =>
-              (buffer: MutableRow, row: InternalRow) => ae.update(buffer, row)
+              (buffer: InternalRow, row: InternalRow) => ae.update(buffer, row)
             case PartialMerge | Final =>
-              (buffer: MutableRow, row: InternalRow) => ae.merge(buffer, row)
+              (buffer: InternalRow, row: InternalRow) => ae.merge(buffer, row)
           }
       }.toArray
       // This projection is used to merge buffer values for all expression-based aggregates.
@@ -178,7 +178,7 @@ abstract class AggregationIterator(
       val updateProjection =
         newMutableProjection(mergeExpressions, aggregationBufferSchema ++ inputAttributes)
 
-      (currentBuffer: MutableRow, row: InternalRow) => {
+      (currentBuffer: InternalRow, row: InternalRow) => {
         // Process all expression-based aggregate functions.
         updateProjection.target(currentBuffer)(joinedRow(currentBuffer, row))
         // Process all imperative aggregate functions.
@@ -190,11 +190,11 @@ abstract class AggregationIterator(
       }
     } else {
       // Grouping only.
-      (currentBuffer: MutableRow, row: InternalRow) => {}
+      (currentBuffer: InternalRow, row: InternalRow) => {}
     }
   }
 
-  protected val processRow: (MutableRow, InternalRow) => Unit =
+  protected val processRow: (InternalRow, InternalRow) => Unit =
     generateProcessRow(aggregateExpressions, aggregateFunctions, inputAttributes)
 
   protected val groupingProjection: UnsafeProjection =
@@ -202,7 +202,7 @@ abstract class AggregationIterator(
   protected val groupingAttributes = groupingExpressions.map(_.toAttribute)
 
   // Initializing the function used to generate the output row.
-  protected def generateResultProjection(): (UnsafeRow, MutableRow) => UnsafeRow = {
+  protected def generateResultProjection(): (UnsafeRow, InternalRow) => UnsafeRow = {
     val joinedRow = new JoinedRow
     val modes = aggregateExpressions.map(_.mode).distinct
     val bufferAttributes = aggregateFunctions.flatMap(_.aggBufferAttributes)
@@ -211,14 +211,14 @@ abstract class AggregationIterator(
         case ae: DeclarativeAggregate => ae.evaluateExpression
         case agg: AggregateFunction => NoOp
       }
-      val aggregateResult = new SpecificMutableRow(aggregateAttributes.map(_.dataType))
+      val aggregateResult = new SpecificInternalRow(aggregateAttributes.map(_.dataType))
       val expressionAggEvalProjection = newMutableProjection(evalExpressions, bufferAttributes)
       expressionAggEvalProjection.target(aggregateResult)
 
       val resultProjection =
         UnsafeProjection.create(resultExpressions, groupingAttributes ++ aggregateAttributes)
 
-      (currentGroupingKey: UnsafeRow, currentBuffer: MutableRow) => {
+      (currentGroupingKey: UnsafeRow, currentBuffer: InternalRow) => {
         // Generate results for all expression-based aggregate functions.
         expressionAggEvalProjection(currentBuffer)
         // Generate results for all imperative aggregate functions.
@@ -244,7 +244,7 @@ abstract class AggregationIterator(
         }
       }
 
-      (currentGroupingKey: UnsafeRow, currentBuffer: MutableRow) => {
+      (currentGroupingKey: UnsafeRow, currentBuffer: InternalRow) => {
         // Serializes the generic object stored in aggregation buffer
         var i = 0
         while (i < typedImperativeAggregates.length) {
@@ -256,17 +256,17 @@ abstract class AggregationIterator(
     } else {
       // Grouping-only: we only output values based on grouping expressions.
       val resultProjection = UnsafeProjection.create(resultExpressions, groupingAttributes)
-      (currentGroupingKey: UnsafeRow, currentBuffer: MutableRow) => {
+      (currentGroupingKey: UnsafeRow, currentBuffer: InternalRow) => {
         resultProjection(currentGroupingKey)
       }
     }
   }
 
-  protected val generateOutput: (UnsafeRow, MutableRow) => UnsafeRow =
+  protected val generateOutput: (UnsafeRow, InternalRow) => UnsafeRow =
     generateResultProjection()
 
   /** Initializes buffer values for all aggregate functions. */
-  protected def initializeBuffer(buffer: MutableRow): Unit = {
+  protected def initializeBuffer(buffer: InternalRow): Unit = {
     expressionAggInitialProjection.target(buffer)(EmptyRow)
     var i = 0
     while (i < allImperativeAggregateFunctions.length) {
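
One building block of `generateProcessRow` worth isolating: `JoinedRow` splices the aggregation buffer in front of the input row, so merge expressions address buffer fields first and input fields after. A sketch with hypothetical values:

```scala
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.JoinedRow

val buffer = InternalRow(10L)  // one aggregation buffer slot
val input = InternalRow(5L)    // one input column
val joined = new JoinedRow(buffer, input)
assert(joined.getLong(0) == 10L && joined.getLong(1) == 5L)
```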

http://git-wip-us.apache.org/repos/asf/spark/blob/97594c29/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortBasedAggregationIterator.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortBasedAggregationIterator.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortBasedAggregationIterator.scala
index c2b1ef0..bea2dce 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortBasedAggregationIterator.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortBasedAggregationIterator.scala
@@ -49,11 +49,11 @@ class SortBasedAggregationIterator(
    * Creates a new aggregation buffer and initializes buffer values
    * for all aggregate functions.
    */
-  private def newBuffer: MutableRow = {
+  private def newBuffer: InternalRow = {
     val bufferSchema = aggregateFunctions.flatMap(_.aggBufferAttributes)
     val bufferRowSize: Int = bufferSchema.length
 
-    val genericMutableBuffer = new GenericMutableRow(bufferRowSize)
+    val genericMutableBuffer = new GenericInternalRow(bufferRowSize)
     val useUnsafeBuffer = bufferSchema.map(_.dataType).forall(UnsafeRow.isMutable)
 
     val buffer = if (useUnsafeBuffer) {
@@ -84,7 +84,7 @@ class SortBasedAggregationIterator(
   private[this] var sortedInputHasNewGroup: Boolean = false
 
   // The aggregation buffer used by the sort-based aggregation.
-  private[this] val sortBasedAggregationBuffer: MutableRow = newBuffer
+  private[this] val sortBasedAggregationBuffer: InternalRow = newBuffer
 
   // This safe projection is used to turn the input row into safe row. This is necessary
   // because the input row may be produced by unsafe projection in child operator and all the
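
The buffer selection above, reduced to a sketch (hypothetical buffer schema): when every buffer field is mutable inside an `UnsafeRow`, the empty generic row is converted to an unsafe buffer up front.

```scala
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, UnsafeProjection, UnsafeRow}
import org.apache.spark.sql.types._

val bufferTypes: Seq[DataType] = Seq(LongType, DoubleType)
val generic = new GenericInternalRow(bufferTypes.length)
val buffer: InternalRow =
  if (bufferTypes.forall(dt => UnsafeRow.isMutable(dt))) {
    UnsafeProjection.create(bufferTypes.toArray).apply(generic)
  } else {
    generic
  }
```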

