Posted to commits@spark.apache.org by rx...@apache.org on 2015/06/13 08:06:40 UTC

[3/5] spark git commit: [SPARK-7186] [SQL] Decouple internal Row from external Row

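For context before the hunks below: the user-facing DataFrame API keeps accepting external Row objects; this commit changes what flows through query execution internally. A minimal sketch of the external side using the public Spark 1.4-era API (the object and app names here are made up for illustration):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

object ExternalRowExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setAppName("external-row-example").setMaster("local[*]"))
    val sqlContext = new SQLContext(sc)

    val schema = StructType(Seq(
      StructField("id", IntegerType, nullable = false),
      StructField("name", StringType, nullable = true)))

    // External Rows carry plain JVM values (Int, java.lang.String, ...).
    val externalRows = sc.parallelize(Seq(Row(1, "a"), Row(2, "b")))

    // createDataFrame converts each Row into Catalyst's internal representation
    // (for example, strings become UTF8String) before query execution.
    val df = sqlContext.createDataFrame(externalRows, schema)
    df.show()

    sc.stop()
  }
}
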
http://git-wip-us.apache.org/repos/asf/spark/blob/d46f8e5d/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
index b44d4c8..1828ed1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -245,7 +245,7 @@ class DataFrameReader private[sql](sqlContext: SQLContext) {
         JsonRDD.nullTypeToStringType(
           JsonRDD.inferSchema(jsonRDD, 1.0, columnNameOfCorruptJsonRecord)))
       val rowRDD = JsonRDD.jsonStringToRow(jsonRDD, appliedSchema, columnNameOfCorruptJsonRecord)
-      sqlContext.createDataFrame(rowRDD, appliedSchema, needsConversion = false)
+      sqlContext.internalCreateDataFrame(rowRDD, appliedSchema)
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/d46f8e5d/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index 5f758ad..22d0e50 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -31,7 +31,7 @@ import org.apache.spark.SparkContext
 import org.apache.spark.annotation.{DeveloperApi, Experimental}
 import org.apache.spark.api.java.{JavaRDD, JavaSparkContext}
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.catalyst._
+import org.apache.spark.sql.catalyst.{InternalRow, _}
 import org.apache.spark.sql.catalyst.analysis._
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.errors.DialectException
@@ -486,15 +486,27 @@ class SQLContext(@transient val sparkContext: SparkContext)
     // schema differs from the existing schema on any field data type.
     val catalystRows = if (needsConversion) {
       val converter = CatalystTypeConverters.createToCatalystConverter(schema)
-      rowRDD.map(converter(_).asInstanceOf[Row])
+      rowRDD.map(converter(_).asInstanceOf[InternalRow])
     } else {
-      rowRDD
+      rowRDD.map{r: Row => InternalRow.fromSeq(r.toSeq)}
     }
     val logicalPlan = LogicalRDD(schema.toAttributes, catalystRows)(self)
     DataFrame(this, logicalPlan)
   }
 
   /**
+   * Creates a DataFrame from an RDD[Row]. User can specify whether the input rows should be
+   * converted to Catalyst rows.
+   */
+  private[sql]
+  def internalCreateDataFrame(catalystRows: RDD[InternalRow], schema: StructType) = {
+    // TODO: use MutableProjection when rowRDD is another DataFrame and the applied
+    // schema differs from the existing schema on any field data type.
+    val logicalPlan = LogicalRDD(schema.toAttributes, catalystRows)(self)
+    DataFrame(this, logicalPlan)
+  }
+
+  /**
    * :: DeveloperApi ::
    * Creates a [[DataFrame]] from an [[JavaRDD]] containing [[Row]]s using the given schema.
    * It is important to make sure that the structure of every [[Row]] of the provided RDD matches
@@ -531,7 +543,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
           extractors.zip(attributeSeq).map { case (e, attr) =>
             CatalystTypeConverters.convertToCatalyst(e.invoke(row), attr.dataType)
           }.toArray[Any]
-        ) : Row
+        ) : InternalRow
       }
     }
     DataFrame(this, LogicalRDD(attributeSeq, rowRdd)(this))
@@ -886,7 +898,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
   protected[sql] val planner = new SparkPlanner
 
   @transient
-  protected[sql] lazy val emptyResult = sparkContext.parallelize(Seq.empty[Row], 1)
+  protected[sql] lazy val emptyResult = sparkContext.parallelize(Seq.empty[InternalRow], 1)
 
   /**
    * Prepares a planned SparkPlan for execution by inserting shuffle operations as needed.
@@ -953,7 +965,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
     lazy val executedPlan: SparkPlan = prepareForExecution.execute(sparkPlan)
 
     /** Internal version of the RDD. Avoids copies and has no schema */
-    lazy val toRdd: RDD[Row] = executedPlan.execute()
+    lazy val toRdd: RDD[InternalRow] = executedPlan.execute()
 
     protected def stringOrError[A](f: => A): String =
       try f.toString catch { case e: Throwable => e.toString }
@@ -1035,7 +1047,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
     }
 
     val rowRdd = convertedRdd.mapPartitions { iter =>
-      iter.map { m => new GenericRow(m): Row}
+      iter.map { m => new GenericRow(m): InternalRow}
     }
 
     DataFrame(this, LogicalRDD(schema.toAttributes, rowRdd)(self))

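The two branches in the createDataFrame hunk above are the heart of the decoupling: external Rows go through a Catalyst converter, while rows whose values are already internal only get rewrapped. A minimal sketch of that logic, assuming it is compiled inside the org.apache.spark.sql package (CatalystTypeConverters is package-private) and using a hypothetical object name:

package org.apache.spark.sql

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
import org.apache.spark.sql.types.StructType

object RowConversionSketch {
  def toCatalystRows(
      rowRDD: RDD[Row],
      schema: StructType,
      needsConversion: Boolean): RDD[InternalRow] = {
    if (needsConversion) {
      // Convert external values (String, BigDecimal, ...) into Catalyst's
      // internal representation field by field.
      val converter = CatalystTypeConverters.createToCatalystConverter(schema)
      rowRDD.map(converter(_).asInstanceOf[InternalRow])
    } else {
      // Values are already internal; only the row container is rewrapped.
      rowRDD.map(r => InternalRow.fromSeq(r.toSeq))
    }
  }
}
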
http://git-wip-us.apache.org/repos/asf/spark/blob/d46f8e5d/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnBuilder.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnBuilder.scala
index aa10af4..cc7506d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnBuilder.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnBuilder.scala
@@ -18,8 +18,7 @@
 package org.apache.spark.sql.columnar
 
 import java.nio.{ByteBuffer, ByteOrder}
-
-import org.apache.spark.sql.Row
+import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.columnar.ColumnBuilder._
 import org.apache.spark.sql.columnar.compression.{AllCompressionSchemes, CompressibleColumnBuilder}
 import org.apache.spark.sql.types._
@@ -33,7 +32,7 @@ private[sql] trait ColumnBuilder {
   /**
    * Appends `row(ordinal)` to the column builder.
    */
-  def appendFrom(row: Row, ordinal: Int)
+  def appendFrom(row: InternalRow, ordinal: Int)
 
   /**
    * Column statistics information
@@ -68,7 +67,7 @@ private[sql] class BasicColumnBuilder[T <: DataType, JvmType](
     buffer.order(ByteOrder.nativeOrder()).putInt(columnType.typeId)
   }
 
-  override def appendFrom(row: Row, ordinal: Int): Unit = {
+  override def appendFrom(row: InternalRow, ordinal: Int): Unit = {
     buffer = ensureFreeSpace(buffer, columnType.actualSize(row, ordinal))
     columnType.append(row, ordinal, buffer)
   }

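appendFrom(row, ordinal) in the hunk above reads exactly one field out of an InternalRow and serializes it into the builder's byte buffer. A standalone stand-in for that pattern (IntColumnBuilderSketch is hypothetical; the real builders delegate sizing and encoding to a ColumnType):

import java.nio.{ByteBuffer, ByteOrder}

import org.apache.spark.sql.catalyst.InternalRow

class IntColumnBuilderSketch(initialCapacity: Int = 1024) {
  private var buffer =
    ByteBuffer.allocate(initialCapacity).order(ByteOrder.nativeOrder())

  def appendFrom(row: InternalRow, ordinal: Int): Unit = {
    if (buffer.remaining() < 4) {
      // Grow the buffer, in the spirit of ColumnBuilder.ensureFreeSpace.
      val grown = ByteBuffer.allocate(buffer.capacity() * 2).order(ByteOrder.nativeOrder())
      buffer.flip()
      grown.put(buffer)
      buffer = grown
    }
    buffer.putInt(row.getInt(ordinal))
  }

  def build(): ByteBuffer = {
    buffer.flip()
    buffer
  }
}
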
http://git-wip-us.apache.org/repos/asf/spark/blob/d46f8e5d/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnStats.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnStats.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnStats.scala
index 11c79c8..1bce214 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnStats.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnStats.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql.columnar
 
-import org.apache.spark.sql.Row
+import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, AttributeReference}
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.UTF8String
@@ -53,7 +53,7 @@ private[sql] sealed trait ColumnStats extends Serializable {
   /**
    * Gathers statistics information from `row(ordinal)`.
    */
-  def gatherStats(row: Row, ordinal: Int): Unit = {
+  def gatherStats(row: InternalRow, ordinal: Int): Unit = {
     if (row.isNullAt(ordinal)) {
       nullCount += 1
       // 4 bytes for null position
@@ -66,23 +66,23 @@ private[sql] sealed trait ColumnStats extends Serializable {
    * Column statistics represented as a single row, currently including closed lower bound, closed
    * upper bound and null count.
    */
-  def collectedStatistics: Row
+  def collectedStatistics: InternalRow
 }
 
 /**
  * A no-op ColumnStats only used for testing purposes.
  */
 private[sql] class NoopColumnStats extends ColumnStats {
-  override def gatherStats(row: Row, ordinal: Int): Unit = super.gatherStats(row, ordinal)
+  override def gatherStats(row: InternalRow, ordinal: Int): Unit = super.gatherStats(row, ordinal)
 
-  override def collectedStatistics: Row = Row(null, null, nullCount, count, 0L)
+  override def collectedStatistics: InternalRow = InternalRow(null, null, nullCount, count, 0L)
 }
 
 private[sql] class BooleanColumnStats extends ColumnStats {
   protected var upper = false
   protected var lower = true
 
-  override def gatherStats(row: Row, ordinal: Int): Unit = {
+  override def gatherStats(row: InternalRow, ordinal: Int): Unit = {
     super.gatherStats(row, ordinal)
     if (!row.isNullAt(ordinal)) {
       val value = row.getBoolean(ordinal)
@@ -92,14 +92,15 @@ private[sql] class BooleanColumnStats extends ColumnStats {
     }
   }
 
-  override def collectedStatistics: Row = Row(lower, upper, nullCount, count, sizeInBytes)
+  override def collectedStatistics: InternalRow =
+    InternalRow(lower, upper, nullCount, count, sizeInBytes)
 }
 
 private[sql] class ByteColumnStats extends ColumnStats {
   protected var upper = Byte.MinValue
   protected var lower = Byte.MaxValue
 
-  override def gatherStats(row: Row, ordinal: Int): Unit = {
+  override def gatherStats(row: InternalRow, ordinal: Int): Unit = {
     super.gatherStats(row, ordinal)
     if (!row.isNullAt(ordinal)) {
       val value = row.getByte(ordinal)
@@ -109,14 +110,15 @@ private[sql] class ByteColumnStats extends ColumnStats {
     }
   }
 
-  override def collectedStatistics: Row = Row(lower, upper, nullCount, count, sizeInBytes)
+  override def collectedStatistics: InternalRow =
+    InternalRow(lower, upper, nullCount, count, sizeInBytes)
 }
 
 private[sql] class ShortColumnStats extends ColumnStats {
   protected var upper = Short.MinValue
   protected var lower = Short.MaxValue
 
-  override def gatherStats(row: Row, ordinal: Int): Unit = {
+  override def gatherStats(row: InternalRow, ordinal: Int): Unit = {
     super.gatherStats(row, ordinal)
     if (!row.isNullAt(ordinal)) {
       val value = row.getShort(ordinal)
@@ -126,14 +128,15 @@ private[sql] class ShortColumnStats extends ColumnStats {
     }
   }
 
-  override def collectedStatistics: Row = Row(lower, upper, nullCount, count, sizeInBytes)
+  override def collectedStatistics: InternalRow =
+    InternalRow(lower, upper, nullCount, count, sizeInBytes)
 }
 
 private[sql] class LongColumnStats extends ColumnStats {
   protected var upper = Long.MinValue
   protected var lower = Long.MaxValue
 
-  override def gatherStats(row: Row, ordinal: Int): Unit = {
+  override def gatherStats(row: InternalRow, ordinal: Int): Unit = {
     super.gatherStats(row, ordinal)
     if (!row.isNullAt(ordinal)) {
       val value = row.getLong(ordinal)
@@ -143,14 +146,15 @@ private[sql] class LongColumnStats extends ColumnStats {
     }
   }
 
-  override def collectedStatistics: Row = Row(lower, upper, nullCount, count, sizeInBytes)
+  override def collectedStatistics: InternalRow =
+    InternalRow(lower, upper, nullCount, count, sizeInBytes)
 }
 
 private[sql] class DoubleColumnStats extends ColumnStats {
   protected var upper = Double.MinValue
   protected var lower = Double.MaxValue
 
-  override def gatherStats(row: Row, ordinal: Int): Unit = {
+  override def gatherStats(row: InternalRow, ordinal: Int): Unit = {
     super.gatherStats(row, ordinal)
     if (!row.isNullAt(ordinal)) {
       val value = row.getDouble(ordinal)
@@ -160,14 +164,15 @@ private[sql] class DoubleColumnStats extends ColumnStats {
     }
   }
 
-  override def collectedStatistics: Row = Row(lower, upper, nullCount, count, sizeInBytes)
+  override def collectedStatistics: InternalRow =
+    InternalRow(lower, upper, nullCount, count, sizeInBytes)
 }
 
 private[sql] class FloatColumnStats extends ColumnStats {
   protected var upper = Float.MinValue
   protected var lower = Float.MaxValue
 
-  override def gatherStats(row: Row, ordinal: Int): Unit = {
+  override def gatherStats(row: InternalRow, ordinal: Int): Unit = {
     super.gatherStats(row, ordinal)
     if (!row.isNullAt(ordinal)) {
       val value = row.getFloat(ordinal)
@@ -177,14 +182,15 @@ private[sql] class FloatColumnStats extends ColumnStats {
     }
   }
 
-  override def collectedStatistics: Row = Row(lower, upper, nullCount, count, sizeInBytes)
+  override def collectedStatistics: InternalRow =
+    InternalRow(lower, upper, nullCount, count, sizeInBytes)
 }
 
 private[sql] class FixedDecimalColumnStats extends ColumnStats {
   protected var upper: Decimal = null
   protected var lower: Decimal = null
 
-  override def gatherStats(row: Row, ordinal: Int): Unit = {
+  override def gatherStats(row: InternalRow, ordinal: Int): Unit = {
     super.gatherStats(row, ordinal)
     if (!row.isNullAt(ordinal)) {
       val value = row(ordinal).asInstanceOf[Decimal]
@@ -194,14 +200,15 @@ private[sql] class FixedDecimalColumnStats extends ColumnStats {
     }
   }
 
-  override def collectedStatistics: Row = Row(lower, upper, nullCount, count, sizeInBytes)
+  override def collectedStatistics: InternalRow =
+    InternalRow(lower, upper, nullCount, count, sizeInBytes)
 }
 
 private[sql] class IntColumnStats extends ColumnStats {
   protected var upper = Int.MinValue
   protected var lower = Int.MaxValue
 
-  override def gatherStats(row: Row, ordinal: Int): Unit = {
+  override def gatherStats(row: InternalRow, ordinal: Int): Unit = {
     super.gatherStats(row, ordinal)
     if (!row.isNullAt(ordinal)) {
       val value = row.getInt(ordinal)
@@ -211,14 +218,15 @@ private[sql] class IntColumnStats extends ColumnStats {
     }
   }
 
-  override def collectedStatistics: Row = Row(lower, upper, nullCount, count, sizeInBytes)
+  override def collectedStatistics: InternalRow =
+    InternalRow(lower, upper, nullCount, count, sizeInBytes)
 }
 
 private[sql] class StringColumnStats extends ColumnStats {
   protected var upper: UTF8String = null
   protected var lower: UTF8String = null
 
-  override def gatherStats(row: Row, ordinal: Int): Unit = {
+  override def gatherStats(row: InternalRow, ordinal: Int): Unit = {
     super.gatherStats(row, ordinal)
     if (!row.isNullAt(ordinal)) {
       val value = row(ordinal).asInstanceOf[UTF8String]
@@ -228,7 +236,8 @@ private[sql] class StringColumnStats extends ColumnStats {
     }
   }
 
-  override def collectedStatistics: Row = Row(lower, upper, nullCount, count, sizeInBytes)
+  override def collectedStatistics: InternalRow =
+    InternalRow(lower, upper, nullCount, count, sizeInBytes)
 }
 
 private[sql] class DateColumnStats extends IntColumnStats
@@ -236,23 +245,25 @@ private[sql] class DateColumnStats extends IntColumnStats
 private[sql] class TimestampColumnStats extends LongColumnStats
 
 private[sql] class BinaryColumnStats extends ColumnStats {
-  override def gatherStats(row: Row, ordinal: Int): Unit = {
+  override def gatherStats(row: InternalRow, ordinal: Int): Unit = {
     super.gatherStats(row, ordinal)
     if (!row.isNullAt(ordinal)) {
       sizeInBytes += BINARY.actualSize(row, ordinal)
     }
   }
 
-  override def collectedStatistics: Row = Row(null, null, nullCount, count, sizeInBytes)
+  override def collectedStatistics: InternalRow =
+    InternalRow(null, null, nullCount, count, sizeInBytes)
 }
 
 private[sql] class GenericColumnStats extends ColumnStats {
-  override def gatherStats(row: Row, ordinal: Int): Unit = {
+  override def gatherStats(row: InternalRow, ordinal: Int): Unit = {
     super.gatherStats(row, ordinal)
     if (!row.isNullAt(ordinal)) {
       sizeInBytes += GENERIC.actualSize(row, ordinal)
     }
   }
 
-  override def collectedStatistics: Row = Row(null, null, nullCount, count, sizeInBytes)
+  override def collectedStatistics: InternalRow =
+    InternalRow(null, null, nullCount, count, sizeInBytes)
 }

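Each ColumnStats implementation above follows the same shape: inspect row(ordinal), track bounds, counts and sizes, and pack the result back into an InternalRow. An illustrative standalone version for integers (the real trait is private[sql] and keeps the counters in a shared base class):

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.types.IntegerType

class IntStatsSketch {
  private var upper = Int.MinValue
  private var lower = Int.MaxValue
  private var nullCount = 0
  private var count = 0
  private var sizeInBytes = 0L

  def gatherStats(row: InternalRow, ordinal: Int): Unit = {
    count += 1
    if (row.isNullAt(ordinal)) {
      nullCount += 1
      sizeInBytes += 4 // 4 bytes to record the null position
    } else {
      val value = row.getInt(ordinal)
      if (value > upper) upper = value
      if (value < lower) lower = value
      sizeInBytes += IntegerType.defaultSize
    }
  }

  // Lower bound, upper bound, null count, row count and byte size, as one row.
  def collectedStatistics: InternalRow =
    InternalRow(lower, upper, nullCount, count, sizeInBytes)
}
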
http://git-wip-us.apache.org/repos/asf/spark/blob/d46f8e5d/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala
index 3db26fa..761f427 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala
@@ -19,21 +19,16 @@ package org.apache.spark.sql.columnar
 
 import java.nio.ByteBuffer
 
-import org.apache.spark.{Accumulable, Accumulator, Accumulators}
-import org.apache.spark.sql.catalyst.expressions
-
 import scala.collection.mutable.ArrayBuffer
-import scala.collection.mutable.HashMap
 
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.Row
-import org.apache.spark.SparkContext
 import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
 import org.apache.spark.sql.catalyst.dsl.expressions._
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Statistics}
 import org.apache.spark.sql.execution.{LeafNode, SparkPlan}
 import org.apache.spark.storage.StorageLevel
+import org.apache.spark.{Accumulable, Accumulator, Accumulators}
 
 private[sql] object InMemoryRelation {
   def apply(
@@ -45,7 +40,7 @@ private[sql] object InMemoryRelation {
     new InMemoryRelation(child.output, useCompression, batchSize, storageLevel, child, tableName)()
 }
 
-private[sql] case class CachedBatch(buffers: Array[Array[Byte]], stats: Row)
+private[sql] case class CachedBatch(buffers: Array[Array[Byte]], stats: InternalRow)
 
 private[sql] case class InMemoryRelation(
     output: Seq[Attribute],
@@ -56,12 +51,12 @@ private[sql] case class InMemoryRelation(
     tableName: Option[String])(
     private var _cachedColumnBuffers: RDD[CachedBatch] = null,
     private var _statistics: Statistics = null,
-    private var _batchStats: Accumulable[ArrayBuffer[Row], Row] = null)
+    private var _batchStats: Accumulable[ArrayBuffer[InternalRow], InternalRow] = null)
   extends LogicalPlan with MultiInstanceRelation {
 
-  private val batchStats: Accumulable[ArrayBuffer[Row], Row] =
+  private val batchStats: Accumulable[ArrayBuffer[InternalRow], InternalRow] =
     if (_batchStats == null) {
-      child.sqlContext.sparkContext.accumulableCollection(ArrayBuffer.empty[Row])
+      child.sqlContext.sparkContext.accumulableCollection(ArrayBuffer.empty[InternalRow])
     } else {
       _batchStats
     }
@@ -151,7 +146,7 @@ private[sql] case class InMemoryRelation(
             rowCount += 1
           }
 
-          val stats = Row.merge(columnBuilders.map(_.columnStats.collectedStatistics) : _*)
+          val stats = InternalRow.merge(columnBuilders.map(_.columnStats.collectedStatistics) : _*)
 
           batchStats += stats
           CachedBatch(columnBuilders.map(_.build().array()), stats)
@@ -267,7 +262,7 @@ private[sql] case class InMemoryColumnarTableScan(
 
   private val inMemoryPartitionPruningEnabled = sqlContext.conf.inMemoryPartitionPruning
 
-  protected override def doExecute(): RDD[Row] = {
+  protected override def doExecute(): RDD[InternalRow] = {
     if (enableAccumulators) {
       readPartitions.setValue(0)
       readBatches.setValue(0)
@@ -296,7 +291,7 @@ private[sql] case class InMemoryColumnarTableScan(
 
       val nextRow = new SpecificMutableRow(requestedColumnDataTypes)
 
-      def cachedBatchesToRows(cacheBatches: Iterator[CachedBatch]): Iterator[Row] = {
+      def cachedBatchesToRows(cacheBatches: Iterator[CachedBatch]): Iterator[InternalRow] = {
         val rows = cacheBatches.flatMap { cachedBatch =>
           // Build column accessors
           val columnAccessors = requestedColumnIndices.map { batchColumnIndex =>
@@ -306,15 +301,15 @@ private[sql] case class InMemoryColumnarTableScan(
           }
 
           // Extract rows via column accessors
-          new Iterator[Row] {
+          new Iterator[InternalRow] {
             private[this] val rowLen = nextRow.length
-            override def next(): Row = {
+            override def next(): InternalRow = {
               var i = 0
               while (i < rowLen) {
                 columnAccessors(i).extractTo(nextRow, i)
                 i += 1
               }
-              if (attributes.isEmpty) Row.empty else nextRow
+              if (attributes.isEmpty) InternalRow.empty else nextRow
             }
 
             override def hasNext: Boolean = columnAccessors(0).hasNext

http://git-wip-us.apache.org/repos/asf/spark/blob/d46f8e5d/sql/core/src/main/scala/org/apache/spark/sql/columnar/NullableColumnBuilder.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/NullableColumnBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/NullableColumnBuilder.scala
index f1f494a..ba47bc7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/NullableColumnBuilder.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/columnar/NullableColumnBuilder.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.columnar
 
 import java.nio.{ByteBuffer, ByteOrder}
 
-import org.apache.spark.sql.Row
+import org.apache.spark.sql.catalyst.InternalRow
 
 /**
  * A stackable trait used for building byte buffer for a column containing null values.  Memory
@@ -52,7 +52,7 @@ private[sql] trait NullableColumnBuilder extends ColumnBuilder {
     super.initialize(initialSize, columnName, useCompression)
   }
 
-  abstract override def appendFrom(row: Row, ordinal: Int): Unit = {
+  abstract override def appendFrom(row: InternalRow, ordinal: Int): Unit = {
     columnStats.gatherStats(row, ordinal)
     if (row.isNullAt(ordinal)) {
       nulls = ColumnBuilder.ensureFreeSpace(nulls, 4)

http://git-wip-us.apache.org/repos/asf/spark/blob/d46f8e5d/sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/CompressibleColumnBuilder.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/CompressibleColumnBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/CompressibleColumnBuilder.scala
index 8e2a1af..39b21dd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/CompressibleColumnBuilder.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/CompressibleColumnBuilder.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.columnar.compression
 import java.nio.{ByteBuffer, ByteOrder}
 
 import org.apache.spark.Logging
-import org.apache.spark.sql.Row
+import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.columnar.{ColumnBuilder, NativeColumnBuilder}
 import org.apache.spark.sql.types.AtomicType
 
@@ -66,7 +66,7 @@ private[sql] trait CompressibleColumnBuilder[T <: AtomicType]
     encoder.compressionRatio < 0.8
   }
 
-  private def gatherCompressibilityStats(row: Row, ordinal: Int): Unit = {
+  private def gatherCompressibilityStats(row: InternalRow, ordinal: Int): Unit = {
     var i = 0
     while (i < compressionEncoders.length) {
       compressionEncoders(i).gatherCompressibilityStats(row, ordinal)
@@ -74,7 +74,7 @@ private[sql] trait CompressibleColumnBuilder[T <: AtomicType]
     }
   }
 
-  abstract override def appendFrom(row: Row, ordinal: Int): Unit = {
+  abstract override def appendFrom(row: InternalRow, ordinal: Int): Unit = {
     super.appendFrom(row, ordinal)
     if (!row.isNullAt(ordinal)) {
       gatherCompressibilityStats(row, ordinal)

http://git-wip-us.apache.org/repos/asf/spark/blob/d46f8e5d/sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/CompressionScheme.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/CompressionScheme.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/CompressionScheme.scala
index 17c2d9b..4eaec6d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/CompressionScheme.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/CompressionScheme.scala
@@ -18,14 +18,13 @@
 package org.apache.spark.sql.columnar.compression
 
 import java.nio.{ByteBuffer, ByteOrder}
-
-import org.apache.spark.sql.Row
+import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.MutableRow
 import org.apache.spark.sql.columnar.{ColumnType, NativeColumnType}
 import org.apache.spark.sql.types.AtomicType
 
 private[sql] trait Encoder[T <: AtomicType] {
-  def gatherCompressibilityStats(row: Row, ordinal: Int): Unit = {}
+  def gatherCompressibilityStats(row: InternalRow, ordinal: Int): Unit = {}
 
   def compressedSize: Int
 

http://git-wip-us.apache.org/repos/asf/spark/blob/d46f8e5d/sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/compressionSchemes.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/compressionSchemes.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/compressionSchemes.scala
index 534ae90..5abc125 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/compressionSchemes.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/compressionSchemes.scala
@@ -22,8 +22,7 @@ import java.nio.ByteBuffer
 import scala.collection.mutable
 import scala.reflect.ClassTag
 import scala.reflect.runtime.universe.runtimeMirror
-
-import org.apache.spark.sql.Row
+import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.{MutableRow, SpecificMutableRow}
 import org.apache.spark.sql.columnar._
 import org.apache.spark.sql.types._
@@ -96,7 +95,7 @@ private[sql] case object RunLengthEncoding extends CompressionScheme {
 
     override def compressedSize: Int = _compressedSize
 
-    override def gatherCompressibilityStats(row: Row, ordinal: Int): Unit = {
+    override def gatherCompressibilityStats(row: InternalRow, ordinal: Int): Unit = {
       val value = columnType.getField(row, ordinal)
       val actualSize = columnType.actualSize(row, ordinal)
       _uncompressedSize += actualSize
@@ -217,7 +216,7 @@ private[sql] case object DictionaryEncoding extends CompressionScheme {
     // to store dictionary element count.
     private var dictionarySize = 4
 
-    override def gatherCompressibilityStats(row: Row, ordinal: Int): Unit = {
+    override def gatherCompressibilityStats(row: InternalRow, ordinal: Int): Unit = {
       val value = columnType.getField(row, ordinal)
 
       if (!overflow) {
@@ -310,7 +309,7 @@ private[sql] case object BooleanBitSet extends CompressionScheme {
   class Encoder extends compression.Encoder[BooleanType.type] {
     private var _uncompressedSize = 0
 
-    override def gatherCompressibilityStats(row: Row, ordinal: Int): Unit = {
+    override def gatherCompressibilityStats(row: InternalRow, ordinal: Int): Unit = {
       _uncompressedSize += BOOLEAN.defaultSize
     }
 
@@ -404,7 +403,7 @@ private[sql] case object IntDelta extends CompressionScheme {
 
     private var prevValue: Int = _
 
-    override def gatherCompressibilityStats(row: Row, ordinal: Int): Unit = {
+    override def gatherCompressibilityStats(row: InternalRow, ordinal: Int): Unit = {
       val value = row.getInt(ordinal)
       val delta = value - prevValue
 
@@ -484,7 +483,7 @@ private[sql] case object LongDelta extends CompressionScheme {
 
     private var prevValue: Long = _
 
-    override def gatherCompressibilityStats(row: Row, ordinal: Int): Unit = {
+    override def gatherCompressibilityStats(row: InternalRow, ordinal: Int): Unit = {
       val value = row.getLong(ordinal)
       val delta = value - prevValue
 

http://git-wip-us.apache.org/repos/asf/spark/blob/d46f8e5d/sql/core/src/main/scala/org/apache/spark/sql/execution/Aggregate.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Aggregate.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Aggregate.scala
index 8d16749..6e8a5ef 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Aggregate.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Aggregate.scala
@@ -20,12 +20,10 @@ package org.apache.spark.sql.execution
 import java.util.HashMap
 
 import org.apache.spark.annotation.DeveloperApi
-import org.apache.spark.SparkContext
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.errors._
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.physical._
-import org.apache.spark.sql.SQLContext
 
 /**
  * :: DeveloperApi ::
@@ -121,11 +119,11 @@ case class Aggregate(
     }
   }
 
-  protected override def doExecute(): RDD[Row] = attachTree(this, "execute") {
+  protected override def doExecute(): RDD[InternalRow] = attachTree(this, "execute") {
     if (groupingExpressions.isEmpty) {
       child.execute().mapPartitions { iter =>
         val buffer = newAggregateBuffer()
-        var currentRow: Row = null
+        var currentRow: InternalRow = null
         while (iter.hasNext) {
           currentRow = iter.next()
           var i = 0
@@ -147,10 +145,10 @@ case class Aggregate(
       }
     } else {
       child.execute().mapPartitions { iter =>
-        val hashTable = new HashMap[Row, Array[AggregateFunction]]
+        val hashTable = new HashMap[InternalRow, Array[AggregateFunction]]
         val groupingProjection = new InterpretedMutableProjection(groupingExpressions, child.output)
 
-        var currentRow: Row = null
+        var currentRow: InternalRow = null
         while (iter.hasNext) {
           currentRow = iter.next()
           val currentGroup = groupingProjection(currentRow)
@@ -167,7 +165,7 @@ case class Aggregate(
           }
         }
 
-        new Iterator[Row] {
+        new Iterator[InternalRow] {
           private[this] val hashTableIter = hashTable.entrySet().iterator()
           private[this] val aggregateResults = new GenericMutableRow(computedAggregates.length)
           private[this] val resultProjection =
@@ -177,7 +175,7 @@ case class Aggregate(
 
           override final def hasNext: Boolean = hashTableIter.hasNext
 
-          override final def next(): Row = {
+          override final def next(): InternalRow = {
             val currentEntry = hashTableIter.next()
             val currentGroup = currentEntry.getKey
             val currentBuffer = currentEntry.getValue

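The grouped branch of Aggregate.doExecute above builds a java.util.HashMap keyed by the projected grouping row, with one set of aggregate buffers per key. A standalone sketch of that loop; groupKeyOf, newBuffer and update are hypothetical stand-ins for the grouping projection and the aggregate functions, and the real operator copies the projected key before inserting it because projections reuse a mutable row:

import java.util.HashMap

import org.apache.spark.sql.catalyst.InternalRow

object HashAggregateSketch {
  def run[B <: AnyRef](
      iter: Iterator[InternalRow],
      groupKeyOf: InternalRow => InternalRow,
      newBuffer: () => B,
      update: (B, InternalRow) => Unit): HashMap[InternalRow, B] = {
    val hashTable = new HashMap[InternalRow, B]
    while (iter.hasNext) {
      val currentRow = iter.next()
      val currentGroup = groupKeyOf(currentRow)
      var buffer = hashTable.get(currentGroup)
      if (buffer == null) {
        buffer = newBuffer()
        hashTable.put(currentGroup, buffer)
      }
      // Fold the current row into this group's aggregate buffer.
      update(buffer, currentRow)
    }
    hashTable
  }
}
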
http://git-wip-us.apache.org/repos/asf/spark/blob/d46f8e5d/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
index 6fa7ccc..c9a1883 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
@@ -17,19 +17,19 @@
 
 package org.apache.spark.sql.execution
 
-import org.apache.spark.{HashPartitioner, Partitioner, RangePartitioner, SparkEnv}
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.rdd.{RDD, ShuffledRDD}
 import org.apache.spark.serializer.Serializer
 import org.apache.spark.shuffle.sort.SortShuffleManager
 import org.apache.spark.shuffle.unsafe.UnsafeShuffleManager
+import org.apache.spark.sql.SQLContext
 import org.apache.spark.sql.catalyst.errors.attachTree
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.physical._
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.types.DataType
-import org.apache.spark.sql.{SQLContext, Row}
 import org.apache.spark.util.MutablePair
+import org.apache.spark.{HashPartitioner, Partitioner, RangePartitioner, SparkEnv}
 
 /**
  * :: DeveloperApi ::
@@ -157,7 +157,7 @@ case class Exchange(
     serializer
   }
 
-  protected override def doExecute(): RDD[Row] = attachTree(this , "execute") {
+  protected override def doExecute(): RDD[InternalRow] = attachTree(this , "execute") {
     newPartitioning match {
       case HashPartitioning(expressions, numPartitions) =>
         val keySchema = expressions.map(_.dataType).toArray
@@ -173,11 +173,11 @@ case class Exchange(
         } else {
           child.execute().mapPartitions { iter =>
             val hashExpressions = newMutableProjection(expressions, child.output)()
-            val mutablePair = new MutablePair[Row, Row]()
+            val mutablePair = new MutablePair[InternalRow, InternalRow]()
             iter.map(r => mutablePair.update(hashExpressions(r), r))
           }
         }
-        val shuffled = new ShuffledRDD[Row, Row, Row](rdd, part)
+        val shuffled = new ShuffledRDD[InternalRow, InternalRow, InternalRow](rdd, part)
         shuffled.setSerializer(serializer)
         shuffled.map(_._2)
 
@@ -190,7 +190,7 @@ case class Exchange(
           // Internally, RangePartitioner runs a job on the RDD that samples keys to compute
           // partition bounds. To get accurate samples, we need to copy the mutable keys.
           val rddForSampling = childRdd.mapPartitions { iter =>
-            val mutablePair = new MutablePair[Row, Null]()
+            val mutablePair = new MutablePair[InternalRow, Null]()
             iter.map(row => mutablePair.update(row.copy(), null))
           }
           // TODO: RangePartitioner should take an Ordering.
@@ -202,12 +202,12 @@ case class Exchange(
           childRdd.mapPartitions { iter => iter.map(row => (row.copy(), null))}
         } else {
           childRdd.mapPartitions { iter =>
-            val mutablePair = new MutablePair[Row, Null]()
+            val mutablePair = new MutablePair[InternalRow, Null]()
             iter.map(row => mutablePair.update(row, null))
           }
         }
 
-        val shuffled = new ShuffledRDD[Row, Null, Null](rdd, part)
+        val shuffled = new ShuffledRDD[InternalRow, Null, Null](rdd, part)
         shuffled.setSerializer(serializer)
         shuffled.map(_._1)
 
@@ -217,14 +217,16 @@ case class Exchange(
         val partitioner = new HashPartitioner(1)
 
         val rdd = if (needToCopyObjectsBeforeShuffle(partitioner, serializer)) {
-          child.execute().mapPartitions { iter => iter.map(r => (null, r.copy())) }
+          child.execute().mapPartitions {
+            iter => iter.map(r => (null, r.copy()))
+          }
         } else {
           child.execute().mapPartitions { iter =>
-            val mutablePair = new MutablePair[Null, Row]()
+            val mutablePair = new MutablePair[Null, InternalRow]()
             iter.map(r => mutablePair.update(null, r))
           }
         }
-        val shuffled = new ShuffledRDD[Null, Row, Row](rdd, partitioner)
+        val shuffled = new ShuffledRDD[Null, InternalRow, InternalRow](rdd, partitioner)
         shuffled.setSerializer(serializer)
         shuffled.map(_._2)
 

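The hash-partitioning branch of Exchange above keys each row by a projection of the partitioning expressions, reuses one MutablePair per partition to avoid per-row allocation, and shuffles through a ShuffledRDD. A pared-down sketch; keyOf stands in for the generated hashExpressions projection, and the real operator additionally installs a SQL-specific serializer and copies rows when the active shuffle manager buffers objects (the needToCopyObjectsBeforeShuffle check above):

import org.apache.spark.HashPartitioner
import org.apache.spark.rdd.{RDD, ShuffledRDD}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.util.MutablePair

object ExchangeSketch {
  def hashRepartition(
      rows: RDD[InternalRow],
      keyOf: InternalRow => InternalRow,
      numPartitions: Int): RDD[InternalRow] = {
    val keyed = rows.mapPartitions { iter =>
      // One reusable pair per partition; safe only when the shuffle writer
      // serializes each record before the next one is produced.
      val mutablePair = new MutablePair[InternalRow, InternalRow]()
      iter.map(r => mutablePair.update(keyOf(r), r))
    }
    val shuffled = new ShuffledRDD[InternalRow, InternalRow, InternalRow](
      keyed, new HashPartitioner(numPartitions))
    shuffled.map(_._2)
  }
}
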
http://git-wip-us.apache.org/repos/asf/spark/blob/d46f8e5d/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
index f931dc9..da27a75 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.execution
 
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.catalyst.CatalystTypeConverters
+import org.apache.spark.sql.catalyst.{InternalRow, CatalystTypeConverters}
 import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
 import org.apache.spark.sql.catalyst.expressions.{Attribute, GenericMutableRow}
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Statistics}
@@ -31,7 +31,7 @@ import org.apache.spark.sql.{Row, SQLContext}
  */
 @DeveloperApi
 object RDDConversions {
-  def productToRowRdd[A <: Product](data: RDD[A], outputTypes: Seq[DataType]): RDD[Row] = {
+  def productToRowRdd[A <: Product](data: RDD[A], outputTypes: Seq[DataType]): RDD[InternalRow] = {
     data.mapPartitions { iterator =>
       val numColumns = outputTypes.length
       val mutableRow = new GenericMutableRow(numColumns)
@@ -51,7 +51,7 @@ object RDDConversions {
   /**
    * Convert the objects inside Row into the types Catalyst expected.
    */
-  def rowToRowRdd(data: RDD[Row], outputTypes: Seq[DataType]): RDD[Row] = {
+  def rowToRowRdd(data: RDD[Row], outputTypes: Seq[DataType]): RDD[InternalRow] = {
     data.mapPartitions { iterator =>
       val numColumns = outputTypes.length
       val mutableRow = new GenericMutableRow(numColumns)
@@ -70,7 +70,9 @@ object RDDConversions {
 }
 
 /** Logical plan node for scanning data from an RDD. */
-private[sql] case class LogicalRDD(output: Seq[Attribute], rdd: RDD[Row])(sqlContext: SQLContext)
+private[sql] case class LogicalRDD(
+    output: Seq[Attribute],
+    rdd: RDD[InternalRow])(sqlContext: SQLContext)
   extends LogicalPlan with MultiInstanceRelation {
 
   override def children: Seq[LogicalPlan] = Nil
@@ -91,13 +93,15 @@ private[sql] case class LogicalRDD(output: Seq[Attribute], rdd: RDD[Row])(sqlCon
 }
 
 /** Physical plan node for scanning data from an RDD. */
-private[sql] case class PhysicalRDD(output: Seq[Attribute], rdd: RDD[Row]) extends LeafNode {
-  protected override def doExecute(): RDD[Row] = rdd
+private[sql] case class PhysicalRDD(
+    output: Seq[Attribute],
+    rdd: RDD[InternalRow]) extends LeafNode {
+  protected override def doExecute(): RDD[InternalRow] = rdd
 }
 
 /** Logical plan node for scanning data from a local collection. */
 private[sql]
-case class LogicalLocalTable(output: Seq[Attribute], rows: Seq[Row])(sqlContext: SQLContext)
+case class LogicalLocalTable(output: Seq[Attribute], rows: Seq[InternalRow])(sqlContext: SQLContext)
    extends LogicalPlan with MultiInstanceRelation {
 
   override def children: Seq[LogicalPlan] = Nil

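RDDConversions.rowToRowRdd above allocates a single GenericMutableRow per partition and refills it for every incoming external Row, converting one field at a time. A sketch of that reuse pattern; fieldConverters is a hypothetical stand-in for the per-column Catalyst converters (which are package-private), and the catalyst row classes are internal, not stable API:

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.GenericMutableRow

object RowToInternalSketch {
  def convert(data: RDD[Row], fieldConverters: Seq[Any => Any]): RDD[InternalRow] = {
    data.mapPartitions { iterator =>
      val numColumns = fieldConverters.length
      // Reused across all rows of the partition; downstream operators must copy()
      // a row if they need to hold on to it.
      val mutableRow = new GenericMutableRow(numColumns)
      iterator.map { r =>
        var i = 0
        while (i < numColumns) {
          mutableRow(i) = fieldConverters(i)(r(i))
          i += 1
        }
        mutableRow: InternalRow
      }
    }
  }
}
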
http://git-wip-us.apache.org/repos/asf/spark/blob/d46f8e5d/sql/core/src/main/scala/org/apache/spark/sql/execution/Expand.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Expand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Expand.scala
index 4b601c1..42a0c1b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Expand.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Expand.scala
@@ -19,10 +19,9 @@ package org.apache.spark.sql.execution
 
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.SQLContext
 import org.apache.spark.sql.catalyst.errors._
 import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.plans.physical.{UnknownPartitioning, Partitioning}
+import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, UnknownPartitioning}
 
 /**
  * Apply the all of the GroupExpressions to every input row, hence we will get
@@ -43,7 +42,7 @@ case class Expand(
   // as UNKNOWN partitioning
   override def outputPartitioning: Partitioning = UnknownPartitioning(0)
 
-  protected override def doExecute(): RDD[Row] = attachTree(this, "execute") {
+  protected override def doExecute(): RDD[InternalRow] = attachTree(this, "execute") {
     child.execute().mapPartitions { iter =>
       // TODO Move out projection objects creation and transfer to
       // workers via closure. However we can't assume the Projection
@@ -51,14 +50,14 @@ case class Expand(
       // create the projections within each of the partition processing.
       val groups = projections.map(ee => newProjection(ee, child.output)).toArray
 
-      new Iterator[Row] {
-        private[this] var result: Row = _
+      new Iterator[InternalRow] {
+        private[this] var result: InternalRow = _
         private[this] var idx = -1  // -1 means the initial state
-        private[this] var input: Row = _
+        private[this] var input: InternalRow = _
 
         override final def hasNext: Boolean = (-1 < idx && idx < groups.length) || iter.hasNext
 
-        override final def next(): Row = {
+        override final def next(): InternalRow = {
           if (idx <= 0) {
             // in the initial (-1) or beginning(0) of a new input row, fetch the next input tuple
             input = iter.next()

http://git-wip-us.apache.org/repos/asf/spark/blob/d46f8e5d/sql/core/src/main/scala/org/apache/spark/sql/execution/Generate.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Generate.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Generate.scala
index dd02c1f..c1665f7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Generate.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Generate.scala
@@ -25,12 +25,12 @@ import org.apache.spark.sql.catalyst.expressions._
  * For lazy computing, be sure the generator.terminate() called in the very last
  * TODO reusing the CompletionIterator?
  */
-private[execution] sealed case class LazyIterator(func: () => TraversableOnce[Row])
-  extends Iterator[Row] {
+private[execution] sealed case class LazyIterator(func: () => TraversableOnce[InternalRow])
+  extends Iterator[InternalRow] {
 
   lazy val results = func().toIterator
   override def hasNext: Boolean = results.hasNext
-  override def next(): Row = results.next()
+  override def next(): InternalRow = results.next()
 }
 
 /**
@@ -58,11 +58,11 @@ case class Generate(
 
   val boundGenerator = BindReferences.bindReference(generator, child.output)
 
-  protected override def doExecute(): RDD[Row] = {
+  protected override def doExecute(): RDD[InternalRow] = {
     // boundGenerator.terminate() should be triggered after all of the rows in the partition
     if (join) {
       child.execute().mapPartitions { iter =>
-        val generatorNullRow = Row.fromSeq(Seq.fill[Any](generator.elementTypes.size)(null))
+        val generatorNullRow = InternalRow.fromSeq(Seq.fill[Any](generator.elementTypes.size)(null))
         val joinedRow = new JoinedRow
 
         iter.flatMap { row =>

http://git-wip-us.apache.org/repos/asf/spark/blob/d46f8e5d/sql/core/src/main/scala/org/apache/spark/sql/execution/GeneratedAggregate.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/GeneratedAggregate.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/GeneratedAggregate.scala
index 1c40a92..ba2c8f5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/GeneratedAggregate.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/GeneratedAggregate.scala
@@ -66,7 +66,7 @@ case class GeneratedAggregate(
 
   override def output: Seq[Attribute] = aggregateExpressions.map(_.toAttribute)
 
-  protected override def doExecute(): RDD[Row] = {
+  protected override def doExecute(): RDD[InternalRow] = {
     val aggregatesToCompute = aggregateExpressions.flatMap { a =>
       a.collect { case agg: AggregateExpression => agg}
     }
@@ -273,7 +273,7 @@ case class GeneratedAggregate(
       if (groupingExpressions.isEmpty) {
         // TODO: Codegening anything other than the updateProjection is probably over kill.
         val buffer = newAggregationBuffer(EmptyRow).asInstanceOf[MutableRow]
-        var currentRow: Row = null
+        var currentRow: InternalRow = null
         updateProjection.target(buffer)
 
         while (iter.hasNext) {
@@ -295,19 +295,19 @@ case class GeneratedAggregate(
         )
 
         while (iter.hasNext) {
-          val currentRow: Row = iter.next()
-          val groupKey: Row = groupProjection(currentRow)
+          val currentRow: InternalRow = iter.next()
+          val groupKey: InternalRow = groupProjection(currentRow)
           val aggregationBuffer = aggregationMap.getAggregationBuffer(groupKey)
           updateProjection.target(aggregationBuffer)(joinedRow(aggregationBuffer, currentRow))
         }
 
-        new Iterator[Row] {
+        new Iterator[InternalRow] {
           private[this] val mapIterator = aggregationMap.iterator()
           private[this] val resultProjection = resultProjectionBuilder()
 
           def hasNext: Boolean = mapIterator.hasNext
 
-          def next(): Row = {
+          def next(): InternalRow = {
             val entry = mapIterator.next()
             val result = resultProjection(joinedRow(entry.key, entry.value))
             if (hasNext) {
@@ -326,9 +326,9 @@ case class GeneratedAggregate(
         if (unsafeEnabled) {
           log.info("Not using Unsafe-based aggregator because it is not supported for this schema")
         }
-        val buffers = new java.util.HashMap[Row, MutableRow]()
+        val buffers = new java.util.HashMap[InternalRow, MutableRow]()
 
-        var currentRow: Row = null
+        var currentRow: InternalRow = null
         while (iter.hasNext) {
           currentRow = iter.next()
           val currentGroup = groupProjection(currentRow)
@@ -342,13 +342,13 @@ case class GeneratedAggregate(
           updateProjection.target(currentBuffer)(joinedRow(currentBuffer, currentRow))
         }
 
-        new Iterator[Row] {
+        new Iterator[InternalRow] {
           private[this] val resultIterator = buffers.entrySet.iterator()
           private[this] val resultProjection = resultProjectionBuilder()
 
           def hasNext: Boolean = resultIterator.hasNext
 
-          def next(): Row = {
+          def next(): InternalRow = {
             val currentGroup = resultIterator.next()
             resultProjection(joinedRow(currentGroup.getKey, currentGroup.getValue))
           }

http://git-wip-us.apache.org/repos/asf/spark/blob/d46f8e5d/sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScan.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScan.scala
index 03bee80..cd34118 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScan.scala
@@ -19,18 +19,20 @@ package org.apache.spark.sql.execution
 
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.Row
-import org.apache.spark.sql.catalyst.CatalystTypeConverters
+import org.apache.spark.sql.catalyst.{InternalRow, CatalystTypeConverters}
 import org.apache.spark.sql.catalyst.expressions.Attribute
 
 
 /**
  * Physical plan node for scanning data from a local collection.
  */
-private[sql] case class LocalTableScan(output: Seq[Attribute], rows: Seq[Row]) extends LeafNode {
+private[sql] case class LocalTableScan(
+    output: Seq[Attribute],
+    rows: Seq[InternalRow]) extends LeafNode {
 
   private lazy val rdd = sqlContext.sparkContext.parallelize(rows)
 
-  protected override def doExecute(): RDD[Row] = rdd
+  protected override def doExecute(): RDD[InternalRow] = rdd
 
 
   override def executeCollect(): Array[Row] = {

http://git-wip-us.apache.org/repos/asf/spark/blob/d46f8e5d/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
index 435ac01..7739a9f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
@@ -23,6 +23,7 @@ import org.apache.spark.rdd.{RDD, RDDOperationScope}
 import org.apache.spark.sql.SQLContext
 import org.apache.spark.sql.catalyst.{CatalystTypeConverters, trees}
 import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.Row
 import org.apache.spark.sql.catalyst.expressions.codegen._
 import org.apache.spark.sql.catalyst.plans.QueryPlan
 import org.apache.spark.sql.catalyst.plans.physical._
@@ -79,11 +80,11 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
   def requiredChildOrdering: Seq[Seq[SortOrder]] = Seq.fill(children.size)(Nil)
 
   /**
-   * Returns the result of this query as an RDD[Row] by delegating to doExecute
+   * Returns the result of this query as an RDD[InternalRow] by delegating to doExecute
    * after adding query plan information to created RDDs for visualization.
    * Concrete implementations of SparkPlan should override doExecute instead.
    */
-  final def execute(): RDD[Row] = {
+  final def execute(): RDD[InternalRow] = {
     RDDOperationScope.withScope(sparkContext, nodeName, false, true) {
       doExecute()
     }
@@ -91,9 +92,9 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
 
   /**
    * Overridden by concrete implementations of SparkPlan.
-   * Produces the result of the query as an RDD[Row]
+   * Produces the result of the query as an RDD[InternalRow]
    */
-  protected def doExecute(): RDD[Row]
+  protected def doExecute(): RDD[InternalRow]
 
   /**
    * Runs this query returning the result as an array.
@@ -117,7 +118,7 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
 
     val childRDD = execute().map(_.copy())
 
-    val buf = new ArrayBuffer[Row]
+    val buf = new ArrayBuffer[InternalRow]
     val totalParts = childRDD.partitions.length
     var partsScanned = 0
     while (buf.size < n && partsScanned < totalParts) {
@@ -140,7 +141,8 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
       val p = partsScanned until math.min(partsScanned + numPartsToTry, totalParts)
       val sc = sqlContext.sparkContext
       val res =
-        sc.runJob(childRDD, (it: Iterator[Row]) => it.take(left).toArray, p, allowLocal = false)
+        sc.runJob(childRDD, (it: Iterator[InternalRow]) => it.take(left).toArray, p,
+          allowLocal = false)
 
       res.foreach(buf ++= _.take(n - buf.size))
       partsScanned += numPartsToTry
@@ -175,7 +177,7 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
 
 
   protected def newPredicate(
-      expression: Expression, inputSchema: Seq[Attribute]): (Row) => Boolean = {
+      expression: Expression, inputSchema: Seq[Attribute]): (InternalRow) => Boolean = {
     if (codegenEnabled) {
       GeneratePredicate.generate(expression, inputSchema)
     } else {
@@ -183,7 +185,9 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
     }
   }
 
-  protected def newOrdering(order: Seq[SortOrder], inputSchema: Seq[Attribute]): Ordering[Row] = {
+  protected def newOrdering(
+      order: Seq[SortOrder],
+      inputSchema: Seq[Attribute]): Ordering[InternalRow] = {
     if (codegenEnabled) {
       GenerateOrdering.generate(order, inputSchema)
     } else {

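executeTake in the SparkPlan hunk above collects at most n rows by scanning a few partitions per job and growing the number of partitions tried each round, instead of launching one job over the whole RDD. The same idea on a plain RDD[Int], with a simplified growth policy (the real code estimates the next batch size from the rows seen so far):

import scala.collection.mutable.ArrayBuffer

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

object IncrementalTakeSketch {
  def incrementalTake(rdd: RDD[Int], n: Int, sc: SparkContext): Array[Int] = {
    val buf = new ArrayBuffer[Int]
    val totalParts = rdd.partitions.length
    var partsScanned = 0
    var numPartsToTry = 1
    while (buf.size < n && partsScanned < totalParts) {
      val left = n - buf.size
      val p = partsScanned until math.min(partsScanned + numPartsToTry, totalParts)
      // Run one job over just these partitions, taking at most `left` rows from each.
      val res = sc.runJob(rdd, (it: Iterator[Int]) => it.take(left).toArray, p, allowLocal = false)
      res.foreach(buf ++= _.take(n - buf.size))
      partsScanned += numPartsToTry
      numPartsToTry *= 2 // simplified doubling instead of the real estimate
    }
    buf.toArray
  }
}
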
http://git-wip-us.apache.org/repos/asf/spark/blob/d46f8e5d/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index 7a1331a..422992d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -203,7 +203,7 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
   }
 
   protected lazy val singleRowRdd =
-    sparkContext.parallelize(Seq(new GenericRow(Array[Any]()): Row), 1)
+    sparkContext.parallelize(Seq(new GenericRow(Array[Any]()): InternalRow), 1)
 
   object TakeOrdered extends Strategy {
     def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {

http://git-wip-us.apache.org/repos/asf/spark/blob/d46f8e5d/sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala
index c4327ce..fd6f1d7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala
@@ -20,9 +20,8 @@ package org.apache.spark.sql.execution
 import java.util
 
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.Row
 import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.plans.physical.{AllTuples, Distribution, ClusteredDistribution, Partitioning}
+import org.apache.spark.sql.catalyst.plans.physical.{AllTuples, ClusteredDistribution, Distribution, Partitioning}
 import org.apache.spark.util.collection.CompactBuffer
 
 /**
@@ -112,16 +111,16 @@ case class Window(
     }
   }
 
-  protected override def doExecute(): RDD[Row] = {
+  protected override def doExecute(): RDD[InternalRow] = {
     child.execute().mapPartitions { iter =>
-      new Iterator[Row] {
+      new Iterator[InternalRow] {
 
         // Although input rows are grouped based on windowSpec.partitionSpec, we need to
         // know when we have a new partition.
         // This is to manually construct an ordering that can be used to compare rows.
         // TODO: We may want to have a newOrdering that takes BoundReferences.
         // So, we can take advantave of code gen.
-        private val partitionOrdering: Ordering[Row] =
+        private val partitionOrdering: Ordering[InternalRow] =
           RowOrdering.forSchema(windowSpec.partitionSpec.map(_.dataType))
 
         // This is used to project expressions for the partition specification.
@@ -137,13 +136,13 @@ case class Window(
         // The number of buffered rows in the inputRowBuffer (the size of the current partition).
         var partitionSize: Int = 0
         // The buffer used to buffer rows in a partition.
-        var inputRowBuffer: CompactBuffer[Row] = _
+        var inputRowBuffer: CompactBuffer[InternalRow] = _
         // The partition key of the current partition.
-        var currentPartitionKey: Row = _
+        var currentPartitionKey: InternalRow = _
         // The partition key of next partition.
-        var nextPartitionKey: Row = _
+        var nextPartitionKey: InternalRow = _
         // The first row of next partition.
-        var firstRowInNextPartition: Row = _
+        var firstRowInNextPartition: InternalRow = _
         // Indicates if this partition is the last one in the iter.
         var lastPartition: Boolean = false
 
@@ -316,7 +315,7 @@ case class Window(
           !lastPartition || (rowPosition < partitionSize)
         }
 
-        override final def next(): Row = {
+        override final def next(): InternalRow = {
           if (hasNext) {
             if (rowPosition == partitionSize) {
               // All rows of this buffer have been consumed.
@@ -353,7 +352,7 @@ case class Window(
         // Fetch the next partition.
         private def fetchNextPartition(): Unit = {
           // Create a new buffer for input rows.
-          inputRowBuffer = new CompactBuffer[Row]()
+          inputRowBuffer = new CompactBuffer[InternalRow]()
           // We already have the first row for this partition
           // (recorded in firstRowInNextPartition). Add it back.
           inputRowBuffer += firstRowInNextPartition

http://git-wip-us.apache.org/repos/asf/spark/blob/d46f8e5d/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
index fb42072..7aedd63 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
@@ -17,16 +17,17 @@
 
 package org.apache.spark.sql.execution
 
-import org.apache.spark.{SparkEnv, HashPartitioner, SparkConf}
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.rdd.{RDD, ShuffledRDD}
 import org.apache.spark.shuffle.sort.SortShuffleManager
 import org.apache.spark.sql.catalyst.CatalystTypeConverters
 import org.apache.spark.sql.catalyst.errors._
 import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.Row
 import org.apache.spark.sql.catalyst.plans.physical._
-import org.apache.spark.util.{CompletionIterator, MutablePair}
 import org.apache.spark.util.collection.ExternalSorter
+import org.apache.spark.util.{CompletionIterator, MutablePair}
+import org.apache.spark.{HashPartitioner, SparkEnv}
 
 /**
  * :: DeveloperApi ::
@@ -37,7 +38,7 @@ case class Project(projectList: Seq[NamedExpression], child: SparkPlan) extends
 
   @transient lazy val buildProjection = newMutableProjection(projectList, child.output)
 
-  protected override def doExecute(): RDD[Row] = child.execute().mapPartitions { iter =>
+  protected override def doExecute(): RDD[InternalRow] = child.execute().mapPartitions { iter =>
     val reusableProjection = buildProjection()
     iter.map(reusableProjection)
   }
@@ -52,9 +53,10 @@ case class Project(projectList: Seq[NamedExpression], child: SparkPlan) extends
 case class Filter(condition: Expression, child: SparkPlan) extends UnaryNode {
   override def output: Seq[Attribute] = child.output
 
-  @transient lazy val conditionEvaluator: (Row) => Boolean = newPredicate(condition, child.output)
+  @transient lazy val conditionEvaluator: (InternalRow) => Boolean =
+    newPredicate(condition, child.output)
 
-  protected override def doExecute(): RDD[Row] = child.execute().mapPartitions { iter =>
+  protected override def doExecute(): RDD[InternalRow] = child.execute().mapPartitions { iter =>
     iter.filter(conditionEvaluator)
   }
 
@@ -83,7 +85,7 @@ case class Sample(
   override def output: Seq[Attribute] = child.output
 
   // TODO: How to pick seed?
-  protected override def doExecute(): RDD[Row] = {
+  protected override def doExecute(): RDD[InternalRow] = {
     if (withReplacement) {
       child.execute().map(_.copy()).sample(withReplacement, upperBound - lowerBound, seed)
     } else {
@@ -99,7 +101,8 @@ case class Sample(
 case class Union(children: Seq[SparkPlan]) extends SparkPlan {
   // TODO: attributes output by union should be distinct for nullability purposes
   override def output: Seq[Attribute] = children.head.output
-  protected override def doExecute(): RDD[Row] = sparkContext.union(children.map(_.execute()))
+  protected override def doExecute(): RDD[InternalRow] =
+    sparkContext.union(children.map(_.execute()))
 }
 
 /**
@@ -124,19 +127,19 @@ case class Limit(limit: Int, child: SparkPlan)
 
   override def executeCollect(): Array[Row] = child.executeTake(limit)
 
-  protected override def doExecute(): RDD[Row] = {
-    val rdd: RDD[_ <: Product2[Boolean, Row]] = if (sortBasedShuffleOn) {
+  protected override def doExecute(): RDD[InternalRow] = {
+    val rdd: RDD[_ <: Product2[Boolean, InternalRow]] = if (sortBasedShuffleOn) {
       child.execute().mapPartitions { iter =>
         iter.take(limit).map(row => (false, row.copy()))
       }
     } else {
       child.execute().mapPartitions { iter =>
-        val mutablePair = new MutablePair[Boolean, Row]()
+        val mutablePair = new MutablePair[Boolean, InternalRow]()
         iter.take(limit).map(row => mutablePair.update(false, row))
       }
     }
     val part = new HashPartitioner(1)
-    val shuffled = new ShuffledRDD[Boolean, Row, Row](rdd, part)
+    val shuffled = new ShuffledRDD[Boolean, InternalRow, InternalRow](rdd, part)
     shuffled.setSerializer(new SparkSqlSerializer(child.sqlContext.sparkContext.getConf))
     shuffled.mapPartitions(_.take(limit).map(_._2))
   }
@@ -157,7 +160,8 @@ case class TakeOrdered(limit: Int, sortOrder: Seq[SortOrder], child: SparkPlan)
 
   private val ord: RowOrdering = new RowOrdering(sortOrder, child.output)
 
-  private def collectData(): Array[Row] = child.execute().map(_.copy()).takeOrdered(limit)(ord)
+  private def collectData(): Array[InternalRow] =
+    child.execute().map(_.copy()).takeOrdered(limit)(ord)
 
   override def executeCollect(): Array[Row] = {
     val converter = CatalystTypeConverters.createToScalaConverter(schema)
@@ -166,7 +170,7 @@ case class TakeOrdered(limit: Int, sortOrder: Seq[SortOrder], child: SparkPlan)
 
   // TODO: Terminal split should be implemented differently from non-terminal split.
   // TODO: Pick num splits based on |limit|.
-  protected override def doExecute(): RDD[Row] = sparkContext.makeRDD(collectData(), 1)
+  protected override def doExecute(): RDD[InternalRow] = sparkContext.makeRDD(collectData(), 1)
 
   override def outputOrdering: Seq[SortOrder] = sortOrder
 }
@@ -186,7 +190,7 @@ case class Sort(
   override def requiredChildDistribution: Seq[Distribution] =
     if (global) OrderedDistribution(sortOrder) :: Nil else UnspecifiedDistribution :: Nil
 
-  protected override def doExecute(): RDD[Row] = attachTree(this, "sort") {
+  protected override def doExecute(): RDD[InternalRow] = attachTree(this, "sort") {
     child.execute().mapPartitions( { iterator =>
       val ordering = newOrdering(sortOrder, child.output)
       iterator.map(_.copy()).toArray.sorted(ordering).iterator
@@ -214,14 +218,14 @@ case class ExternalSort(
   override def requiredChildDistribution: Seq[Distribution] =
     if (global) OrderedDistribution(sortOrder) :: Nil else UnspecifiedDistribution :: Nil
 
-  protected override def doExecute(): RDD[Row] = attachTree(this, "sort") {
+  protected override def doExecute(): RDD[InternalRow] = attachTree(this, "sort") {
     child.execute().mapPartitions( { iterator =>
       val ordering = newOrdering(sortOrder, child.output)
-      val sorter = new ExternalSorter[Row, Null, Row](ordering = Some(ordering))
+      val sorter = new ExternalSorter[InternalRow, Null, InternalRow](ordering = Some(ordering))
       sorter.insertAll(iterator.map(r => (r.copy, null)))
       val baseIterator = sorter.iterator.map(_._1)
       // TODO(marmbrus): The complex type signature below thwarts inference for no reason.
-      CompletionIterator[Row, Iterator[Row]](baseIterator, sorter.stop())
+      CompletionIterator[InternalRow, Iterator[InternalRow]](baseIterator, sorter.stop())
     }, preservesPartitioning = true)
   }
 
@@ -239,7 +243,7 @@ case class Repartition(numPartitions: Int, shuffle: Boolean, child: SparkPlan)
   extends UnaryNode {
   override def output: Seq[Attribute] = child.output
 
-  protected override def doExecute(): RDD[Row] = {
+  protected override def doExecute(): RDD[InternalRow] = {
     child.execute().map(_.copy()).coalesce(numPartitions, shuffle)
   }
 }
@@ -254,7 +258,7 @@ case class Repartition(numPartitions: Int, shuffle: Boolean, child: SparkPlan)
 case class Except(left: SparkPlan, right: SparkPlan) extends BinaryNode {
   override def output: Seq[Attribute] = left.output
 
-  protected override def doExecute(): RDD[Row] = {
+  protected override def doExecute(): RDD[InternalRow] = {
     left.execute().map(_.copy()).subtract(right.execute().map(_.copy()))
   }
 }
@@ -268,7 +272,7 @@ case class Except(left: SparkPlan, right: SparkPlan) extends BinaryNode {
 case class Intersect(left: SparkPlan, right: SparkPlan) extends BinaryNode {
   override def output: Seq[Attribute] = children.head.output
 
-  protected override def doExecute(): RDD[Row] = {
+  protected override def doExecute(): RDD[InternalRow] = {
     left.execute().map(_.copy()).intersection(right.execute().map(_.copy()))
   }
 }
@@ -283,5 +287,5 @@ case class Intersect(left: SparkPlan, right: SparkPlan) extends BinaryNode {
 case class OutputFaker(output: Seq[Attribute], child: SparkPlan) extends SparkPlan {
   def children: Seq[SparkPlan] = child :: Nil
 
-  protected override def doExecute(): RDD[Row] = child.execute()
+  protected override def doExecute(): RDD[InternalRow] = child.execute()
 }
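
Among the operators above, Limit has the most structure: each input partition keeps at most `limit` rows, the survivors are shuffled into a single partition, and that partition takes `limit` again. A rough sketch of the same two-phase idea over plain Scala collections (no RDDs, shuffles, or MutablePairs; names are illustrative only):

  object TwoPhaseLimitSketch {
    // Phase 1: cap every partition at n rows (done in parallel in the real operator).
    // Phase 2: concatenate the survivors -- standing in for the shuffle into a
    // single partition -- and take n once more for the final answer.
    def twoPhaseLimit[T](partitions: Seq[Seq[T]], n: Int): Seq[T] =
      partitions.map(_.take(n)).flatten.take(n)

    def main(args: Array[String]): Unit = {
      val partitions = Seq(Seq(1, 2, 3, 4), Seq(5, 6), Seq(7, 8, 9))
      println(twoPhaseLimit(partitions, 3))   // List(1, 2, 3)
    }
  }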

http://git-wip-us.apache.org/repos/asf/spark/blob/d46f8e5d/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala
index 49b361e..653792e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala
@@ -20,13 +20,13 @@ package org.apache.spark.sql.execution
 import org.apache.spark.Logging
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.catalyst.CatalystTypeConverters
+import org.apache.spark.sql.catalyst.{InternalRow, CatalystTypeConverters}
 import org.apache.spark.sql.catalyst.errors.TreeNodeException
-import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, Row}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
 import org.apache.spark.sql.catalyst.plans.logical
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.types._
-import org.apache.spark.sql.{DataFrame, SQLConf, SQLContext}
+import org.apache.spark.sql.{DataFrame, Row, SQLConf, SQLContext}
 
 /**
  * A logical command that is executed for its side-effects.  `RunnableCommand`s are
@@ -64,9 +64,9 @@ private[sql] case class ExecutedCommand(cmd: RunnableCommand) extends SparkPlan
 
   override def executeTake(limit: Int): Array[Row] = sideEffectResult.take(limit).toArray
 
-  protected override def doExecute(): RDD[Row] = {
+  protected override def doExecute(): RDD[InternalRow] = {
     val converted = sideEffectResult.map(r =>
-      CatalystTypeConverters.convertToCatalyst(r, schema).asInstanceOf[Row])
+      CatalystTypeConverters.convertToCatalyst(r, schema).asInstanceOf[InternalRow])
     sqlContext.sparkContext.parallelize(converted, 1)
   }
 }
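
ExecutedCommand is one of the few spots where user-level results cross back into the engine, so the conversion to the internal representation happens exactly once, at that boundary, rather than inside every operator. A toy, Spark-free sketch of that convert-at-the-edge pattern (ExternalValue, InternalValue and toInternal are made-up stand-ins, not Spark types):

  object ConvertAtTheBoundarySketch {
    // "External" values are what user-facing APIs traffic in; "internal" values
    // are what the engine operates on.
    final case class ExternalValue(s: String)
    final case class InternalValue(bytes: Array[Byte])

    def toInternal(e: ExternalValue): InternalValue =
      InternalValue(e.s.getBytes("UTF-8"))

    // A "command" computes its side-effect result in external form ...
    def runCommand(): Seq[ExternalValue] =
      Seq(ExternalValue("key"), ExternalValue("value"))

    def main(args: Array[String]): Unit = {
      // ... and the whole result is converted once before the engine uses it.
      val internal: Seq[InternalValue] = runCommand().map(toInternal)
      println(internal.map(_.bytes.length))   // List(3, 5)
    }
  }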

http://git-wip-us.apache.org/repos/asf/spark/blob/d46f8e5d/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala
index 83c1f65..3ee4033 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.sql.execution
 
 import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.unsafe.types.UTF8String
 
@@ -25,7 +26,7 @@ import scala.collection.mutable.HashSet
 
 import org.apache.spark.{AccumulatorParam, Accumulator}
 import org.apache.spark.annotation.DeveloperApi
-import org.apache.spark.sql.{SQLConf, SQLContext, DataFrame, Row}
+import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.trees.TreeNodeRef
 import org.apache.spark.sql.types._
 
@@ -126,11 +127,11 @@ package object debug {
       }
     }
 
-    protected override def doExecute(): RDD[Row] = {
+    protected override def doExecute(): RDD[InternalRow] = {
       child.execute().mapPartitions { iter =>
-        new Iterator[Row] {
+        new Iterator[InternalRow] {
           def hasNext: Boolean = iter.hasNext
-          def next(): Row = {
+          def next(): InternalRow = {
             val currentRow = iter.next()
             tupleCount += 1
             var i = 0
@@ -155,7 +156,7 @@ package object debug {
     def typeCheck(data: Any, schema: DataType): Unit = (data, schema) match {
       case (null, _) =>
 
-      case (row: Row, StructType(fields)) =>
+      case (row: InternalRow, StructType(fields)) =>
         row.toSeq.zip(fields.map(_.dataType)).foreach { case(d, t) => typeCheck(d, t) }
       case (s: Seq[_], ArrayType(elemType, _)) =>
         s.foreach(typeCheck(_, elemType))
@@ -196,7 +197,7 @@ package object debug {
 
     def children: List[SparkPlan] = child :: Nil
 
-    protected override def doExecute(): RDD[Row] = {
+    protected override def doExecute(): RDD[InternalRow] = {
       child.execute().map { row =>
         try typeCheck(row, child.schema) catch {
           case e: Exception =>
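
TypeCheck's typeCheck above walks the data and the schema together, recursing into structs and arrays and failing on the first mismatch. A stripped-down sketch of that recursive walk over a toy schema (the SchemaType hierarchy here is a stand-in for Spark's DataType, and a plain Seq stands in for a row):

  object RecursiveTypeCheckSketch {
    sealed trait SchemaType
    case object IntType extends SchemaType
    case object StringType extends SchemaType
    final case class ArrayType(element: SchemaType) extends SchemaType
    final case class StructType(fields: Seq[SchemaType]) extends SchemaType

    // Fails on the first value whose runtime shape disagrees with the schema.
    def typeCheck(data: Any, schema: SchemaType): Unit = (data, schema) match {
      case (null, _)                         =>   // nulls are allowed anywhere in this sketch
      case (_: Int, IntType)                 =>
      case (_: String, StringType)           =>
      case (s: Seq[_], ArrayType(elem))      => s.foreach(typeCheck(_, elem))
      case (row: Seq[_], StructType(fields)) => row.zip(fields).foreach { case (d, t) => typeCheck(d, t) }
      case (d, t)                            => sys.error(s"found $d, expected $t")
    }

    def main(args: Array[String]): Unit = {
      val schema = StructType(Seq(IntType, StringType, ArrayType(IntType)))
      typeCheck(Seq(1, "a", Seq(1, 2)), schema)     // passes silently
      // typeCheck(Seq("oops", "a", Nil), schema)   // would throw: found oops, expected IntType
    }
  }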

http://git-wip-us.apache.org/repos/asf/spark/blob/d46f8e5d/sql/core/src/main/scala/org/apache/spark/sql/execution/expressions/MonotonicallyIncreasingID.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/expressions/MonotonicallyIncreasingID.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/expressions/MonotonicallyIncreasingID.scala
index e228a60..68914cf 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/expressions/MonotonicallyIncreasingID.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/expressions/MonotonicallyIncreasingID.scala
@@ -18,7 +18,7 @@
 package org.apache.spark.sql.execution.expressions
 
 import org.apache.spark.TaskContext
-import org.apache.spark.sql.catalyst.expressions.{Row, LeafExpression}
+import org.apache.spark.sql.catalyst.expressions.{InternalRow, LeafExpression}
 import org.apache.spark.sql.types.{LongType, DataType}
 
 /**
@@ -43,7 +43,7 @@ private[sql] case class MonotonicallyIncreasingID() extends LeafExpression {
 
   override def dataType: DataType = LongType
 
-  override def eval(input: Row): Long = {
+  override def eval(input: InternalRow): Long = {
     val currentCount = count
     count += 1
     (TaskContext.get().partitionId().toLong << 33) + currentCount
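
The eval above packs the partition id into the high bits and a per-partition counter into the low 33 bits, which is what makes the ids unique across partitions and increasing within one. A tiny standalone sketch of that packing (the shift width matches the expression in the diff; the partition ids and counts are just example inputs):

  object MonotonicIdSketch {
    // High bits: partition id.  Low 33 bits: a counter local to that partition.
    def pack(partitionId: Int, localCount: Long): Long =
      (partitionId.toLong << 33) + localCount

    def main(args: Array[String]): Unit = {
      println(pack(0, 0L))   // 0
      println(pack(0, 1L))   // 1
      println(pack(1, 0L))   // 8589934592  (1L << 33)
      println(pack(2, 5L))   // 17179869189
    }
  }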

http://git-wip-us.apache.org/repos/asf/spark/blob/d46f8e5d/sql/core/src/main/scala/org/apache/spark/sql/execution/expressions/SparkPartitionID.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/expressions/SparkPartitionID.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/expressions/SparkPartitionID.scala
index 1272793..12c2eed 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/expressions/SparkPartitionID.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/expressions/SparkPartitionID.scala
@@ -18,7 +18,7 @@
 package org.apache.spark.sql.execution.expressions
 
 import org.apache.spark.TaskContext
-import org.apache.spark.sql.catalyst.expressions.{LeafExpression, Row}
+import org.apache.spark.sql.catalyst.expressions.{LeafExpression, InternalRow}
 import org.apache.spark.sql.types.{IntegerType, DataType}
 
 
@@ -31,5 +31,5 @@ private[sql] case object SparkPartitionID extends LeafExpression {
 
   override def dataType: DataType = IntegerType
 
-  override def eval(input: Row): Int = TaskContext.get().partitionId()
+  override def eval(input: InternalRow): Int = TaskContext.get().partitionId()
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/d46f8e5d/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala
index b8b12be..2d2e1b9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala
@@ -17,16 +17,15 @@
 
 package org.apache.spark.sql.execution.joins
 
-import org.apache.spark.rdd.RDD
-import org.apache.spark.util.ThreadUtils
-
 import scala.concurrent._
 import scala.concurrent.duration._
 
 import org.apache.spark.annotation.DeveloperApi
-import org.apache.spark.sql.catalyst.expressions.{Row, Expression}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.expressions.{Expression, InternalRow}
 import org.apache.spark.sql.catalyst.plans.physical.{Distribution, Partitioning, UnspecifiedDistribution}
 import org.apache.spark.sql.execution.{BinaryNode, SparkPlan}
+import org.apache.spark.util.ThreadUtils
 
 /**
  * :: DeveloperApi ::
@@ -61,12 +60,12 @@ case class BroadcastHashJoin(
   @transient
   private val broadcastFuture = future {
     // Note that we use .execute().collect() because we don't want to convert data to Scala types
-    val input: Array[Row] = buildPlan.execute().map(_.copy()).collect()
+    val input: Array[InternalRow] = buildPlan.execute().map(_.copy()).collect()
     val hashed = HashedRelation(input.iterator, buildSideKeyGenerator, input.length)
     sparkContext.broadcast(hashed)
   }(BroadcastHashJoin.broadcastHashJoinExecutionContext)
 
-  protected override def doExecute(): RDD[Row] = {
+  protected override def doExecute(): RDD[InternalRow] = {
     val broadcastRelation = Await.result(broadcastFuture, timeout)
 
     streamedPlan.execute().mapPartitions { streamedIter =>
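
BroadcastHashJoin collects the small (build) side once, hashes it by join key, and streams the other side through that table partition by partition. A compact sketch of the build and probe phases over plain Scala collections (the broadcast and HashedRelation machinery is elided; the Row type and key field are illustrative stand-ins):

  object BroadcastHashJoinSketch {
    final case class Row(key: Int, payload: String)

    // Build phase: hash the small side by join key.
    def buildTable(buildSide: Seq[Row]): Map[Int, Seq[Row]] =
      buildSide.groupBy(_.key)

    // Probe phase: stream the big side and emit one joined pair per match.
    def probe(streamed: Iterator[Row], table: Map[Int, Seq[Row]]): Iterator[(Row, Row)] =
      streamed.flatMap(s => table.getOrElse(s.key, Seq.empty).map(b => (s, b)))

    def main(args: Array[String]): Unit = {
      val build    = Seq(Row(1, "b1"), Row(2, "b2"), Row(2, "b2x"))
      val streamed = Iterator(Row(2, "s2"), Row(3, "s3"), Row(1, "s1"))
      probe(streamed, buildTable(build)).foreach(println)
      // (Row(2,s2),Row(2,b2)), (Row(2,s2),Row(2,b2x)), (Row(1,s1),Row(1,b1))
    }
  }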

http://git-wip-us.apache.org/repos/asf/spark/blob/d46f8e5d/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastLeftSemiJoinHash.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastLeftSemiJoinHash.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastLeftSemiJoinHash.scala
index a32e5fc..044964f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastLeftSemiJoinHash.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastLeftSemiJoinHash.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.execution.joins
 
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, Row}
+import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.execution.{BinaryNode, SparkPlan}
 
 /**
@@ -38,10 +38,10 @@ case class BroadcastLeftSemiJoinHash(
 
   override def output: Seq[Attribute] = left.output
 
-  protected override def doExecute(): RDD[Row] = {
+  protected override def doExecute(): RDD[InternalRow] = {
     val buildIter = buildPlan.execute().map(_.copy()).collect().toIterator
-    val hashSet = new java.util.HashSet[Row]()
-    var currentRow: Row = null
+    val hashSet = new java.util.HashSet[InternalRow]()
+    var currentRow: InternalRow = null
 
     // Create a Hash set of buildKeys
     while (buildIter.hasNext) {
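
For a left semi join the build side only has to answer "does this key exist?", which is why the code above fills a HashSet of keys rather than a full relation. A minimal sketch of the resulting filter (plain Scala, illustrative types):

  object LeftSemiJoinSketch {
    final case class Row(key: Int, payload: String)

    // Keep each streamed row whose key appears on the build side; emit it at most
    // once and never emit build-side columns.
    def leftSemiJoin(streamed: Iterator[Row], buildKeys: Set[Int]): Iterator[Row] =
      streamed.filter(r => buildKeys.contains(r.key))

    def main(args: Array[String]): Unit = {
      val buildKeys = Set(1, 3)
      val streamed  = Iterator(Row(1, "a"), Row(2, "b"), Row(3, "c"), Row(3, "d"))
      leftSemiJoin(streamed, buildKeys).foreach(println)   // Row(1,a), Row(3,c), Row(3,d)
    }
  }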

http://git-wip-us.apache.org/repos/asf/spark/blob/d46f8e5d/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastNestedLoopJoin.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastNestedLoopJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastNestedLoopJoin.scala
index caad3df..0b2cf8e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastNestedLoopJoin.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastNestedLoopJoin.scala
@@ -61,13 +61,14 @@ case class BroadcastNestedLoopJoin(
   @transient private lazy val boundCondition =
     newPredicate(condition.getOrElse(Literal(true)), left.output ++ right.output)
 
-  protected override def doExecute(): RDD[Row] = {
+  protected override def doExecute(): RDD[InternalRow] = {
     val broadcastedRelation =
-      sparkContext.broadcast(broadcast.execute().map(_.copy()).collect().toIndexedSeq)
+      sparkContext.broadcast(broadcast.execute().map(_.copy())
+        .collect().toIndexedSeq)
 
     /** All rows that either match both-way, or rows from streamed joined with nulls. */
     val matchesOrStreamedRowsWithNulls = streamed.execute().mapPartitions { streamedIter =>
-      val matchedRows = new CompactBuffer[Row]
+      val matchedRows = new CompactBuffer[InternalRow]
       // TODO: Use Spark's BitSet.
       val includedBroadcastTuples =
         new scala.collection.mutable.BitSet(broadcastedRelation.value.size)
@@ -118,8 +119,8 @@ case class BroadcastNestedLoopJoin(
     val leftNulls = new GenericMutableRow(left.output.size)
     val rightNulls = new GenericMutableRow(right.output.size)
     /** Rows from broadcasted joined with nulls. */
-    val broadcastRowsWithNulls: Seq[Row] = {
-      val buf: CompactBuffer[Row] = new CompactBuffer()
+    val broadcastRowsWithNulls: Seq[InternalRow] = {
+      val buf: CompactBuffer[InternalRow] = new CompactBuffer()
       var i = 0
       val rel = broadcastedRelation.value
       while (i < rel.length) {

http://git-wip-us.apache.org/repos/asf/spark/blob/d46f8e5d/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProduct.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProduct.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProduct.scala
index 191c00c..261b472 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProduct.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProduct.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.execution.joins
 
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.Row
+import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.{Attribute, JoinedRow}
 import org.apache.spark.sql.execution.{BinaryNode, SparkPlan}
 
@@ -30,7 +30,7 @@ import org.apache.spark.sql.execution.{BinaryNode, SparkPlan}
 case class CartesianProduct(left: SparkPlan, right: SparkPlan) extends BinaryNode {
   override def output: Seq[Attribute] = left.output ++ right.output
 
-  protected override def doExecute(): RDD[Row] = {
+  protected override def doExecute(): RDD[InternalRow] = {
     val leftResults = left.execute().map(_.copy())
     val rightResults = right.execute().map(_.copy())
 

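CartesianProduct, like several operators in this commit, calls .map(_.copy()) before buffering or re-partitioning rows because an upstream operator may reuse a single mutable row object for its whole output. A self-contained illustration of why the copy matters (MutableRow here is a toy class, not Spark's):

  object CopyBeforeBufferingSketch {
    // A toy mutable row that an operator reuses for every element it produces.
    final class MutableRow(var value: Int) {
      def copy(): MutableRow = new MutableRow(value)
      override def toString: String = s"MutableRow($value)"
    }

    def reusingIterator(values: Seq[Int]): Iterator[MutableRow] = {
      val shared = new MutableRow(0)
      values.iterator.map { v => shared.value = v; shared }   // same object every time
    }

    def main(args: Array[String]): Unit = {
      // Buffering without copying keeps N references to the same object ...
      println(reusingIterator(Seq(1, 2, 3)).toList)                 // all show the last value, 3
      // ... copying first preserves each row's contents.
      println(reusingIterator(Seq(1, 2, 3)).map(_.copy()).toList)   // 1, 2, 3
    }
  }
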
http://git-wip-us.apache.org/repos/asf/spark/blob/d46f8e5d/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala
index 851de16..3a4196a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala
@@ -49,11 +49,13 @@ trait HashJoin {
   @transient protected lazy val streamSideKeyGenerator: () => MutableProjection =
     newMutableProjection(streamedKeys, streamedPlan.output)
 
-  protected def hashJoin(streamIter: Iterator[Row], hashedRelation: HashedRelation): Iterator[Row] =
+  protected def hashJoin(
+      streamIter: Iterator[InternalRow],
+      hashedRelation: HashedRelation): Iterator[InternalRow] =
   {
-    new Iterator[Row] {
-      private[this] var currentStreamedRow: Row = _
-      private[this] var currentHashMatches: CompactBuffer[Row] = _
+    new Iterator[InternalRow] {
+      private[this] var currentStreamedRow: InternalRow = _
+      private[this] var currentHashMatches: CompactBuffer[InternalRow] = _
       private[this] var currentMatchPosition: Int = -1
 
       // Mutable per row objects.
@@ -65,7 +67,7 @@ trait HashJoin {
         (currentMatchPosition != -1 && currentMatchPosition < currentHashMatches.size) ||
           (streamIter.hasNext && fetchNext())
 
-      override final def next(): Row = {
+      override final def next(): InternalRow = {
         val ret = buildSide match {
           case BuildRight => joinRow(currentStreamedRow, currentHashMatches(currentMatchPosition))
           case BuildLeft => joinRow(currentHashMatches(currentMatchPosition), currentStreamedRow)

http://git-wip-us.apache.org/repos/asf/spark/blob/d46f8e5d/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashOuterJoin.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashOuterJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashOuterJoin.scala
index c21a453..19aef99 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashOuterJoin.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashOuterJoin.scala
@@ -68,26 +68,29 @@ case class HashOuterJoin(
     }
   }
 
-  @transient private[this] lazy val DUMMY_LIST = Seq[Row](null)
-  @transient private[this] lazy val EMPTY_LIST = Seq.empty[Row]
+  @transient private[this] lazy val DUMMY_LIST = Seq[InternalRow](null)
+  @transient private[this] lazy val EMPTY_LIST = Seq.empty[InternalRow]
 
   @transient private[this] lazy val leftNullRow = new GenericRow(left.output.length)
   @transient private[this] lazy val rightNullRow = new GenericRow(right.output.length)
   @transient private[this] lazy val boundCondition =
-    condition.map(newPredicate(_, left.output ++ right.output)).getOrElse((row: Row) => true)
+    condition.map(
+      newPredicate(_, left.output ++ right.output)).getOrElse((row: InternalRow) => true)
 
   // TODO we need to rewrite all of the iterators with our own implementation instead of the Scala
   // iterator for performance purposes.
 
   private[this] def leftOuterIterator(
-      key: Row, joinedRow: JoinedRow, rightIter: Iterable[Row]): Iterator[Row] = {
-    val ret: Iterable[Row] = {
+      key: InternalRow,
+      joinedRow: JoinedRow,
+      rightIter: Iterable[InternalRow]): Iterator[InternalRow] = {
+    val ret: Iterable[InternalRow] = {
       if (!key.anyNull) {
         val temp = rightIter.collect {
           case r if boundCondition(joinedRow.withRight(r)) => joinedRow.copy()
         }
         if (temp.size == 0) {
-          joinedRow.withRight(rightNullRow).copy :: Nil
+          joinedRow.withRight(rightNullRow).copy.asInstanceOf[InternalRow] :: Nil
         } else {
           temp
         }
@@ -99,12 +102,15 @@ case class HashOuterJoin(
   }
 
   private[this] def rightOuterIterator(
-      key: Row, leftIter: Iterable[Row], joinedRow: JoinedRow): Iterator[Row] = {
+      key: InternalRow,
+      leftIter: Iterable[InternalRow],
+      joinedRow: JoinedRow): Iterator[InternalRow] = {
 
-    val ret: Iterable[Row] = {
+    val ret: Iterable[InternalRow] = {
       if (!key.anyNull) {
         val temp = leftIter.collect {
-          case l if boundCondition(joinedRow.withLeft(l)) => joinedRow.copy
+          case l if boundCondition(joinedRow.withLeft(l)) =>
+            joinedRow.copy
         }
         if (temp.size == 0) {
           joinedRow.withLeft(leftNullRow).copy :: Nil
@@ -119,14 +125,14 @@ case class HashOuterJoin(
   }
 
   private[this] def fullOuterIterator(
-      key: Row, leftIter: Iterable[Row], rightIter: Iterable[Row],
-      joinedRow: JoinedRow): Iterator[Row] = {
+      key: InternalRow, leftIter: Iterable[InternalRow], rightIter: Iterable[InternalRow],
+      joinedRow: JoinedRow): Iterator[InternalRow] = {
 
     if (!key.anyNull) {
       // Store the positions of records in right, if one of its associated rows satisfies
       // the join condition.
       val rightMatchedSet = scala.collection.mutable.Set[Int]()
-      leftIter.iterator.flatMap[Row] { l =>
+      leftIter.iterator.flatMap[InternalRow] { l =>
         joinedRow.withLeft(l)
         var matched = false
         rightIter.zipWithIndex.collect {
@@ -157,24 +163,25 @@ case class HashOuterJoin(
           joinedRow(leftNullRow, r).copy()
       }
     } else {
-      leftIter.iterator.map[Row] { l =>
+      leftIter.iterator.map[InternalRow] { l =>
         joinedRow(l, rightNullRow).copy()
-      } ++ rightIter.iterator.map[Row] { r =>
+      } ++ rightIter.iterator.map[InternalRow] { r =>
         joinedRow(leftNullRow, r).copy()
       }
     }
   }
 
   private[this] def buildHashTable(
-      iter: Iterator[Row], keyGenerator: Projection): JavaHashMap[Row, CompactBuffer[Row]] = {
-    val hashTable = new JavaHashMap[Row, CompactBuffer[Row]]()
+      iter: Iterator[InternalRow],
+      keyGenerator: Projection): JavaHashMap[InternalRow, CompactBuffer[InternalRow]] = {
+    val hashTable = new JavaHashMap[InternalRow, CompactBuffer[InternalRow]]()
     while (iter.hasNext) {
       val currentRow = iter.next()
       val rowKey = keyGenerator(currentRow)
 
       var existingMatchList = hashTable.get(rowKey)
       if (existingMatchList == null) {
-        existingMatchList = new CompactBuffer[Row]()
+        existingMatchList = new CompactBuffer[InternalRow]()
         hashTable.put(rowKey, existingMatchList)
       }
 
@@ -184,7 +191,7 @@ case class HashOuterJoin(
     hashTable
   }
 
-  protected override def doExecute(): RDD[Row] = {
+  protected override def doExecute(): RDD[InternalRow] = {
     val joinedRow = new JoinedRow()
     left.execute().zipPartitions(right.execute()) { (leftIter, rightIter) =>
       // TODO this probably can be replaced by external sort (sort merged join?)
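
leftOuterIterator above emits every matching pair for a key and, when nothing matches, exactly one row padded with nulls on the right. The same shape in plain Scala, with Option standing in for the null-padded side (illustrative types only, not Spark's JoinedRow machinery):

  object LeftOuterJoinSketch {
    final case class Row(key: Int, payload: String)

    // Hash the right side by key, then walk the left side: all matches, or one
    // None-padded result when the key is absent -- never zero rows per left row.
    def leftOuterJoin(left: Seq[Row], right: Seq[Row]): Seq[(Row, Option[Row])] = {
      val table: Map[Int, Seq[Row]] = right.groupBy(_.key)
      left.flatMap { l =>
        table.get(l.key) match {
          case Some(matches) => matches.map(r => (l, Option(r)))
          case None          => Seq((l, Option.empty[Row]))
        }
      }
    }

    def main(args: Array[String]): Unit = {
      val left  = Seq(Row(1, "l1"), Row(2, "l2"))
      val right = Seq(Row(1, "r1"), Row(1, "r1x"))
      leftOuterJoin(left, right).foreach(println)
      // (Row(1,l1),Some(Row(1,r1))), (Row(1,l1),Some(Row(1,r1x))), (Row(2,l2),None)
    }
  }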

