Posted to commits@spark.apache.org by rx...@apache.org on 2015/06/13 08:06:42 UTC

[5/5] spark git commit: [SPARK-7186] [SQL] Decouple internal Row from external Row

[SPARK-7186] [SQL] Decouple internal Row from external Row

Currently, we use o.a.s.sql.Row both internally and externally. The external interface is wider than what the internals need, because it is designed to facilitate end-user programming. This design has proven to be error prone and cumbersome for internal Row implementations.

As a first step, we create an InternalRow interface in the catalyst module that is identical to the current Row interface, and switch all internal operators/expressions to use InternalRow instead. When we need to expose Row, we convert the InternalRow implementation into Row for users.
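
A minimal sketch (not part of this patch; the literal value is invented for illustration) of the split described above: internal expression evaluation now takes an InternalRow, while org.apache.spark.sql.Row stays the user-facing type.

    import org.apache.spark.sql.catalyst.InternalRow
    import org.apache.spark.sql.catalyst.expressions.BoundReference
    import org.apache.spark.sql.types.IntegerType

    // Internal operators/expressions evaluate against InternalRow.
    val expr = BoundReference(ordinal = 0, dataType = IntegerType, nullable = false)
    val row: InternalRow = InternalRow(42)
    val value = expr.eval(row)  // reads slot 0 of the internal row, i.e. 42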

For all public APIs (for example, the data source APIs), we use Row, which is converted to/from InternalRow by CatalystTypeConverters.
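
As a rough illustration of the inbound conversion (assuming code that lives inside Spark SQL itself, since CatalystTypeConverters is an internal helper rather than public API; the sample values are invented), the reverse conversion happens before results are handed back to users as Row:

    import org.apache.spark.sql.Row
    import org.apache.spark.sql.catalyst.CatalystTypeConverters

    val external = Row("spark", BigDecimal("1.5"))
    // Strings become UTF8String, BigDecimal becomes Decimal, and nested
    // external Rows become InternalRows on the way into Catalyst.
    val internal = CatalystTypeConverters.convertToCatalyst(external)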

For all internal data sources (JSON, Parquet, JDBC, Hive), we use InternalRow for better performance and cast it to Row in buildScan() (without changing the public API). When creating a PhysicalRDD, we cast it back to InternalRow.
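
A hypothetical relation (class name and data invented for illustration, not taken from this commit) showing that pattern: build InternalRows internally and only cast at the public buildScan() boundary, with PhysicalRDD casting back on the execution side.

    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.{Row, SQLContext}
    import org.apache.spark.sql.catalyst.InternalRow
    import org.apache.spark.sql.sources.{BaseRelation, TableScan}
    import org.apache.spark.sql.types.{IntegerType, StructField, StructType}

    class ExampleRelation(val sqlContext: SQLContext) extends BaseRelation with TableScan {
      override def schema: StructType = StructType(Seq(StructField("id", IntegerType)))

      // Internally we stay on InternalRow for performance ...
      private def internalScan(): RDD[InternalRow] =
        sqlContext.sparkContext.parallelize(0 until 10).map(i => InternalRow(i))

      // ... and cast at the public boundary without changing the RDD[Row] signature.
      override def buildScan(): RDD[Row] = internalScan().asInstanceOf[RDD[Row]]
    }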

cc rxin marmbrus JoshRosen

Author: Davies Liu <da...@databricks.com>

Closes #6792 from davies/internal_row and squashes the following commits:

f2abd13 [Davies Liu] fix scalastyle
a7e025c [Davies Liu] move InternalRow into catalyst
30db8ba [Davies Liu] Merge branch 'master' of github.com:apache/spark into internal_row
7cbced8 [Davies Liu] separate Row and InternalRow


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d46f8e5d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d46f8e5d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d46f8e5d

Branch: refs/heads/master
Commit: d46f8e5d4b5c1278e0fae3ad133b2229ac01b197
Parents: 6e9c3ff
Author: Davies Liu <da...@databricks.com>
Authored: Fri Jun 12 23:06:31 2015 -0700
Committer: Reynold Xin <rx...@databricks.com>
Committed: Fri Jun 12 23:06:31 2015 -0700

----------------------------------------------------------------------
 .../org/apache/spark/sql/BaseMutableRow.java    |  68 ++++++
 .../main/java/org/apache/spark/sql/BaseRow.java | 218 +++++++++++++++++++
 .../UnsafeFixedWidthAggregationMap.java         |   8 +-
 .../sql/catalyst/expressions/UnsafeRow.java     |   4 +-
 .../org/apache/spark/sql/BaseMutableRow.java    |  68 ------
 .../scala/org/apache/spark/sql/BaseRow.java     | 217 ------------------
 .../sql/catalyst/CatalystTypeConverters.scala   |  54 ++---
 .../apache/spark/sql/catalyst/InternalRow.scala |  57 +++++
 .../sql/catalyst/analysis/unresolved.scala      |  11 +-
 .../catalyst/expressions/BoundAttribute.scala   |   4 +-
 .../spark/sql/catalyst/expressions/Cast.scala   |   5 +-
 .../sql/catalyst/expressions/Expression.scala   |   4 +-
 .../sql/catalyst/expressions/ExtractValue.scala |  12 +-
 .../sql/catalyst/expressions/Projection.scala   | 104 ++++-----
 .../sql/catalyst/expressions/ScalaUdf.scala     |  51 ++---
 .../sql/catalyst/expressions/SortOrder.scala    |   4 +-
 .../expressions/SpecificMutableRow.scala        |   2 +-
 .../expressions/UnsafeRowConverter.scala        |  66 ++++--
 .../sql/catalyst/expressions/aggregates.scala   |  73 ++++---
 .../sql/catalyst/expressions/arithmetic.scala   |  15 +-
 .../expressions/codegen/CodeGenerator.scala     |   7 +-
 .../codegen/GenerateMutableProjection.scala     |  11 +-
 .../expressions/codegen/GenerateOrdering.scala  |  17 +-
 .../expressions/codegen/GeneratePredicate.scala |  15 +-
 .../codegen/GenerateProjection.scala            |  13 +-
 .../sql/catalyst/expressions/complexTypes.scala |   7 +-
 .../sql/catalyst/expressions/conditionals.scala |   7 +-
 .../catalyst/expressions/decimalFunctions.scala |   7 +-
 .../sql/catalyst/expressions/generators.scala   |  15 +-
 .../sql/catalyst/expressions/literals.scala     |   7 +-
 .../spark/sql/catalyst/expressions/math.scala   |   9 +-
 .../catalyst/expressions/namedExpressions.scala |   7 +-
 .../catalyst/expressions/nullFunctions.scala    |   9 +-
 .../sql/catalyst/expressions/package.scala      |  28 +--
 .../sql/catalyst/expressions/predicates.scala   |  20 +-
 .../spark/sql/catalyst/expressions/random.scala |   4 +-
 .../spark/sql/catalyst/expressions/rows.scala   |  23 +-
 .../spark/sql/catalyst/expressions/sets.scala   |  10 +-
 .../catalyst/expressions/stringOperations.scala |  12 +-
 .../expressions/windowExpressions.scala         |  12 +-
 .../catalyst/plans/logical/LocalRelation.scala  |   9 +-
 .../catalyst/plans/physical/partitioning.scala  |   7 +-
 .../sql/catalyst/ScalaReflectionSuite.scala     |   8 +-
 .../sql/catalyst/expressions/CastSuite.scala    |  22 +-
 .../catalyst/expressions/ComplexTypeSuite.scala |   4 +-
 .../expressions/ExpressionEvalHelper.scala      |  16 +-
 .../UnsafeFixedWidthAggregationMapSuite.scala   |   2 +-
 .../expressions/UnsafeRowConverterSuite.scala   |   4 +-
 .../optimizer/ConvertToLocalRelationSuite.scala |  10 +-
 .../sql/catalyst/trees/TreeNodeSuite.scala      |   2 +-
 .../sql/catalyst/util/DateUtilsSuite.scala      |   1 -
 .../scala/org/apache/spark/sql/DataFrame.scala  |  18 +-
 .../org/apache/spark/sql/DataFrameReader.scala  |   2 +-
 .../scala/org/apache/spark/sql/SQLContext.scala |  26 ++-
 .../spark/sql/columnar/ColumnBuilder.scala      |   7 +-
 .../apache/spark/sql/columnar/ColumnStats.scala |  65 +++---
 .../columnar/InMemoryColumnarTableScan.scala    |  27 +--
 .../sql/columnar/NullableColumnBuilder.scala    |   4 +-
 .../compression/CompressibleColumnBuilder.scala |   6 +-
 .../compression/CompressionScheme.scala         |   5 +-
 .../compression/compressionSchemes.scala        |  13 +-
 .../apache/spark/sql/execution/Aggregate.scala  |  14 +-
 .../apache/spark/sql/execution/Exchange.scala   |  24 +-
 .../spark/sql/execution/ExistingRDD.scala       |  18 +-
 .../org/apache/spark/sql/execution/Expand.scala |  13 +-
 .../apache/spark/sql/execution/Generate.scala   |  10 +-
 .../sql/execution/GeneratedAggregate.scala      |  20 +-
 .../spark/sql/execution/LocalTableScan.scala    |   8 +-
 .../apache/spark/sql/execution/SparkPlan.scala  |  20 +-
 .../spark/sql/execution/SparkStrategies.scala   |   2 +-
 .../org/apache/spark/sql/execution/Window.scala |  21 +-
 .../spark/sql/execution/basicOperators.scala    |  46 ++--
 .../apache/spark/sql/execution/commands.scala   |  10 +-
 .../spark/sql/execution/debug/package.scala     |  13 +-
 .../expressions/MonotonicallyIncreasingID.scala |   4 +-
 .../expressions/SparkPartitionID.scala          |   4 +-
 .../sql/execution/joins/BroadcastHashJoin.scala |  11 +-
 .../joins/BroadcastLeftSemiJoinHash.scala       |   8 +-
 .../joins/BroadcastNestedLoopJoin.scala         |  11 +-
 .../sql/execution/joins/CartesianProduct.scala  |   4 +-
 .../spark/sql/execution/joins/HashJoin.scala    |  12 +-
 .../sql/execution/joins/HashOuterJoin.scala     |  43 ++--
 .../sql/execution/joins/HashedRelation.scala    |  25 ++-
 .../sql/execution/joins/LeftSemiJoinBNL.scala   |   2 +-
 .../sql/execution/joins/LeftSemiJoinHash.scala  |   8 +-
 .../sql/execution/joins/ShuffledHashJoin.scala  |   4 +-
 .../sql/execution/joins/SortMergeJoin.scala     |  22 +-
 .../apache/spark/sql/execution/pythonUdfs.scala |   9 +-
 .../sql/execution/stat/FrequentItems.scala      |   5 +-
 .../org/apache/spark/sql/jdbc/JDBCRDD.scala     |  18 +-
 .../apache/spark/sql/jdbc/JDBCRelation.scala    |   5 +-
 .../apache/spark/sql/json/JSONRelation.scala    |  14 +-
 .../spark/sql/json/JacksonGenerator.scala       |   2 +-
 .../apache/spark/sql/json/JacksonParser.scala   |  13 +-
 .../org/apache/spark/sql/json/JsonRDD.scala     |   4 +-
 .../spark/sql/parquet/ParquetConverter.scala    |  16 +-
 .../sql/parquet/ParquetTableOperations.scala    |  38 ++--
 .../spark/sql/parquet/ParquetTableSupport.scala |  18 +-
 .../apache/spark/sql/parquet/newParquet.scala   |  14 +-
 .../spark/sql/sources/DataSourceStrategy.scala  | 111 +++++-----
 .../spark/sql/sources/PartitioningUtils.scala   |   7 +-
 .../org/apache/spark/sql/sources/commands.scala |  74 +++----
 .../org/apache/spark/sql/sources/ddl.scala      |  10 +-
 .../apache/spark/sql/sources/interfaces.scala   |  19 +-
 .../org/apache/spark/sql/CachedTableSuite.scala |   2 +-
 .../spark/sql/columnar/ColumnStatsSuite.scala   |  27 ++-
 .../spark/sql/columnar/ColumnarTestUtils.scala  |   9 +-
 .../columnar/InMemoryColumnarQuerySuite.scala   |   3 +-
 .../compression/BooleanBitSetSuite.scala        |   6 +-
 .../spark/sql/execution/PlannerSuite.scala      |   5 +-
 .../execution/joins/HashedRelationSuite.scala   |  26 +--
 .../spark/sql/parquet/ParquetFilterSuite.scala  |   2 +-
 .../spark/sql/parquet/ParquetIOSuite.scala      |   3 +-
 .../ParquetPartitionDiscoverySuite.scala        |  57 +++--
 .../spark/sql/parquet/ParquetQuerySuite.scala   |   4 +-
 .../apache/spark/sql/sources/DDLTestSuite.scala |   7 +-
 .../spark/sql/sources/TableScanSuite.scala      |  27 ++-
 .../apache/spark/sql/hive/HiveInspectors.scala  |   8 +-
 .../spark/sql/hive/HiveMetastoreCatalog.scala   |   2 +-
 .../apache/spark/sql/hive/HiveStrategies.scala  |   6 +-
 .../org/apache/spark/sql/hive/TableReader.scala |  18 +-
 .../hive/execution/CreateTableAsSelect.scala    |   6 +-
 .../execution/DescribeHiveTableCommand.scala    |  12 +-
 .../sql/hive/execution/HiveNativeCommand.scala  |  68 +++---
 .../sql/hive/execution/HiveTableScan.scala      |   2 +-
 .../hive/execution/InsertIntoHiveTable.scala    |  16 +-
 .../hive/execution/ScriptTransformation.scala   |  10 +-
 .../spark/sql/hive/execution/commands.scala     |  32 +--
 .../org/apache/spark/sql/hive/hiveUdfs.scala    |  24 +-
 .../spark/sql/hive/HiveInspectorSuite.scala     |   9 +-
 .../spark/sql/hive/HiveParquetSuite.scala       |   3 +-
 .../hive/orc/OrcPartitionDiscoverySuite.scala   |   2 +-
 .../spark/sql/hive/orc/OrcQuerySuite.scala      |   2 +-
 .../apache/spark/sql/hive/parquetSuites.scala   |   4 +-
 134 files changed, 1443 insertions(+), 1256 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/d46f8e5d/sql/catalyst/src/main/java/org/apache/spark/sql/BaseMutableRow.java
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/BaseMutableRow.java b/sql/catalyst/src/main/java/org/apache/spark/sql/BaseMutableRow.java
new file mode 100644
index 0000000..acec2bf
--- /dev/null
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/BaseMutableRow.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql;
+
+import org.apache.spark.sql.catalyst.expressions.MutableRow;
+
+public abstract class BaseMutableRow extends BaseRow implements MutableRow {
+
+  @Override
+  public void update(int ordinal, Object value) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void setInt(int ordinal, int value) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void setLong(int ordinal, long value) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void setDouble(int ordinal, double value) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void setBoolean(int ordinal, boolean value) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void setShort(int ordinal, short value) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void setByte(int ordinal, byte value) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void setFloat(int ordinal, float value) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void setString(int ordinal, String value) {
+    throw new UnsupportedOperationException();
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/d46f8e5d/sql/catalyst/src/main/java/org/apache/spark/sql/BaseRow.java
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/BaseRow.java b/sql/catalyst/src/main/java/org/apache/spark/sql/BaseRow.java
new file mode 100644
index 0000000..611e02d
--- /dev/null
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/BaseRow.java
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql;
+
+import java.math.BigDecimal;
+import java.sql.Date;
+import java.sql.Timestamp;
+import java.util.List;
+
+import scala.collection.Seq;
+import scala.collection.mutable.ArraySeq;
+
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.catalyst.expressions.GenericRow;
+import org.apache.spark.sql.types.StructType;
+
+public abstract class BaseRow extends InternalRow {
+
+  @Override
+  final public int length() {
+    return size();
+  }
+
+  @Override
+  public boolean anyNull() {
+    final int n = size();
+    for (int i=0; i < n; i++) {
+      if (isNullAt(i)) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  @Override
+  public StructType schema() { throw new UnsupportedOperationException(); }
+
+  @Override
+  final public Object apply(int i) {
+    return get(i);
+  }
+
+  @Override
+  public int getInt(int i) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public long getLong(int i) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public float getFloat(int i) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public double getDouble(int i) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public byte getByte(int i) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public short getShort(int i) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public boolean getBoolean(int i) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public String getString(int i) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public BigDecimal getDecimal(int i) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public Date getDate(int i) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public Timestamp getTimestamp(int i) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public <T> Seq<T> getSeq(int i) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public <T> List<T> getList(int i) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public <K, V> scala.collection.Map<K, V> getMap(int i) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public <T> scala.collection.immutable.Map<String, T> getValuesMap(Seq<String> fieldNames) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public <K, V> java.util.Map<K, V> getJavaMap(int i) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public Row getStruct(int i) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public <T> T getAs(int i) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public <T> T getAs(String fieldName) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public int fieldIndex(String name) {
+    throw new UnsupportedOperationException();
+  }
+
+  /**
+   * A generic version of Row.equals(Row), which is used for tests.
+   */
+  @Override
+  public boolean equals(Object other) {
+    if (other instanceof Row) {
+      Row row = (Row) other;
+      int n = size();
+      if (n != row.size()) {
+        return false;
+      }
+      for (int i = 0; i < n; i ++) {
+        if (isNullAt(i) != row.isNullAt(i) || (!isNullAt(i) && !get(i).equals(row.get(i)))) {
+          return false;
+        }
+      }
+      return true;
+    }
+    return false;
+  }
+
+  @Override
+  public InternalRow copy() {
+    final int n = size();
+    Object[] arr = new Object[n];
+    for (int i = 0; i < n; i++) {
+      arr[i] = get(i);
+    }
+    return new GenericRow(arr);
+  }
+
+  @Override
+  public Seq<Object> toSeq() {
+    final int n = size();
+    final ArraySeq<Object> values = new ArraySeq<Object>(n);
+    for (int i = 0; i < n; i++) {
+      values.update(i, get(i));
+    }
+    return values;
+  }
+
+  @Override
+  public String toString() {
+    return mkString("[", ",", "]");
+  }
+
+  @Override
+  public String mkString() {
+    return toSeq().mkString();
+  }
+
+  @Override
+  public String mkString(String sep) {
+    return toSeq().mkString(sep);
+  }
+
+  @Override
+  public String mkString(String start, String sep, String end) {
+    return toSeq().mkString(start, sep, end);
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/d46f8e5d/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeFixedWidthAggregationMap.java
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeFixedWidthAggregationMap.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeFixedWidthAggregationMap.java
index 299ff37..b23e0ef 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeFixedWidthAggregationMap.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeFixedWidthAggregationMap.java
@@ -20,7 +20,7 @@ package org.apache.spark.sql.catalyst.expressions;
 import java.util.Arrays;
 import java.util.Iterator;
 
-import org.apache.spark.sql.Row;
+import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.types.StructField;
 import org.apache.spark.sql.types.StructType;
 import org.apache.spark.unsafe.PlatformDependent;
@@ -107,7 +107,7 @@ public final class UnsafeFixedWidthAggregationMap {
    * @param enablePerfMetrics if true, performance metrics will be recorded (has minor perf impact)
    */
   public UnsafeFixedWidthAggregationMap(
-      Row emptyAggregationBuffer,
+      InternalRow emptyAggregationBuffer,
       StructType aggregationBufferSchema,
       StructType groupingKeySchema,
       TaskMemoryManager memoryManager,
@@ -125,7 +125,7 @@ public final class UnsafeFixedWidthAggregationMap {
   /**
    * Convert a Java object row into an UnsafeRow, allocating it into a new long array.
    */
-  private static long[] convertToUnsafeRow(Row javaRow, StructType schema) {
+  private static long[] convertToUnsafeRow(InternalRow javaRow, StructType schema) {
     final UnsafeRowConverter converter = new UnsafeRowConverter(schema);
     final long[] unsafeRow = new long[converter.getSizeRequirement(javaRow)];
     final long writtenLength =
@@ -138,7 +138,7 @@ public final class UnsafeFixedWidthAggregationMap {
    * Return the aggregation buffer for the current group. For efficiency, all calls to this method
    * return the same object.
    */
-  public UnsafeRow getAggregationBuffer(Row groupingKey) {
+  public UnsafeRow getAggregationBuffer(InternalRow groupingKey) {
     final int groupingKeySize = groupingKeyToUnsafeRowConverter.getSizeRequirement(groupingKey);
     // Make sure that the buffer is large enough to hold the key. If it's not, grow it:
     if (groupingKeySize > groupingKeyConversionScratchSpace.length) {

http://git-wip-us.apache.org/repos/asf/spark/blob/d46f8e5d/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java
index 143acc9..aec88c9 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java
@@ -26,7 +26,7 @@ import java.util.Set;
 import scala.collection.Seq;
 import scala.collection.mutable.ArraySeq;
 
-import org.apache.spark.sql.Row;
+import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.BaseMutableRow;
 import org.apache.spark.sql.types.DataType;
 import org.apache.spark.sql.types.StructType;
@@ -334,7 +334,7 @@ public final class UnsafeRow extends BaseMutableRow {
 
 
   @Override
-  public Row copy() {
+  public InternalRow copy() {
     throw new UnsupportedOperationException();
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/d46f8e5d/sql/catalyst/src/main/scala/org/apache/spark/sql/BaseMutableRow.java
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/BaseMutableRow.java b/sql/catalyst/src/main/scala/org/apache/spark/sql/BaseMutableRow.java
deleted file mode 100644
index acec2bf..0000000
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/BaseMutableRow.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql;
-
-import org.apache.spark.sql.catalyst.expressions.MutableRow;
-
-public abstract class BaseMutableRow extends BaseRow implements MutableRow {
-
-  @Override
-  public void update(int ordinal, Object value) {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public void setInt(int ordinal, int value) {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public void setLong(int ordinal, long value) {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public void setDouble(int ordinal, double value) {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public void setBoolean(int ordinal, boolean value) {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public void setShort(int ordinal, short value) {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public void setByte(int ordinal, byte value) {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public void setFloat(int ordinal, float value) {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public void setString(int ordinal, String value) {
-    throw new UnsupportedOperationException();
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/d46f8e5d/sql/catalyst/src/main/scala/org/apache/spark/sql/BaseRow.java
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/BaseRow.java b/sql/catalyst/src/main/scala/org/apache/spark/sql/BaseRow.java
deleted file mode 100644
index e91daf1..0000000
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/BaseRow.java
+++ /dev/null
@@ -1,217 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql;
-
-import java.math.BigDecimal;
-import java.sql.Date;
-import java.sql.Timestamp;
-import java.util.List;
-
-import scala.collection.Seq;
-import scala.collection.mutable.ArraySeq;
-
-import org.apache.spark.sql.catalyst.expressions.GenericRow;
-import org.apache.spark.sql.types.StructType;
-
-public abstract class BaseRow implements Row {
-
-  @Override
-  final public int length() {
-    return size();
-  }
-
-  @Override
-  public boolean anyNull() {
-    final int n = size();
-    for (int i=0; i < n; i++) {
-      if (isNullAt(i)) {
-        return true;
-      }
-    }
-    return false;
-  }
-
-  @Override
-  public StructType schema() { throw new UnsupportedOperationException(); }
-
-  @Override
-  final public Object apply(int i) {
-    return get(i);
-  }
-
-  @Override
-  public int getInt(int i) {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public long getLong(int i) {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public float getFloat(int i) {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public double getDouble(int i) {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public byte getByte(int i) {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public short getShort(int i) {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public boolean getBoolean(int i) {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public String getString(int i) {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public BigDecimal getDecimal(int i) {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public Date getDate(int i) {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public Timestamp getTimestamp(int i) {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public <T> Seq<T> getSeq(int i) {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public <T> List<T> getList(int i) {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public <K, V> scala.collection.Map<K, V> getMap(int i) {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public <T> scala.collection.immutable.Map<String, T> getValuesMap(Seq<String> fieldNames) {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public <K, V> java.util.Map<K, V> getJavaMap(int i) {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public Row getStruct(int i) {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public <T> T getAs(int i) {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public <T> T getAs(String fieldName) {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public int fieldIndex(String name) {
-    throw new UnsupportedOperationException();
-  }
-
-  /**
-   * A generic version of Row.equals(Row), which is used for tests.
-   */
-  @Override
-  public boolean equals(Object other) {
-    if (other instanceof Row) {
-      Row row = (Row) other;
-      int n = size();
-      if (n != row.size()) {
-        return false;
-      }
-      for (int i = 0; i < n; i ++) {
-        if (isNullAt(i) != row.isNullAt(i) || (!isNullAt(i) && !get(i).equals(row.get(i)))) {
-          return false;
-        }
-      }
-      return true;
-    }
-    return false;
-  }
-
-  @Override
-  public Row copy() {
-    final int n = size();
-    Object[] arr = new Object[n];
-    for (int i = 0; i < n; i++) {
-      arr[i] = get(i);
-    }
-    return new GenericRow(arr);
-  }
-
-  @Override
-  public Seq<Object> toSeq() {
-    final int n = size();
-    final ArraySeq<Object> values = new ArraySeq<Object>(n);
-    for (int i = 0; i < n; i++) {
-      values.update(i, get(i));
-    }
-    return values;
-  }
-
-  @Override
-  public String toString() {
-    return mkString("[", ",", "]");
-  }
-
-  @Override
-  public String mkString() {
-    return toSeq().mkString();
-  }
-
-  @Override
-  public String mkString(String sep) {
-    return toSeq().mkString(sep);
-  }
-
-  @Override
-  public String mkString(String start, String sep, String end) {
-    return toSeq().mkString(start, sep, end);
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/d46f8e5d/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala
index 7e4b11a..6175456 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala
@@ -27,6 +27,7 @@ import scala.collection.mutable.HashMap
 
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.util.DateUtils
+import org.apache.spark.sql.Row
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.UTF8String
 
@@ -105,7 +106,7 @@ object CatalystTypeConverters {
     /**
      * Given a Catalyst row, convert the value at column `column` to its Scala equivalent.
      */
-    final def toScala(row: Row, column: Int): ScalaOutputType = {
+    final def toScala(row: InternalRow, column: Int): ScalaOutputType = {
       if (row.isNullAt(column)) null.asInstanceOf[ScalaOutputType] else toScalaImpl(row, column)
     }
 
@@ -125,20 +126,20 @@ object CatalystTypeConverters {
      * Given a Catalyst row, convert the value at column `column` to its Scala equivalent.
      * This method will only be called on non-null columns.
      */
-    protected def toScalaImpl(row: Row, column: Int): ScalaOutputType
+    protected def toScalaImpl(row: InternalRow, column: Int): ScalaOutputType
   }
 
   private object IdentityConverter extends CatalystTypeConverter[Any, Any, Any] {
     override def toCatalystImpl(scalaValue: Any): Any = scalaValue
     override def toScala(catalystValue: Any): Any = catalystValue
-    override def toScalaImpl(row: Row, column: Int): Any = row(column)
+    override def toScalaImpl(row: InternalRow, column: Int): Any = row(column)
   }
 
   private case class UDTConverter(
       udt: UserDefinedType[_]) extends CatalystTypeConverter[Any, Any, Any] {
     override def toCatalystImpl(scalaValue: Any): Any = udt.serialize(scalaValue)
     override def toScala(catalystValue: Any): Any = udt.deserialize(catalystValue)
-    override def toScalaImpl(row: Row, column: Int): Any = toScala(row(column))
+    override def toScalaImpl(row: InternalRow, column: Int): Any = toScala(row(column))
   }
 
   /** Converter for arrays, sequences, and Java iterables. */
@@ -170,7 +171,7 @@ object CatalystTypeConverters {
       }
     }
 
-    override def toScalaImpl(row: Row, column: Int): Seq[Any] =
+    override def toScalaImpl(row: InternalRow, column: Int): Seq[Any] =
       toScala(row(column).asInstanceOf[Seq[Any]])
   }
 
@@ -209,16 +210,16 @@ object CatalystTypeConverters {
       }
     }
 
-    override def toScalaImpl(row: Row, column: Int): Map[Any, Any] =
+    override def toScalaImpl(row: InternalRow, column: Int): Map[Any, Any] =
       toScala(row(column).asInstanceOf[Map[Any, Any]])
   }
 
   private case class StructConverter(
-      structType: StructType) extends CatalystTypeConverter[Any, Row, Row] {
+      structType: StructType) extends CatalystTypeConverter[Any, Row, InternalRow] {
 
     private[this] val converters = structType.fields.map { f => getConverterForType(f.dataType) }
 
-    override def toCatalystImpl(scalaValue: Any): Row = scalaValue match {
+    override def toCatalystImpl(scalaValue: Any): InternalRow = scalaValue match {
       case row: Row =>
         val ar = new Array[Any](row.size)
         var idx = 0
@@ -239,7 +240,7 @@ object CatalystTypeConverters {
         new GenericRowWithSchema(ar, structType)
     }
 
-    override def toScala(row: Row): Row = {
+    override def toScala(row: InternalRow): Row = {
       if (row == null) {
         null
       } else {
@@ -253,7 +254,8 @@ object CatalystTypeConverters {
       }
     }
 
-    override def toScalaImpl(row: Row, column: Int): Row = toScala(row(column).asInstanceOf[Row])
+    override def toScalaImpl(row: InternalRow, column: Int): Row =
+      toScala(row(column).asInstanceOf[InternalRow])
   }
 
   private object StringConverter extends CatalystTypeConverter[Any, String, Any] {
@@ -266,14 +268,14 @@ object CatalystTypeConverters {
       case str: String => str
       case utf8: UTF8String => utf8.toString()
     }
-    override def toScalaImpl(row: Row, column: Int): String = row(column).toString
+    override def toScalaImpl(row: InternalRow, column: Int): String = row(column).toString
   }
 
   private object DateConverter extends CatalystTypeConverter[Date, Date, Any] {
     override def toCatalystImpl(scalaValue: Date): Int = DateUtils.fromJavaDate(scalaValue)
     override def toScala(catalystValue: Any): Date =
       if (catalystValue == null) null else DateUtils.toJavaDate(catalystValue.asInstanceOf[Int])
-    override def toScalaImpl(row: Row, column: Int): Date = toScala(row.getInt(column))
+    override def toScalaImpl(row: InternalRow, column: Int): Date = toScala(row.getInt(column))
   }
 
   private object TimestampConverter extends CatalystTypeConverter[Timestamp, Timestamp, Any] {
@@ -282,7 +284,8 @@ object CatalystTypeConverters {
     override def toScala(catalystValue: Any): Timestamp =
       if (catalystValue == null) null
       else DateUtils.toJavaTimestamp(catalystValue.asInstanceOf[Long])
-    override def toScalaImpl(row: Row, column: Int): Timestamp = toScala(row.getLong(column))
+    override def toScalaImpl(row: InternalRow, column: Int): Timestamp =
+      toScala(row.getLong(column))
   }
 
   private object BigDecimalConverter extends CatalystTypeConverter[Any, JavaBigDecimal, Decimal] {
@@ -292,10 +295,11 @@ object CatalystTypeConverters {
       case d: Decimal => d
     }
     override def toScala(catalystValue: Decimal): JavaBigDecimal = catalystValue.toJavaBigDecimal
-    override def toScalaImpl(row: Row, column: Int): JavaBigDecimal = row.get(column) match {
-      case d: JavaBigDecimal => d
-      case d: Decimal => d.toJavaBigDecimal
-    }
+    override def toScalaImpl(row: InternalRow, column: Int): JavaBigDecimal =
+      row.get(column) match {
+        case d: JavaBigDecimal => d
+        case d: Decimal => d.toJavaBigDecimal
+      }
   }
 
   private abstract class PrimitiveConverter[T] extends CatalystTypeConverter[T, Any, Any] {
@@ -304,31 +308,31 @@ object CatalystTypeConverters {
   }
 
   private object BooleanConverter extends PrimitiveConverter[Boolean] {
-    override def toScalaImpl(row: Row, column: Int): Boolean = row.getBoolean(column)
+    override def toScalaImpl(row: InternalRow, column: Int): Boolean = row.getBoolean(column)
   }
 
   private object ByteConverter extends PrimitiveConverter[Byte] {
-    override def toScalaImpl(row: Row, column: Int): Byte = row.getByte(column)
+    override def toScalaImpl(row: InternalRow, column: Int): Byte = row.getByte(column)
   }
 
   private object ShortConverter extends PrimitiveConverter[Short] {
-    override def toScalaImpl(row: Row, column: Int): Short = row.getShort(column)
+    override def toScalaImpl(row: InternalRow, column: Int): Short = row.getShort(column)
   }
 
   private object IntConverter extends PrimitiveConverter[Int] {
-    override def toScalaImpl(row: Row, column: Int): Int = row.getInt(column)
+    override def toScalaImpl(row: InternalRow, column: Int): Int = row.getInt(column)
   }
 
   private object LongConverter extends PrimitiveConverter[Long] {
-    override def toScalaImpl(row: Row, column: Int): Long = row.getLong(column)
+    override def toScalaImpl(row: InternalRow, column: Int): Long = row.getLong(column)
   }
 
   private object FloatConverter extends PrimitiveConverter[Float] {
-    override def toScalaImpl(row: Row, column: Int): Float = row.getFloat(column)
+    override def toScalaImpl(row: InternalRow, column: Int): Float = row.getFloat(column)
   }
 
   private object DoubleConverter extends PrimitiveConverter[Double] {
-    override def toScalaImpl(row: Row, column: Int): Double = row.getDouble(column)
+    override def toScalaImpl(row: InternalRow, column: Int): Double = row.getDouble(column)
   }
 
   /**
@@ -382,7 +386,7 @@ object CatalystTypeConverters {
     case d: BigDecimal => BigDecimalConverter.toCatalyst(d)
     case d: JavaBigDecimal => BigDecimalConverter.toCatalyst(d)
     case seq: Seq[Any] => seq.map(convertToCatalyst)
-    case r: Row => Row(r.toSeq.map(convertToCatalyst): _*)
+    case r: Row => InternalRow(r.toSeq.map(convertToCatalyst): _*)
     case arr: Array[Any] => arr.toSeq.map(convertToCatalyst).toArray
     case m: Map[Any, Any] =>
       m.map { case (k, v) => (convertToCatalyst(k), convertToCatalyst(v)) }.toMap

http://git-wip-us.apache.org/repos/asf/spark/blob/d46f8e5d/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/InternalRow.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/InternalRow.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/InternalRow.scala
new file mode 100644
index 0000000..e3c2cc2
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/InternalRow.scala
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.catalyst.expressions.GenericRow
+
+/**
+ * An abstract class for row used internal in Spark SQL, which only contain the columns as
+ * internal types.
+ */
+abstract class InternalRow extends Row {
+  // A default implementation to change the return type
+  override def copy(): InternalRow = {this}
+}
+
+object InternalRow {
+  def unapplySeq(row: InternalRow): Some[Seq[Any]] = Some(row.toSeq)
+
+  /**
+   * This method can be used to construct a [[Row]] with the given values.
+   */
+  def apply(values: Any*): InternalRow = new GenericRow(values.toArray)
+
+  /**
+   * This method can be used to construct a [[Row]] from a [[Seq]] of values.
+   */
+  def fromSeq(values: Seq[Any]): InternalRow = new GenericRow(values.toArray)
+
+  def fromTuple(tuple: Product): InternalRow = fromSeq(tuple.productIterator.toSeq)
+
+  /**
+   * Merge multiple rows into a single row, one after another.
+   */
+  def merge(rows: InternalRow*): InternalRow = {
+    // TODO: Improve the performance of this if used in performance critical part.
+    new GenericRow(rows.flatMap(_.toSeq).toArray)
+  }
+
+  /** Returns an empty row. */
+  val empty = apply()
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/d46f8e5d/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala
index bbb150c..5de188d 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql.catalyst.analysis
 
+import org.apache.spark.sql.catalyst
 import org.apache.spark.sql.catalyst.{errors, trees}
 import org.apache.spark.sql.catalyst.errors.TreeNodeException
 import org.apache.spark.sql.catalyst.expressions._
@@ -67,7 +68,7 @@ case class UnresolvedAttribute(nameParts: Seq[String])
   override def withName(newName: String): UnresolvedAttribute = UnresolvedAttribute.quoted(newName)
 
   // Unresolved attributes are transient at compile time and don't get evaluated during execution.
-  override def eval(input: Row = null): Any =
+  override def eval(input: catalyst.InternalRow = null): Any =
     throw new TreeNodeException(this, s"No function to evaluate expression. type: ${this.nodeName}")
 
   override def toString: String = s"'$name"
@@ -85,7 +86,7 @@ case class UnresolvedFunction(name: String, children: Seq[Expression]) extends E
   override lazy val resolved = false
 
   // Unresolved functions are transient at compile time and don't get evaluated during execution.
-  override def eval(input: Row = null): Any =
+  override def eval(input: catalyst.InternalRow = null): Any =
     throw new TreeNodeException(this, s"No function to evaluate expression. type: ${this.nodeName}")
 
   override def toString: String = s"'$name(${children.mkString(",")})"
@@ -107,7 +108,7 @@ trait Star extends NamedExpression with trees.LeafNode[Expression] {
   override lazy val resolved = false
 
   // Star gets expanded at runtime so we never evaluate a Star.
-  override def eval(input: Row = null): Any =
+  override def eval(input: catalyst.InternalRow = null): Any =
     throw new TreeNodeException(this, s"No function to evaluate expression. type: ${this.nodeName}")
 
   def expand(input: Seq[Attribute], resolver: Resolver): Seq[NamedExpression]
@@ -166,7 +167,7 @@ case class MultiAlias(child: Expression, names: Seq[String])
 
   override lazy val resolved = false
 
-  override def eval(input: Row = null): Any =
+  override def eval(input: catalyst.InternalRow = null): Any =
     throw new TreeNodeException(this, s"No function to evaluate expression. type: ${this.nodeName}")
 
   override def toString: String = s"$child AS $names"
@@ -200,7 +201,7 @@ case class UnresolvedExtractValue(child: Expression, extraction: Expression)
   override def nullable: Boolean = throw new UnresolvedException(this, "nullable")
   override lazy val resolved = false
 
-  override def eval(input: Row = null): Any =
+  override def eval(input: catalyst.InternalRow = null): Any =
     throw new TreeNodeException(this, s"No function to evaluate expression. type: ${this.nodeName}")
 
   override def toString: String = s"$child[$extraction]"

http://git-wip-us.apache.org/repos/asf/spark/blob/d46f8e5d/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/BoundAttribute.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/BoundAttribute.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/BoundAttribute.scala
index fcadf95..c4dd11a 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/BoundAttribute.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/BoundAttribute.scala
@@ -21,7 +21,7 @@ import org.apache.spark.Logging
 import org.apache.spark.sql.catalyst.errors.attachTree
 import org.apache.spark.sql.catalyst.expressions.codegen.{GeneratedExpressionCode, CodeGenContext}
 import org.apache.spark.sql.types._
-import org.apache.spark.sql.catalyst.trees
+import org.apache.spark.sql.catalyst.{InternalRow, trees}
 
 /**
  * A bound reference points to a specific slot in the input tuple, allowing the actual value
@@ -33,7 +33,7 @@ case class BoundReference(ordinal: Int, dataType: DataType, nullable: Boolean)
 
   override def toString: String = s"input[$ordinal]"
 
-  override def eval(input: Row): Any = input(ordinal)
+  override def eval(input: InternalRow): Any = input(ordinal)
 
   override def name: String = s"i[$ordinal]"
 

http://git-wip-us.apache.org/repos/asf/spark/blob/d46f8e5d/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala
index 4c7123f..afbf30a 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala
@@ -21,6 +21,7 @@ import java.sql.{Date, Timestamp}
 import java.text.{DateFormat, SimpleDateFormat}
 
 import org.apache.spark.Logging
+import org.apache.spark.sql.catalyst
 import org.apache.spark.sql.catalyst.expressions.codegen.{CodeGenContext, GeneratedExpressionCode}
 import org.apache.spark.sql.catalyst.util.DateUtils
 import org.apache.spark.sql.types._
@@ -393,7 +394,7 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w
     }
     // TODO: Could be faster?
     val newRow = new GenericMutableRow(from.fields.size)
-    buildCast[Row](_, row => {
+    buildCast[catalyst.InternalRow](_, row => {
       var i = 0
       while (i < row.length) {
         val v = row(i)
@@ -425,7 +426,7 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w
 
   private[this] lazy val cast: Any => Any = cast(child.dataType, dataType)
 
-  override def eval(input: Row): Any = {
+  override def eval(input: catalyst.InternalRow): Any = {
     val evaluated = child.eval(input)
     if (evaluated == null) null else cast(evaluated)
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/d46f8e5d/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala
index 0b9f621..61de34b 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala
@@ -18,7 +18,7 @@
 package org.apache.spark.sql.catalyst.expressions
 
 import org.apache.spark.sql.catalyst.analysis.{TypeCheckResult, UnresolvedAttribute}
-import org.apache.spark.sql.catalyst.expressions.codegen.{GeneratedExpressionCode, CodeGenContext}
+import org.apache.spark.sql.catalyst.expressions.codegen.{CodeGenContext, GeneratedExpressionCode}
 import org.apache.spark.sql.catalyst.trees
 import org.apache.spark.sql.catalyst.trees.TreeNode
 import org.apache.spark.sql.types._
@@ -59,7 +59,7 @@ abstract class Expression extends TreeNode[Expression] {
   def references: AttributeSet = AttributeSet(children.flatMap(_.references.iterator))
 
   /** Returns the result of evaluating this expression on a given input Row */
-  def eval(input: Row = null): Any
+  def eval(input: InternalRow = null): Any
 
   /**
    * Returns an [[GeneratedExpressionCode]], which contains Java source code that

http://git-wip-us.apache.org/repos/asf/spark/blob/d46f8e5d/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ExtractValue.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ExtractValue.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ExtractValue.scala
index a1e0819..16f3ccc 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ExtractValue.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ExtractValue.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.catalyst.expressions
 
 import scala.collection.Map
 
-import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.{catalyst, AnalysisException}
 import org.apache.spark.sql.catalyst.analysis._
 import org.apache.spark.sql.types._
 
@@ -105,8 +105,8 @@ case class GetStructField(child: Expression, field: StructField, ordinal: Int)
   override def foldable: Boolean = child.foldable
   override def toString: String = s"$child.${field.name}"
 
-  override def eval(input: Row): Any = {
-    val baseValue = child.eval(input).asInstanceOf[Row]
+  override def eval(input: catalyst.InternalRow): Any = {
+    val baseValue = child.eval(input).asInstanceOf[catalyst.InternalRow]
     if (baseValue == null) null else baseValue(ordinal)
   }
 }
@@ -125,8 +125,8 @@ case class GetArrayStructFields(
   override def foldable: Boolean = child.foldable
   override def toString: String = s"$child.${field.name}"
 
-  override def eval(input: Row): Any = {
-    val baseValue = child.eval(input).asInstanceOf[Seq[Row]]
+  override def eval(input: catalyst.InternalRow): Any = {
+    val baseValue = child.eval(input).asInstanceOf[Seq[catalyst.InternalRow]]
     if (baseValue == null) null else {
       baseValue.map { row =>
         if (row == null) null else row(ordinal)
@@ -146,7 +146,7 @@ abstract class ExtractValueWithOrdinal extends ExtractValue {
   override def toString: String = s"$child[$ordinal]"
   override def children: Seq[Expression] = child :: ordinal :: Nil
 
-  override def eval(input: Row): Any = {
+  override def eval(input: catalyst.InternalRow): Any = {
     val value = child.eval(input)
     if (value == null) {
       null

http://git-wip-us.apache.org/repos/asf/spark/blob/d46f8e5d/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Projection.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Projection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Projection.scala
index 8cae548..d6806f7 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Projection.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Projection.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql.catalyst.expressions
 
+import org.apache.spark.sql.catalyst
+
 
 /**
  * A [[Projection]] that is calculated by calling the `eval` of each of the specified expressions.
@@ -30,7 +32,7 @@ class InterpretedProjection(expressions: Seq[Expression]) extends Projection {
   // null check is required for when Kryo invokes the no-arg constructor.
   protected val exprArray = if (expressions != null) expressions.toArray else null
 
-  def apply(input: Row): Row = {
+  def apply(input: catalyst.InternalRow): catalyst.InternalRow = {
     val outputArray = new Array[Any](exprArray.length)
     var i = 0
     while (i < exprArray.length) {
@@ -55,14 +57,14 @@ case class InterpretedMutableProjection(expressions: Seq[Expression]) extends Mu
 
   private[this] val exprArray = expressions.toArray
   private[this] var mutableRow: MutableRow = new GenericMutableRow(exprArray.size)
-  def currentValue: Row = mutableRow
+  def currentValue: catalyst.InternalRow = mutableRow
 
   override def target(row: MutableRow): MutableProjection = {
     mutableRow = row
     this
   }
 
-  override def apply(input: Row): Row = {
+  override def apply(input: catalyst.InternalRow): catalyst.InternalRow = {
     var i = 0
     while (i < exprArray.length) {
       mutableRow(i) = exprArray(i).eval(input)
@@ -76,31 +78,31 @@ case class InterpretedMutableProjection(expressions: Seq[Expression]) extends Mu
  * A mutable wrapper that makes two rows appear as a single concatenated row.  Designed to
  * be instantiated once per thread and reused.
  */
-class JoinedRow extends Row {
-  private[this] var row1: Row = _
-  private[this] var row2: Row = _
+class JoinedRow extends catalyst.InternalRow {
+  private[this] var row1: catalyst.InternalRow = _
+  private[this] var row2: catalyst.InternalRow = _
 
-  def this(left: Row, right: Row) = {
+  def this(left: catalyst.InternalRow, right: catalyst.InternalRow) = {
     this()
     row1 = left
     row2 = right
   }
 
   /** Updates this JoinedRow to used point at two new base rows.  Returns itself. */
-  def apply(r1: Row, r2: Row): Row = {
+  def apply(r1: catalyst.InternalRow, r2: catalyst.InternalRow): catalyst.InternalRow = {
     row1 = r1
     row2 = r2
     this
   }
 
   /** Updates this JoinedRow by updating its left base row.  Returns itself. */
-  def withLeft(newLeft: Row): Row = {
+  def withLeft(newLeft: catalyst.InternalRow): catalyst.InternalRow = {
     row1 = newLeft
     this
   }
 
   /** Updates this JoinedRow by updating its right base row.  Returns itself. */
-  def withRight(newRight: Row): Row = {
+  def withRight(newRight: catalyst.InternalRow): catalyst.InternalRow = {
     row2 = newRight
     this
   }
@@ -142,7 +144,7 @@ class JoinedRow extends Row {
   override def getAs[T](i: Int): T =
     if (i < row1.length) row1.getAs[T](i) else row2.getAs[T](i - row1.length)
 
-  override def copy(): Row = {
+  override def copy(): catalyst.InternalRow = {
     val totalSize = row1.length + row2.length
     val copiedValues = new Array[Any](totalSize)
     var i = 0
@@ -176,31 +178,31 @@ class JoinedRow extends Row {
  * Row will be referenced, increasing the opportunity for the JIT to play tricks.  This sounds
  * crazy but in benchmarks it had noticeable effects.
  */
-class JoinedRow2 extends Row {
-  private[this] var row1: Row = _
-  private[this] var row2: Row = _
+class JoinedRow2 extends catalyst.InternalRow {
+  private[this] var row1: catalyst.InternalRow = _
+  private[this] var row2: catalyst.InternalRow = _
 
-  def this(left: Row, right: Row) = {
+  def this(left: catalyst.InternalRow, right: catalyst.InternalRow) = {
     this()
     row1 = left
     row2 = right
   }
 
   /** Updates this JoinedRow to used point at two new base rows.  Returns itself. */
-  def apply(r1: Row, r2: Row): Row = {
+  def apply(r1: catalyst.InternalRow, r2: catalyst.InternalRow): catalyst.InternalRow = {
     row1 = r1
     row2 = r2
     this
   }
 
   /** Updates this JoinedRow by updating its left base row.  Returns itself. */
-  def withLeft(newLeft: Row): Row = {
+  def withLeft(newLeft: catalyst.InternalRow): catalyst.InternalRow = {
     row1 = newLeft
     this
   }
 
   /** Updates this JoinedRow by updating its right base row.  Returns itself. */
-  def withRight(newRight: Row): Row = {
+  def withRight(newRight: catalyst.InternalRow): catalyst.InternalRow = {
     row2 = newRight
     this
   }
@@ -242,7 +244,7 @@ class JoinedRow2 extends Row {
   override def getAs[T](i: Int): T =
     if (i < row1.length) row1.getAs[T](i) else row2.getAs[T](i - row1.length)
 
-  override def copy(): Row = {
+  override def copy(): catalyst.InternalRow = {
     val totalSize = row1.length + row2.length
     val copiedValues = new Array[Any](totalSize)
     var i = 0
@@ -270,31 +272,31 @@ class JoinedRow2 extends Row {
 /**
  * JIT HACK: Replace with macros
  */
-class JoinedRow3 extends Row {
-  private[this] var row1: Row = _
-  private[this] var row2: Row = _
+class JoinedRow3 extends catalyst.InternalRow {
+  private[this] var row1: catalyst.InternalRow = _
+  private[this] var row2: catalyst.InternalRow = _
 
-  def this(left: Row, right: Row) = {
+  def this(left: catalyst.InternalRow, right: catalyst.InternalRow) = {
     this()
     row1 = left
     row2 = right
   }
 
   /** Updates this JoinedRow to point at two new base rows.  Returns itself. */
-  def apply(r1: Row, r2: Row): Row = {
+  def apply(r1: catalyst.InternalRow, r2: catalyst.InternalRow): catalyst.InternalRow = {
     row1 = r1
     row2 = r2
     this
   }
 
   /** Updates this JoinedRow by updating its left base row.  Returns itself. */
-  def withLeft(newLeft: Row): Row = {
+  def withLeft(newLeft: catalyst.InternalRow): catalyst.InternalRow = {
     row1 = newLeft
     this
   }
 
   /** Updates this JoinedRow by updating its right base row.  Returns itself. */
-  def withRight(newRight: Row): Row = {
+  def withRight(newRight: catalyst.InternalRow): catalyst.InternalRow = {
     row2 = newRight
     this
   }
@@ -336,7 +338,7 @@ class JoinedRow3 extends Row {
   override def getAs[T](i: Int): T =
     if (i < row1.length) row1.getAs[T](i) else row2.getAs[T](i - row1.length)
 
-  override def copy(): Row = {
+  override def copy(): catalyst.InternalRow = {
     val totalSize = row1.length + row2.length
     val copiedValues = new Array[Any](totalSize)
     var i = 0
@@ -364,31 +366,31 @@ class JoinedRow3 extends Row {
 /**
  * JIT HACK: Replace with macros
  */
-class JoinedRow4 extends Row {
-  private[this] var row1: Row = _
-  private[this] var row2: Row = _
+class JoinedRow4 extends catalyst.InternalRow {
+  private[this] var row1: catalyst.InternalRow = _
+  private[this] var row2: catalyst.InternalRow = _
 
-  def this(left: Row, right: Row) = {
+  def this(left: catalyst.InternalRow, right: catalyst.InternalRow) = {
     this()
     row1 = left
     row2 = right
   }
 
   /** Updates this JoinedRow to point at two new base rows.  Returns itself. */
-  def apply(r1: Row, r2: Row): Row = {
+  def apply(r1: catalyst.InternalRow, r2: catalyst.InternalRow): catalyst.InternalRow = {
     row1 = r1
     row2 = r2
     this
   }
 
   /** Updates this JoinedRow by updating its left base row.  Returns itself. */
-  def withLeft(newLeft: Row): Row = {
+  def withLeft(newLeft: catalyst.InternalRow): catalyst.InternalRow = {
     row1 = newLeft
     this
   }
 
   /** Updates this JoinedRow by updating its right base row.  Returns itself. */
-  def withRight(newRight: Row): Row = {
+  def withRight(newRight: catalyst.InternalRow): catalyst.InternalRow = {
     row2 = newRight
     this
   }
@@ -430,7 +432,7 @@ class JoinedRow4 extends Row {
   override def getAs[T](i: Int): T =
     if (i < row1.length) row1.getAs[T](i) else row2.getAs[T](i - row1.length)
 
-  override def copy(): Row = {
+  override def copy(): catalyst.InternalRow = {
     val totalSize = row1.length + row2.length
     val copiedValues = new Array[Any](totalSize)
     var i = 0
@@ -458,31 +460,31 @@ class JoinedRow4 extends Row {
 /**
  * JIT HACK: Replace with macros
  */
-class JoinedRow5 extends Row {
-  private[this] var row1: Row = _
-  private[this] var row2: Row = _
+class JoinedRow5 extends catalyst.InternalRow {
+  private[this] var row1: catalyst.InternalRow = _
+  private[this] var row2: catalyst.InternalRow = _
 
-  def this(left: Row, right: Row) = {
+  def this(left: catalyst.InternalRow, right: catalyst.InternalRow) = {
     this()
     row1 = left
     row2 = right
   }
 
   /** Updates this JoinedRow to point at two new base rows.  Returns itself. */
-  def apply(r1: Row, r2: Row): Row = {
+  def apply(r1: catalyst.InternalRow, r2: catalyst.InternalRow): catalyst.InternalRow = {
     row1 = r1
     row2 = r2
     this
   }
 
   /** Updates this JoinedRow by updating its left base row.  Returns itself. */
-  def withLeft(newLeft: Row): Row = {
+  def withLeft(newLeft: catalyst.InternalRow): catalyst.InternalRow = {
     row1 = newLeft
     this
   }
 
   /** Updates this JoinedRow by updating its right base row.  Returns itself. */
-  def withRight(newRight: Row): Row = {
+  def withRight(newRight: catalyst.InternalRow): catalyst.InternalRow = {
     row2 = newRight
     this
   }
@@ -524,7 +526,7 @@ class JoinedRow5 extends Row {
   override def getAs[T](i: Int): T =
     if (i < row1.length) row1.getAs[T](i) else row2.getAs[T](i - row1.length)
 
-  override def copy(): Row = {
+  override def copy(): catalyst.InternalRow = {
     val totalSize = row1.length + row2.length
     val copiedValues = new Array[Any](totalSize)
     var i = 0
@@ -552,31 +554,31 @@ class JoinedRow5 extends Row {
 /**
  * JIT HACK: Replace with macros
  */
-class JoinedRow6 extends Row {
-  private[this] var row1: Row = _
-  private[this] var row2: Row = _
+class JoinedRow6 extends catalyst.InternalRow {
+  private[this] var row1: catalyst.InternalRow = _
+  private[this] var row2: catalyst.InternalRow = _
 
-  def this(left: Row, right: Row) = {
+  def this(left: catalyst.InternalRow, right: catalyst.InternalRow) = {
     this()
     row1 = left
     row2 = right
   }
 
   /** Updates this JoinedRow to point at two new base rows.  Returns itself. */
-  def apply(r1: Row, r2: Row): Row = {
+  def apply(r1: catalyst.InternalRow, r2: catalyst.InternalRow): catalyst.InternalRow = {
     row1 = r1
     row2 = r2
     this
   }
 
   /** Updates this JoinedRow by updating its left base row.  Returns itself. */
-  def withLeft(newLeft: Row): Row = {
+  def withLeft(newLeft: catalyst.InternalRow): catalyst.InternalRow = {
     row1 = newLeft
     this
   }
 
   /** Updates this JoinedRow by updating its right base row.  Returns itself. */
-  def withRight(newRight: Row): Row = {
+  def withRight(newRight: catalyst.InternalRow): catalyst.InternalRow = {
     row2 = newRight
     this
   }
@@ -618,7 +620,7 @@ class JoinedRow6 extends Row {
   override def getAs[T](i: Int): T =
     if (i < row1.length) row1.getAs[T](i) else row2.getAs[T](i - row1.length)
 
-  override def copy(): Row = {
+  override def copy(): catalyst.InternalRow = {
     val totalSize = row1.length + row2.length
     val copiedValues = new Array[Any](totalSize)
     var i = 0
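
The JoinedRow variants above all share one index-translation rule, visible in the getAs and copy bodies: positions below row1.length read from the left row, and the remainder read from the right row at a shifted index. Below is a minimal, self-contained Scala sketch of that idea; MiniRow, MiniJoinedRow, and SeqRow are hypothetical names used only for illustration, not the Spark classes.

// Minimal, self-contained sketch (hypothetical names, not the Spark classes) of the
// index translation behind JoinedRow: positions below the left row's length read from
// the left row, and the remainder read from the right row at a shifted index.
trait MiniRow {
  def length: Int
  def apply(i: Int): Any
}

class MiniJoinedRow(left: MiniRow, right: MiniRow) extends MiniRow {
  def length: Int = left.length + right.length
  def apply(i: Int): Any =
    if (i < left.length) left(i) else right(i - left.length)
}

object JoinedRowDemo extends App {
  case class SeqRow(values: Seq[Any]) extends MiniRow {
    def length: Int = values.length
    def apply(i: Int): Any = values(i)
  }
  val joined = new MiniJoinedRow(SeqRow(Seq(1, "a")), SeqRow(Seq(2.0)))
  assert(joined.length == 3)
  assert(joined(2) == 2.0)   // index 2 falls past the left row, so it reads right(0)
}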

http://git-wip-us.apache.org/repos/asf/spark/blob/d46f8e5d/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ScalaUdf.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ScalaUdf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ScalaUdf.scala
index 5b45347..40f235f 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ScalaUdf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ScalaUdf.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql.catalyst.expressions
 
+import org.apache.spark.sql.catalyst
 import org.apache.spark.sql.catalyst.CatalystTypeConverters
 import org.apache.spark.sql.types.DataType
 
@@ -45,7 +46,7 @@ case class ScalaUdf(function: AnyRef, dataType: DataType, children: Seq[Expressi
       val func = function.asInstanceOf[($anys) => Any]
       $childs
       $converters
-      (input: Row) => {
+      (input: InternalRow) => {
         func(
           $evals)
       }
@@ -57,7 +58,7 @@ case class ScalaUdf(function: AnyRef, dataType: DataType, children: Seq[Expressi
   private[this] val f = children.size match {
     case 0 =>
       val func = function.asInstanceOf[() => Any]
-      (input: Row) => {
+      (input: catalyst.InternalRow) => {
         func()
       }
 
@@ -65,7 +66,7 @@ case class ScalaUdf(function: AnyRef, dataType: DataType, children: Seq[Expressi
       val func = function.asInstanceOf[(Any) => Any]
       val child0 = children(0)
       lazy val converter0 = CatalystTypeConverters.createToScalaConverter(child0.dataType)
-      (input: Row) => {
+      (input: catalyst.InternalRow) => {
         func(
           converter0(child0.eval(input)))
       }
@@ -76,7 +77,7 @@ case class ScalaUdf(function: AnyRef, dataType: DataType, children: Seq[Expressi
       val child1 = children(1)
       lazy val converter0 = CatalystTypeConverters.createToScalaConverter(child0.dataType)
       lazy val converter1 = CatalystTypeConverters.createToScalaConverter(child1.dataType)
-      (input: Row) => {
+      (input: catalyst.InternalRow) => {
         func(
           converter0(child0.eval(input)),
           converter1(child1.eval(input)))
@@ -90,7 +91,7 @@ case class ScalaUdf(function: AnyRef, dataType: DataType, children: Seq[Expressi
       lazy val converter0 = CatalystTypeConverters.createToScalaConverter(child0.dataType)
       lazy val converter1 = CatalystTypeConverters.createToScalaConverter(child1.dataType)
       lazy val converter2 = CatalystTypeConverters.createToScalaConverter(child2.dataType)
-      (input: Row) => {
+      (input: catalyst.InternalRow) => {
         func(
           converter0(child0.eval(input)),
           converter1(child1.eval(input)),
@@ -107,7 +108,7 @@ case class ScalaUdf(function: AnyRef, dataType: DataType, children: Seq[Expressi
       lazy val converter1 = CatalystTypeConverters.createToScalaConverter(child1.dataType)
       lazy val converter2 = CatalystTypeConverters.createToScalaConverter(child2.dataType)
       lazy val converter3 = CatalystTypeConverters.createToScalaConverter(child3.dataType)
-      (input: Row) => {
+      (input: catalyst.InternalRow) => {
         func(
           converter0(child0.eval(input)),
           converter1(child1.eval(input)),
@@ -127,7 +128,7 @@ case class ScalaUdf(function: AnyRef, dataType: DataType, children: Seq[Expressi
       lazy val converter2 = CatalystTypeConverters.createToScalaConverter(child2.dataType)
       lazy val converter3 = CatalystTypeConverters.createToScalaConverter(child3.dataType)
       lazy val converter4 = CatalystTypeConverters.createToScalaConverter(child4.dataType)
-      (input: Row) => {
+      (input: catalyst.InternalRow) => {
         func(
           converter0(child0.eval(input)),
           converter1(child1.eval(input)),
@@ -150,7 +151,7 @@ case class ScalaUdf(function: AnyRef, dataType: DataType, children: Seq[Expressi
       lazy val converter3 = CatalystTypeConverters.createToScalaConverter(child3.dataType)
       lazy val converter4 = CatalystTypeConverters.createToScalaConverter(child4.dataType)
       lazy val converter5 = CatalystTypeConverters.createToScalaConverter(child5.dataType)
-      (input: Row) => {
+      (input: catalyst.InternalRow) => {
         func(
           converter0(child0.eval(input)),
           converter1(child1.eval(input)),
@@ -176,7 +177,7 @@ case class ScalaUdf(function: AnyRef, dataType: DataType, children: Seq[Expressi
       lazy val converter4 = CatalystTypeConverters.createToScalaConverter(child4.dataType)
       lazy val converter5 = CatalystTypeConverters.createToScalaConverter(child5.dataType)
       lazy val converter6 = CatalystTypeConverters.createToScalaConverter(child6.dataType)
-      (input: Row) => {
+      (input: catalyst.InternalRow) => {
         func(
           converter0(child0.eval(input)),
           converter1(child1.eval(input)),
@@ -205,7 +206,7 @@ case class ScalaUdf(function: AnyRef, dataType: DataType, children: Seq[Expressi
       lazy val converter5 = CatalystTypeConverters.createToScalaConverter(child5.dataType)
       lazy val converter6 = CatalystTypeConverters.createToScalaConverter(child6.dataType)
       lazy val converter7 = CatalystTypeConverters.createToScalaConverter(child7.dataType)
-      (input: Row) => {
+      (input: catalyst.InternalRow) => {
         func(
           converter0(child0.eval(input)),
           converter1(child1.eval(input)),
@@ -237,7 +238,7 @@ case class ScalaUdf(function: AnyRef, dataType: DataType, children: Seq[Expressi
       lazy val converter6 = CatalystTypeConverters.createToScalaConverter(child6.dataType)
       lazy val converter7 = CatalystTypeConverters.createToScalaConverter(child7.dataType)
       lazy val converter8 = CatalystTypeConverters.createToScalaConverter(child8.dataType)
-      (input: Row) => {
+      (input: catalyst.InternalRow) => {
         func(
           converter0(child0.eval(input)),
           converter1(child1.eval(input)),
@@ -272,7 +273,7 @@ case class ScalaUdf(function: AnyRef, dataType: DataType, children: Seq[Expressi
       lazy val converter7 = CatalystTypeConverters.createToScalaConverter(child7.dataType)
       lazy val converter8 = CatalystTypeConverters.createToScalaConverter(child8.dataType)
       lazy val converter9 = CatalystTypeConverters.createToScalaConverter(child9.dataType)
-      (input: Row) => {
+      (input: catalyst.InternalRow) => {
         func(
           converter0(child0.eval(input)),
           converter1(child1.eval(input)),
@@ -310,7 +311,7 @@ case class ScalaUdf(function: AnyRef, dataType: DataType, children: Seq[Expressi
       lazy val converter8 = CatalystTypeConverters.createToScalaConverter(child8.dataType)
       lazy val converter9 = CatalystTypeConverters.createToScalaConverter(child9.dataType)
       lazy val converter10 = CatalystTypeConverters.createToScalaConverter(child10.dataType)
-      (input: Row) => {
+      (input: catalyst.InternalRow) => {
         func(
           converter0(child0.eval(input)),
           converter1(child1.eval(input)),
@@ -351,7 +352,7 @@ case class ScalaUdf(function: AnyRef, dataType: DataType, children: Seq[Expressi
       lazy val converter9 = CatalystTypeConverters.createToScalaConverter(child9.dataType)
       lazy val converter10 = CatalystTypeConverters.createToScalaConverter(child10.dataType)
       lazy val converter11 = CatalystTypeConverters.createToScalaConverter(child11.dataType)
-      (input: Row) => {
+      (input: catalyst.InternalRow) => {
         func(
           converter0(child0.eval(input)),
           converter1(child1.eval(input)),
@@ -395,7 +396,7 @@ case class ScalaUdf(function: AnyRef, dataType: DataType, children: Seq[Expressi
       lazy val converter10 = CatalystTypeConverters.createToScalaConverter(child10.dataType)
       lazy val converter11 = CatalystTypeConverters.createToScalaConverter(child11.dataType)
       lazy val converter12 = CatalystTypeConverters.createToScalaConverter(child12.dataType)
-      (input: Row) => {
+      (input: catalyst.InternalRow) => {
         func(
           converter0(child0.eval(input)),
           converter1(child1.eval(input)),
@@ -442,7 +443,7 @@ case class ScalaUdf(function: AnyRef, dataType: DataType, children: Seq[Expressi
       lazy val converter11 = CatalystTypeConverters.createToScalaConverter(child11.dataType)
       lazy val converter12 = CatalystTypeConverters.createToScalaConverter(child12.dataType)
       lazy val converter13 = CatalystTypeConverters.createToScalaConverter(child13.dataType)
-      (input: Row) => {
+      (input: catalyst.InternalRow) => {
         func(
           converter0(child0.eval(input)),
           converter1(child1.eval(input)),
@@ -492,7 +493,7 @@ case class ScalaUdf(function: AnyRef, dataType: DataType, children: Seq[Expressi
       lazy val converter12 = CatalystTypeConverters.createToScalaConverter(child12.dataType)
       lazy val converter13 = CatalystTypeConverters.createToScalaConverter(child13.dataType)
       lazy val converter14 = CatalystTypeConverters.createToScalaConverter(child14.dataType)
-      (input: Row) => {
+      (input: catalyst.InternalRow) => {
         func(
           converter0(child0.eval(input)),
           converter1(child1.eval(input)),
@@ -545,7 +546,7 @@ case class ScalaUdf(function: AnyRef, dataType: DataType, children: Seq[Expressi
       lazy val converter13 = CatalystTypeConverters.createToScalaConverter(child13.dataType)
       lazy val converter14 = CatalystTypeConverters.createToScalaConverter(child14.dataType)
       lazy val converter15 = CatalystTypeConverters.createToScalaConverter(child15.dataType)
-      (input: Row) => {
+      (input: catalyst.InternalRow) => {
         func(
           converter0(child0.eval(input)),
           converter1(child1.eval(input)),
@@ -601,7 +602,7 @@ case class ScalaUdf(function: AnyRef, dataType: DataType, children: Seq[Expressi
       lazy val converter14 = CatalystTypeConverters.createToScalaConverter(child14.dataType)
       lazy val converter15 = CatalystTypeConverters.createToScalaConverter(child15.dataType)
       lazy val converter16 = CatalystTypeConverters.createToScalaConverter(child16.dataType)
-      (input: Row) => {
+      (input: catalyst.InternalRow) => {
         func(
           converter0(child0.eval(input)),
           converter1(child1.eval(input)),
@@ -660,7 +661,7 @@ case class ScalaUdf(function: AnyRef, dataType: DataType, children: Seq[Expressi
       lazy val converter15 = CatalystTypeConverters.createToScalaConverter(child15.dataType)
       lazy val converter16 = CatalystTypeConverters.createToScalaConverter(child16.dataType)
       lazy val converter17 = CatalystTypeConverters.createToScalaConverter(child17.dataType)
-      (input: Row) => {
+      (input: catalyst.InternalRow) => {
         func(
           converter0(child0.eval(input)),
           converter1(child1.eval(input)),
@@ -722,7 +723,7 @@ case class ScalaUdf(function: AnyRef, dataType: DataType, children: Seq[Expressi
       lazy val converter16 = CatalystTypeConverters.createToScalaConverter(child16.dataType)
       lazy val converter17 = CatalystTypeConverters.createToScalaConverter(child17.dataType)
       lazy val converter18 = CatalystTypeConverters.createToScalaConverter(child18.dataType)
-      (input: Row) => {
+      (input: catalyst.InternalRow) => {
         func(
           converter0(child0.eval(input)),
           converter1(child1.eval(input)),
@@ -787,7 +788,7 @@ case class ScalaUdf(function: AnyRef, dataType: DataType, children: Seq[Expressi
       lazy val converter17 = CatalystTypeConverters.createToScalaConverter(child17.dataType)
       lazy val converter18 = CatalystTypeConverters.createToScalaConverter(child18.dataType)
       lazy val converter19 = CatalystTypeConverters.createToScalaConverter(child19.dataType)
-      (input: Row) => {
+      (input: catalyst.InternalRow) => {
         func(
           converter0(child0.eval(input)),
           converter1(child1.eval(input)),
@@ -855,7 +856,7 @@ case class ScalaUdf(function: AnyRef, dataType: DataType, children: Seq[Expressi
       lazy val converter18 = CatalystTypeConverters.createToScalaConverter(child18.dataType)
       lazy val converter19 = CatalystTypeConverters.createToScalaConverter(child19.dataType)
       lazy val converter20 = CatalystTypeConverters.createToScalaConverter(child20.dataType)
-      (input: Row) => {
+      (input: catalyst.InternalRow) => {
         func(
           converter0(child0.eval(input)),
           converter1(child1.eval(input)),
@@ -926,7 +927,7 @@ case class ScalaUdf(function: AnyRef, dataType: DataType, children: Seq[Expressi
       lazy val converter19 = CatalystTypeConverters.createToScalaConverter(child19.dataType)
       lazy val converter20 = CatalystTypeConverters.createToScalaConverter(child20.dataType)
       lazy val converter21 = CatalystTypeConverters.createToScalaConverter(child21.dataType)
-      (input: Row) => {
+      (input: catalyst.InternalRow) => {
         func(
           converter0(child0.eval(input)),
           converter1(child1.eval(input)),
@@ -955,6 +956,6 @@ case class ScalaUdf(function: AnyRef, dataType: DataType, children: Seq[Expressi
 
   // scalastyle:on
   private[this] val converter = CatalystTypeConverters.createToCatalystConverter(dataType)
-  override def eval(input: Row): Any = converter(f(input))
+  override def eval(input: catalyst.InternalRow): Any = converter(f(input))
 
 }
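
The repeated arities in ScalaUdf above all follow one calling convention: evaluate each child expression against the incoming InternalRow, run each result through the createToScalaConverter built for that child's data type, pass the converted values to the user's function, and convert the return value back with createToCatalystConverter. The following is a simplified Scala sketch of that pattern under assumed stand-in types; Input, Converter, applyUdf, and the identity converters are illustrative only, not Spark APIs.

// Simplified sketch of the UDF calling convention (assumed stand-ins, not Spark code):
// evaluate each child against the input row, convert to a Scala value, call the user
// function, then convert the result back to the engine's internal representation.
object UdfPatternDemo extends App {
  type Input = Seq[Any]            // stand-in for catalyst.InternalRow
  type Converter = Any => Any      // stand-in for CatalystTypeConverters converters

  def applyUdf(
      func: Seq[Any] => Any,
      children: Seq[Input => Any],   // stand-ins for each child Expression.eval
      toScala: Seq[Converter],       // one "to Scala" converter per child
      toCatalyst: Converter)(input: Input): Any = {
    val args = children.zip(toScala).map { case (eval, conv) => conv(eval(input)) }
    toCatalyst(func(args))
  }

  // Example: a two-argument UDF that adds its inputs; converters are identity here.
  val result = applyUdf(
    func = (args: Seq[Any]) => args(0).asInstanceOf[Int] + args(1).asInstanceOf[Int],
    children = Seq((row: Input) => row(0), (row: Input) => row(1)),
    toScala = Seq((x: Any) => x, (x: Any) => x),
    toCatalyst = (x: Any) => x)(Seq(3, 4))
  assert(result == 7)
}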

http://git-wip-us.apache.org/repos/asf/spark/blob/d46f8e5d/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SortOrder.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SortOrder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SortOrder.scala
index 99340a1..8a34355 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SortOrder.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SortOrder.scala
@@ -18,7 +18,7 @@
 package org.apache.spark.sql.catalyst.expressions
 
 import org.apache.spark.sql.catalyst.errors.TreeNodeException
-import org.apache.spark.sql.catalyst.trees
+import org.apache.spark.sql.catalyst.{InternalRow, trees}
 import org.apache.spark.sql.types.DataType
 
 abstract sealed class SortDirection
@@ -36,7 +36,7 @@ case class SortOrder(child: Expression, direction: SortDirection) extends Expres
   override def nullable: Boolean = child.nullable
 
   // SortOrder itself is never evaluated.
-  override def eval(input: Row = null): Any =
+  override def eval(input: InternalRow = null): Any =
     throw new TreeNodeException(this, s"No function to evaluate expression. type: ${this.nodeName}")
 
   override def toString: String = s"$child ${if (direction == Ascending) "ASC" else "DESC"}"

http://git-wip-us.apache.org/repos/asf/spark/blob/d46f8e5d/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SpecificMutableRow.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SpecificMutableRow.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SpecificMutableRow.scala
index 98eda61..05aab34 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SpecificMutableRow.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SpecificMutableRow.scala
@@ -222,7 +222,7 @@ final class SpecificMutableRow(val values: Array[MutableValue]) extends MutableR
 
   override def isNullAt(i: Int): Boolean = values(i).isNull
 
-  override def copy(): Row = {
+  override def copy(): InternalRow = {
     val newValues = new Array[Any](values.length)
     var i = 0
     while (i < values.length) {

http://git-wip-us.apache.org/repos/asf/spark/blob/d46f8e5d/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/UnsafeRowConverter.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/UnsafeRowConverter.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/UnsafeRowConverter.scala
index 5350123..d771e45 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/UnsafeRowConverter.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/UnsafeRowConverter.scala
@@ -48,7 +48,7 @@ class UnsafeRowConverter(fieldTypes: Array[DataType]) {
   /**
    * Compute the amount of space, in bytes, required to encode the given row.
    */
-  def getSizeRequirement(row: Row): Int = {
+  def getSizeRequirement(row: InternalRow): Int = {
     var fieldNumber = 0
     var variableLengthFieldSize: Int = 0
     while (fieldNumber < writers.length) {
@@ -68,7 +68,7 @@ class UnsafeRowConverter(fieldTypes: Array[DataType]) {
    * @param baseOffset the base offset of the destination address
    * @return the number of bytes written. This should be equal to `getSizeRequirement(row)`.
    */
-  def writeRow(row: Row, baseObject: Object, baseOffset: Long): Long = {
+  def writeRow(row: InternalRow, baseObject: Object, baseOffset: Long): Long = {
     unsafeRow.pointTo(baseObject, baseOffset, writers.length, null)
     var fieldNumber = 0
     var appendCursor: Int = fixedLengthSize
@@ -99,12 +99,12 @@ private abstract class UnsafeColumnWriter {
    *                     used for calculating where variable-length data should be written
    * @return the number of variable-length bytes written
    */
-  def write(source: Row, target: UnsafeRow, column: Int, appendCursor: Int): Int
+  def write(source: InternalRow, target: UnsafeRow, column: Int, appendCursor: Int): Int
 
   /**
    * Return the number of bytes that are needed to write this variable-length value.
    */
-  def getSize(source: Row, column: Int): Int
+  def getSize(source: InternalRow, column: Int): Int
 }
 
 private object UnsafeColumnWriter {
@@ -140,72 +140,108 @@ private object StringUnsafeColumnWriter extends StringUnsafeColumnWriter
 
 private abstract class PrimitiveUnsafeColumnWriter extends UnsafeColumnWriter {
   // Primitives don't write to the variable-length region:
-  def getSize(sourceRow: Row, column: Int): Int = 0
+  def getSize(sourceRow: InternalRow, column: Int): Int = 0
 }
 
 private class NullUnsafeColumnWriter private() extends PrimitiveUnsafeColumnWriter {
-  override def write(source: Row, target: UnsafeRow, column: Int, appendCursor: Int): Int = {
+  override def write(
+      source: InternalRow,
+      target: UnsafeRow,
+      column: Int,
+      appendCursor: Int): Int = {
     target.setNullAt(column)
     0
   }
 }
 
 private class BooleanUnsafeColumnWriter private() extends PrimitiveUnsafeColumnWriter {
-  override def write(source: Row, target: UnsafeRow, column: Int, appendCursor: Int): Int = {
+  override def write(
+      source: InternalRow,
+      target: UnsafeRow,
+      column: Int,
+      appendCursor: Int): Int = {
     target.setBoolean(column, source.getBoolean(column))
     0
   }
 }
 
 private class ByteUnsafeColumnWriter private() extends PrimitiveUnsafeColumnWriter {
-  override def write(source: Row, target: UnsafeRow, column: Int, appendCursor: Int): Int = {
+  override def write(
+      source: InternalRow,
+      target: UnsafeRow,
+      column: Int,
+      appendCursor: Int): Int = {
     target.setByte(column, source.getByte(column))
     0
   }
 }
 
 private class ShortUnsafeColumnWriter private() extends PrimitiveUnsafeColumnWriter {
-  override def write(source: Row, target: UnsafeRow, column: Int, appendCursor: Int): Int = {
+  override def write(
+      source: InternalRow,
+      target: UnsafeRow,
+      column: Int,
+      appendCursor: Int): Int = {
     target.setShort(column, source.getShort(column))
     0
   }
 }
 
 private class IntUnsafeColumnWriter private() extends PrimitiveUnsafeColumnWriter {
-  override def write(source: Row, target: UnsafeRow, column: Int, appendCursor: Int): Int = {
+  override def write(
+      source: InternalRow,
+      target: UnsafeRow,
+      column: Int,
+      appendCursor: Int): Int = {
     target.setInt(column, source.getInt(column))
     0
   }
 }
 
 private class LongUnsafeColumnWriter private() extends PrimitiveUnsafeColumnWriter {
-  override def write(source: Row, target: UnsafeRow, column: Int, appendCursor: Int): Int = {
+  override def write(
+      source: InternalRow,
+      target: UnsafeRow,
+      column: Int,
+      appendCursor: Int): Int = {
     target.setLong(column, source.getLong(column))
     0
   }
 }
 
 private class FloatUnsafeColumnWriter private() extends PrimitiveUnsafeColumnWriter {
-  override def write(source: Row, target: UnsafeRow, column: Int, appendCursor: Int): Int = {
+  override def write(
+      source: InternalRow,
+      target: UnsafeRow,
+      column: Int,
+      appendCursor: Int): Int = {
     target.setFloat(column, source.getFloat(column))
     0
   }
 }
 
 private class DoubleUnsafeColumnWriter private() extends PrimitiveUnsafeColumnWriter {
-  override def write(source: Row, target: UnsafeRow, column: Int, appendCursor: Int): Int = {
+  override def write(
+      source: InternalRow,
+      target: UnsafeRow,
+      column: Int,
+      appendCursor: Int): Int = {
     target.setDouble(column, source.getDouble(column))
     0
   }
 }
 
 private class StringUnsafeColumnWriter private() extends UnsafeColumnWriter {
-  def getSize(source: Row, column: Int): Int = {
+  def getSize(source: InternalRow, column: Int): Int = {
     val numBytes = source.get(column).asInstanceOf[UTF8String].getBytes.length
     8 + ByteArrayMethods.roundNumberOfBytesToNearestWord(numBytes)
   }
 
-  override def write(source: Row, target: UnsafeRow, column: Int, appendCursor: Int): Int = {
+  override def write(
+      source: InternalRow,
+      target: UnsafeRow,
+      column: Int,
+      appendCursor: Int): Int = {
     val value = source.get(column).asInstanceOf[UTF8String]
     val baseObject = target.getBaseObject
     val baseOffset = target.getBaseOffset
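
For the string writer above, getSize charges 8 bytes of per-field overhead plus the UTF-8 payload rounded up to the nearest 8-byte word via ByteArrayMethods.roundNumberOfBytesToNearestWord. A small, self-contained Scala sketch of that arithmetic follows; roundToWord and stringFieldSize are hypothetical helpers, not Spark methods, and the 8-byte constant is taken directly from the getSize formula in the diff.

// Illustrative sketch of the size accounting in StringUnsafeColumnWriter.getSize above:
// 8 bytes of per-field overhead plus the UTF-8 payload rounded up to an 8-byte word.
// roundToWord and stringFieldSize are hypothetical helpers, not Spark methods.
object StringSizeDemo extends App {
  def roundToWord(numBytes: Int): Int = ((numBytes + 7) / 8) * 8

  def stringFieldSize(s: String): Int =
    8 + roundToWord(s.getBytes("UTF-8").length)

  assert(roundToWord(5) == 8)            // 5 bytes round up to one 8-byte word
  assert(stringFieldSize("spark") == 16) // 8 + 8
}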

