Posted to commits@spark.apache.org by rx...@apache.org on 2016/01/01 22:39:23 UTC

spark git commit: [SPARK-12286][SPARK-12290][SPARK-12294][SPARK-12284][SQL] always output UnsafeRow

Repository: spark
Updated Branches:
  refs/heads/master 6c20b3c08 -> 0da7bd50d


[SPARK-12286][SPARK-12290][SPARK-12294][SPARK-12284][SQL] always output UnsafeRow

It's confusing that some operators output UnsafeRow while others don't, which makes it easy to make mistakes.

This PR changes all operators (SparkPlan) to output only UnsafeRow, and removes the rule that inserted Unsafe/Safe conversions. For operators that can't output UnsafeRow directly, it adds an UnsafeProjection to their output.
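
As a rough illustration (not code from this patch; MyOperator is a
hypothetical operator), the pattern now applied across the physical plan
nodes looks like this:

  import org.apache.spark.rdd.RDD
  import org.apache.spark.sql.catalyst.InternalRow
  import org.apache.spark.sql.catalyst.expressions.{Attribute, UnsafeProjection}

  case class MyOperator(child: SparkPlan) extends UnaryNode {
    override def output: Seq[Attribute] = child.output

    protected override def doExecute(): RDD[InternalRow] = {
      child.execute().mapPartitionsInternal { iter =>
        // One projection per partition; it writes each result into a reused
        // buffer row, so consumers that buffer rows must copy() them.
        val proj = UnsafeProjection.create(schema)
        iter.map(proj)
      }
    }
  }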

Closes #10330

cc JoshRosen rxin

Author: Davies Liu <da...@databricks.com>

Closes #10511 from davies/unsafe_row.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0da7bd50
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0da7bd50
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0da7bd50

Branch: refs/heads/master
Commit: 0da7bd50ddf0fb9e0e8aeadb9c7fb3edf6f0ee6e
Parents: 6c20b3c
Author: Davies Liu <da...@databricks.com>
Authored: Fri Jan 1 13:39:20 2016 -0800
Committer: Reynold Xin <rx...@databricks.com>
Committed: Fri Jan 1 13:39:20 2016 -0800

----------------------------------------------------------------------
 .../scala/org/apache/spark/sql/SQLContext.scala |   3 +-
 .../apache/spark/sql/execution/Exchange.scala   |  25 +--
 .../spark/sql/execution/ExistingRDD.scala       |  15 +-
 .../org/apache/spark/sql/execution/Expand.scala |  13 +-
 .../apache/spark/sql/execution/Generate.scala   |   8 +-
 .../spark/sql/execution/LocalTableScan.scala    |  13 +-
 .../org/apache/spark/sql/execution/Sort.scala   |   4 -
 .../apache/spark/sql/execution/SparkPlan.scala  |  23 ---
 .../org/apache/spark/sql/execution/Window.scala |   8 +-
 .../aggregate/SortBasedAggregate.scala          |   4 -
 .../SortBasedAggregationIterator.scala          |   8 +-
 .../execution/aggregate/TungstenAggregate.scala |   4 -
 .../spark/sql/execution/basicOperators.scala    |  58 +------
 .../columnar/InMemoryColumnarTableScan.scala    |   8 +-
 .../joins/BroadcastNestedLoopJoin.scala         |   7 -
 .../sql/execution/joins/CartesianProduct.scala  |   4 -
 .../spark/sql/execution/joins/HashJoin.scala    |   4 -
 .../sql/execution/joins/HashOuterJoin.scala     |   4 -
 .../sql/execution/joins/HashSemiJoin.scala      |   4 -
 .../sql/execution/joins/LeftSemiJoinBNL.scala   |   3 -
 .../sql/execution/joins/SortMergeJoin.scala     |   4 -
 .../execution/joins/SortMergeOuterJoin.scala    |   4 -
 .../spark/sql/execution/local/LocalNode.scala   |  12 --
 .../execution/local/NestedLoopJoinNode.scala    |   6 +-
 .../org/apache/spark/sql/execution/python.scala |   7 +-
 .../sql/execution/rowFormatConverters.scala     | 108 ------------
 .../spark/sql/execution/ExchangeSuite.scala     |   2 +-
 .../spark/sql/execution/ExpandSuite.scala       |  54 ------
 .../execution/RowFormatConvertersSuite.scala    | 164 -------------------
 .../apache/spark/sql/execution/SortSuite.scala  |   2 +-
 .../sql/hive/execution/HiveTableScan.scala      |  16 +-
 .../hive/execution/InsertIntoHiveTable.scala    |  15 +-
 .../hive/execution/ScriptTransformation.scala   |   3 +-
 .../sql/sources/hadoopFsRelationSuites.scala    |  31 ----
 34 files changed, 74 insertions(+), 574 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/0da7bd50/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index eadf5cb..0223032 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -904,8 +904,7 @@ class SQLContext private[sql](
   @transient
   protected[sql] val prepareForExecution = new RuleExecutor[SparkPlan] {
     val batches = Seq(
-      Batch("Add exchange", Once, EnsureRequirements(self)),
-      Batch("Add row converters", Once, EnsureRowFormats)
+      Batch("Add exchange", Once, EnsureRequirements(self))
     )
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/0da7bd50/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
index 62cbc51..7b41619 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
@@ -28,7 +28,6 @@ import org.apache.spark.sql.SQLContext
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.errors.attachTree
 import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
 import org.apache.spark.sql.catalyst.plans.physical._
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.util.MutablePair
@@ -50,26 +49,14 @@ case class Exchange(
       case None => ""
     }
 
-    val simpleNodeName = if (tungstenMode) "TungstenExchange" else "Exchange"
+    val simpleNodeName = "Exchange"
     s"$simpleNodeName$extraInfo"
   }
 
-  /**
-   * Returns true iff we can support the data type, and we are not doing range partitioning.
-   */
-  private lazy val tungstenMode: Boolean = !newPartitioning.isInstanceOf[RangePartitioning]
-
   override def outputPartitioning: Partitioning = newPartitioning
 
   override def output: Seq[Attribute] = child.output
 
-  // This setting is somewhat counterintuitive:
-  // If the schema works with UnsafeRow, then we tell the planner that we don't support safe row,
-  // so the planner inserts a converter to convert data into UnsafeRow if needed.
-  override def outputsUnsafeRows: Boolean = tungstenMode
-  override def canProcessSafeRows: Boolean = !tungstenMode
-  override def canProcessUnsafeRows: Boolean = tungstenMode
-
   /**
    * Determines whether records must be defensively copied before being sent to the shuffle.
    * Several of Spark's shuffle components will buffer deserialized Java objects in memory. The
@@ -130,15 +117,7 @@ case class Exchange(
     }
   }
 
-  @transient private lazy val sparkConf = child.sqlContext.sparkContext.getConf
-
-  private val serializer: Serializer = {
-    if (tungstenMode) {
-      new UnsafeRowSerializer(child.output.size)
-    } else {
-      new SparkSqlSerializer(sparkConf)
-    }
-  }
+  private val serializer: Serializer = new UnsafeRowSerializer(child.output.size)
 
   override protected def doPrepare(): Unit = {
     // If an ExchangeCoordinator is needed, we register this Exchange operator

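(Aside on the "defensively copied" comment in the hunk above: many operators
reuse a single mutable row per partition, so a shuffle that buffers
deserialized objects would end up holding many references to the same
object. A toy sketch of the failure mode, not Spark's actual shuffle code:)

  import org.apache.spark.sql.catalyst.expressions.GenericMutableRow

  val reused = new GenericMutableRow(1)
  // Buffering the bare reference: all three entries alias the same row and
  // read back as the last value written.
  val aliased = (1 to 3).map { i => reused.update(0, i); reused }
  // copy() detaches each record from the shared buffer.
  val copied = (1 to 3).map { i => reused.update(0, i); reused.copy() }
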
http://git-wip-us.apache.org/repos/asf/spark/blob/0da7bd50/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
index 5c01af0..fc508bf 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.execution
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
 import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
-import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSet, GenericMutableRow}
+import org.apache.spark.sql.catalyst.expressions.{UnsafeProjection, Attribute, AttributeSet, GenericMutableRow}
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Statistics}
 import org.apache.spark.sql.sources.{BaseRelation, HadoopFsRelation}
 import org.apache.spark.sql.types.DataType
@@ -99,10 +99,19 @@ private[sql] case class PhysicalRDD(
     rdd: RDD[InternalRow],
     override val nodeName: String,
     override val metadata: Map[String, String] = Map.empty,
-    override val outputsUnsafeRows: Boolean = false)
+    isUnsafeRow: Boolean = false)
   extends LeafNode {
 
-  protected override def doExecute(): RDD[InternalRow] = rdd
+  protected override def doExecute(): RDD[InternalRow] = {
+    if (isUnsafeRow) {
+      rdd
+    } else {
+      rdd.mapPartitionsInternal { iter =>
+        val proj = UnsafeProjection.create(schema)
+        iter.map(proj)
+      }
+    }
+  }
 
   override def simpleString: String = {
     val metadataEntries = for ((key, value) <- metadata.toSeq.sorted) yield s"$key: $value"

http://git-wip-us.apache.org/repos/asf/spark/blob/0da7bd50/sql/core/src/main/scala/org/apache/spark/sql/execution/Expand.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Expand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Expand.scala
index 91530bd..c3683cc 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Expand.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Expand.scala
@@ -41,20 +41,11 @@ case class Expand(
   // as UNKNOWN partitioning
   override def outputPartitioning: Partitioning = UnknownPartitioning(0)
 
-  override def outputsUnsafeRows: Boolean = child.outputsUnsafeRows
-  override def canProcessUnsafeRows: Boolean = true
-  override def canProcessSafeRows: Boolean = true
-
   override def references: AttributeSet =
     AttributeSet(projections.flatten.flatMap(_.references))
 
-  private[this] val projection = {
-    if (outputsUnsafeRows) {
-      (exprs: Seq[Expression]) => UnsafeProjection.create(exprs, child.output)
-    } else {
-      (exprs: Seq[Expression]) => newMutableProjection(exprs, child.output)()
-    }
-  }
+  private[this] val projection =
+    (exprs: Seq[Expression]) => UnsafeProjection.create(exprs, child.output)
 
   protected override def doExecute(): RDD[InternalRow] = attachTree(this, "execute") {
     child.execute().mapPartitions { iter =>

http://git-wip-us.apache.org/repos/asf/spark/blob/0da7bd50/sql/core/src/main/scala/org/apache/spark/sql/execution/Generate.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Generate.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Generate.scala
index 0c613e9..4db88a0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Generate.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Generate.scala
@@ -64,6 +64,7 @@ case class Generate(
       child.execute().mapPartitionsInternal { iter =>
         val generatorNullRow = InternalRow.fromSeq(Seq.fill[Any](generator.elementTypes.size)(null))
         val joinedRow = new JoinedRow
+        val proj = UnsafeProjection.create(output, output)
 
         iter.flatMap { row =>
           // we should always set the left (child output)
@@ -77,13 +78,14 @@ case class Generate(
         } ++ LazyIterator(() => boundGenerator.terminate()).map { row =>
           // we leave the left side as the last element of its child output
           // keep it the same as Hive does
-          joinedRow.withRight(row)
+          proj(joinedRow.withRight(row))
         }
       }
     } else {
       child.execute().mapPartitionsInternal { iter =>
-        iter.flatMap(row => boundGenerator.eval(row)) ++
-        LazyIterator(() => boundGenerator.terminate())
+        val proj = UnsafeProjection.create(output, output)
+        (iter.flatMap(row => boundGenerator.eval(row)) ++
+          LazyIterator(() => boundGenerator.terminate())).map(proj)
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/0da7bd50/sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScan.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScan.scala
index ba7f628..59057bf 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScan.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.execution
 
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.expressions.{Attribute, UnsafeProjection}
 
 
 /**
@@ -29,15 +29,20 @@ private[sql] case class LocalTableScan(
     output: Seq[Attribute],
     rows: Seq[InternalRow]) extends LeafNode {
 
-  private lazy val rdd = sqlContext.sparkContext.parallelize(rows)
+  private val unsafeRows: Array[InternalRow] = {
+    val proj = UnsafeProjection.create(output, output)
+    rows.map(r => proj(r).copy()).toArray
+  }
+
+  private lazy val rdd = sqlContext.sparkContext.parallelize(unsafeRows)
 
   protected override def doExecute(): RDD[InternalRow] = rdd
 
   override def executeCollect(): Array[InternalRow] = {
-    rows.toArray
+    unsafeRows
   }
 
   override def executeTake(limit: Int): Array[InternalRow] = {
-    rows.take(limit).toArray
+    unsafeRows.take(limit)
   }
 }

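(Note on the copy() in unsafeRows above: UnsafeProjection writes every
result into one reused UnsafeRow buffer. A minimal sketch of the same
pattern, assuming rows and output as in LocalTableScan:)

  val proj = UnsafeProjection.create(output, output)
  // Without copy(), each array slot would alias proj's single internal
  // UnsafeRow and end up holding the bytes of the last projected row.
  val materialized: Array[InternalRow] = rows.map(r => proj(r).copy()).toArray
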
http://git-wip-us.apache.org/repos/asf/spark/blob/0da7bd50/sql/core/src/main/scala/org/apache/spark/sql/execution/Sort.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Sort.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Sort.scala
index 24207cb..73dc8cb 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Sort.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Sort.scala
@@ -39,10 +39,6 @@ case class Sort(
     testSpillFrequency: Int = 0)
   extends UnaryNode {
 
-  override def outputsUnsafeRows: Boolean = true
-  override def canProcessUnsafeRows: Boolean = true
-  override def canProcessSafeRows: Boolean = false
-
   override def output: Seq[Attribute] = child.output
 
   override def outputOrdering: Seq[SortOrder] = sortOrder

http://git-wip-us.apache.org/repos/asf/spark/blob/0da7bd50/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
index fe9b2ad..f20f32a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
@@ -97,17 +97,6 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
   /** Specifies sort order for each partition requirements on the input data for this operator. */
   def requiredChildOrdering: Seq[Seq[SortOrder]] = Seq.fill(children.size)(Nil)
 
-  /** Specifies whether this operator outputs UnsafeRows */
-  def outputsUnsafeRows: Boolean = false
-
-  /** Specifies whether this operator is capable of processing UnsafeRows */
-  def canProcessUnsafeRows: Boolean = false
-
-  /**
-   * Specifies whether this operator is capable of processing Java-object-based Rows (i.e. rows
-   * that are not UnsafeRows).
-   */
-  def canProcessSafeRows: Boolean = true
 
   /**
    * Returns the result of this query as an RDD[InternalRow] by delegating to doExecute
@@ -115,18 +104,6 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
    * Concrete implementations of SparkPlan should override doExecute instead.
    */
   final def execute(): RDD[InternalRow] = {
-    if (children.nonEmpty) {
-      val hasUnsafeInputs = children.exists(_.outputsUnsafeRows)
-      val hasSafeInputs = children.exists(!_.outputsUnsafeRows)
-      assert(!(hasSafeInputs && hasUnsafeInputs),
-        "Child operators should output rows in the same format")
-      assert(canProcessSafeRows || canProcessUnsafeRows,
-        "Operator must be able to process at least one row format")
-      assert(!hasSafeInputs || canProcessSafeRows,
-        "Operator will receive safe rows as input but cannot process safe rows")
-      assert(!hasUnsafeInputs || canProcessUnsafeRows,
-        "Operator will receive unsafe rows as input but cannot process unsafe rows")
-    }
     RDDOperationScope.withScope(sparkContext, nodeName, false, true) {
       prepare()
       doExecute()

http://git-wip-us.apache.org/repos/asf/spark/blob/0da7bd50/sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala
index c941d67..b79d93d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala
@@ -100,8 +100,6 @@ case class Window(
 
   override def outputOrdering: Seq[SortOrder] = child.outputOrdering
 
-  override def canProcessUnsafeRows: Boolean = true
-
   /**
    * Create a bound ordering object for a given frame type and offset. A bound ordering object is
    * used to determine which input row lies within the frame boundaries of an output row.
@@ -259,16 +257,16 @@ case class Window(
    * @return the final resulting projection.
    */
   private[this] def createResultProjection(
-      expressions: Seq[Expression]): MutableProjection = {
+      expressions: Seq[Expression]): UnsafeProjection = {
     val references = expressions.zipWithIndex.map{ case (e, i) =>
       // Results of window expressions will be on the right side of child's output
       BoundReference(child.output.size + i, e.dataType, e.nullable)
     }
     val unboundToRefMap = expressions.zip(references).toMap
     val patchedWindowExpression = windowExpression.map(_.transform(unboundToRefMap))
-    newMutableProjection(
+    UnsafeProjection.create(
       projectList ++ patchedWindowExpression,
-      child.output)()
+      child.output)
   }
 
   protected override def doExecute(): RDD[InternalRow] = {

http://git-wip-us.apache.org/repos/asf/spark/blob/0da7bd50/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortBasedAggregate.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortBasedAggregate.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortBasedAggregate.scala
index c4587ba..01d0766 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortBasedAggregate.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortBasedAggregate.scala
@@ -49,10 +49,6 @@ case class SortBasedAggregate(
     "numInputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of input rows"),
     "numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows"))
 
-  override def outputsUnsafeRows: Boolean = true
-  override def canProcessUnsafeRows: Boolean = false
-  override def canProcessSafeRows: Boolean = true
-
   override def output: Seq[Attribute] = resultExpressions.map(_.toAttribute)
 
   override def requiredChildDistribution: List[Distribution] = {

http://git-wip-us.apache.org/repos/asf/spark/blob/0da7bd50/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortBasedAggregationIterator.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortBasedAggregationIterator.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortBasedAggregationIterator.scala
index ac920aa..6501634 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortBasedAggregationIterator.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortBasedAggregationIterator.scala
@@ -87,6 +87,10 @@ class SortBasedAggregationIterator(
   // The aggregation buffer used by the sort-based aggregation.
   private[this] val sortBasedAggregationBuffer: MutableRow = newBuffer
 
+  // A SafeProjection to turn UnsafeRow into GenericInternalRow, because UnsafeRow can't be
+  // compared to MutableRow (aggregation buffer) directly.
+  private[this] val safeProj: Projection = FromUnsafeProjection(valueAttributes.map(_.dataType))
+
   protected def initialize(): Unit = {
     if (inputIterator.hasNext) {
       initializeBuffer(sortBasedAggregationBuffer)
@@ -110,7 +114,7 @@ class SortBasedAggregationIterator(
     // We create a variable to track if we see the next group.
     var findNextPartition = false
     // firstRowInNextGroup is the first row of this group. We first process it.
-    processRow(sortBasedAggregationBuffer, firstRowInNextGroup)
+    processRow(sortBasedAggregationBuffer, safeProj(firstRowInNextGroup))
 
     // The search will stop when we see the next group or there is no
     // input row left in the iter.
@@ -122,7 +126,7 @@ class SortBasedAggregationIterator(
 
       // Check if the current row belongs the current input row.
       if (currentGroupingKey == groupingKey) {
-        processRow(sortBasedAggregationBuffer, currentRow)
+        processRow(sortBasedAggregationBuffer, safeProj(currentRow))
       } else {
         // We find a new group.
         findNextPartition = true

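(As the new comment in this hunk notes, the sort-based path converts its
unsafe input back to safe rows before updating the mutable aggregation
buffer. A small sketch, assuming valueAttributes and an UnsafeRow-valued
unsafeInputRow as above:)

  val safeProj: Projection = FromUnsafeProjection(valueAttributes.map(_.dataType))
  // Yields a GenericInternalRow whose fields can be read and compared
  // against the MutableRow aggregation buffer.
  val safeRow: InternalRow = safeProj(unsafeInputRow)
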
http://git-wip-us.apache.org/repos/asf/spark/blob/0da7bd50/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala
index 9d758eb..999ebb7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala
@@ -49,10 +49,6 @@ case class TungstenAggregate(
     "dataSize" -> SQLMetrics.createSizeMetric(sparkContext, "data size"),
     "spillSize" -> SQLMetrics.createSizeMetric(sparkContext, "spill size"))
 
-  override def outputsUnsafeRows: Boolean = true
-  override def canProcessUnsafeRows: Boolean = true
-  override def canProcessSafeRows: Boolean = true
-
   override def output: Seq[Attribute] = resultExpressions.map(_.toAttribute)
 
   override def producedAttributes: AttributeSet =

http://git-wip-us.apache.org/repos/asf/spark/blob/0da7bd50/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
index f19d72f..af7237e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
@@ -36,10 +36,6 @@ case class Project(projectList: Seq[NamedExpression], child: SparkPlan) extends
   override private[sql] lazy val metrics = Map(
     "numRows" -> SQLMetrics.createLongMetric(sparkContext, "number of rows"))
 
-  override def outputsUnsafeRows: Boolean = true
-  override def canProcessUnsafeRows: Boolean = true
-  override def canProcessSafeRows: Boolean = true
-
   override def output: Seq[Attribute] = projectList.map(_.toAttribute)
 
   protected override def doExecute(): RDD[InternalRow] = {
@@ -80,12 +76,6 @@ case class Filter(condition: Expression, child: SparkPlan) extends UnaryNode {
   }
 
   override def outputOrdering: Seq[SortOrder] = child.outputOrdering
-
-  override def outputsUnsafeRows: Boolean = child.outputsUnsafeRows
-
-  override def canProcessUnsafeRows: Boolean = true
-
-  override def canProcessSafeRows: Boolean = true
 }
 
 /**
@@ -108,10 +98,6 @@ case class Sample(
 {
   override def output: Seq[Attribute] = child.output
 
-  override def outputsUnsafeRows: Boolean = child.outputsUnsafeRows
-  override def canProcessUnsafeRows: Boolean = true
-  override def canProcessSafeRows: Boolean = true
-
   protected override def doExecute(): RDD[InternalRow] = {
     if (withReplacement) {
       // Disable gap sampling since the gap sampling method buffers two rows internally,
@@ -135,8 +121,6 @@ case class Range(
     output: Seq[Attribute])
   extends LeafNode {
 
-  override def outputsUnsafeRows: Boolean = true
-
   protected override def doExecute(): RDD[InternalRow] = {
     sqlContext
       .sparkContext
@@ -199,9 +183,6 @@ case class Union(children: Seq[SparkPlan]) extends SparkPlan {
       }
     }
   }
-  override def outputsUnsafeRows: Boolean = children.exists(_.outputsUnsafeRows)
-  override def canProcessUnsafeRows: Boolean = true
-  override def canProcessSafeRows: Boolean = true
   protected override def doExecute(): RDD[InternalRow] =
     sparkContext.union(children.map(_.execute()))
 }
@@ -268,12 +249,14 @@ case class TakeOrderedAndProject(
   // and this ordering needs to be created on the driver in order to be passed into Spark core code.
   private val ord: InterpretedOrdering = new InterpretedOrdering(sortOrder, child.output)
 
-  // TODO: remove @transient after figure out how to clean closure at InsertIntoHiveTable.
-  @transient private val projection = projectList.map(new InterpretedProjection(_, child.output))
-
   private def collectData(): Array[InternalRow] = {
     val data = child.execute().map(_.copy()).takeOrdered(limit)(ord)
-    projection.map(data.map(_)).getOrElse(data)
+    if (projectList.isDefined) {
+      val proj = UnsafeProjection.create(projectList.get, child.output)
+      data.map(r => proj(r).copy())
+    } else {
+      data
+    }
   }
 
   override def executeCollect(): Array[InternalRow] = {
@@ -311,10 +294,6 @@ case class Coalesce(numPartitions: Int, child: SparkPlan) extends UnaryNode {
   protected override def doExecute(): RDD[InternalRow] = {
     child.execute().coalesce(numPartitions, shuffle = false)
   }
-
-  override def outputsUnsafeRows: Boolean = child.outputsUnsafeRows
-  override def canProcessUnsafeRows: Boolean = true
-  override def canProcessSafeRows: Boolean = true
 }
 
 /**
@@ -327,10 +306,6 @@ case class Except(left: SparkPlan, right: SparkPlan) extends BinaryNode {
   protected override def doExecute(): RDD[InternalRow] = {
     left.execute().map(_.copy()).subtract(right.execute().map(_.copy()))
   }
-
-  override def outputsUnsafeRows: Boolean = children.exists(_.outputsUnsafeRows)
-  override def canProcessUnsafeRows: Boolean = true
-  override def canProcessSafeRows: Boolean = true
 }
 
 /**
@@ -343,10 +318,6 @@ case class Intersect(left: SparkPlan, right: SparkPlan) extends BinaryNode {
   protected override def doExecute(): RDD[InternalRow] = {
     left.execute().map(_.copy()).intersection(right.execute().map(_.copy()))
   }
-
-  override def outputsUnsafeRows: Boolean = children.exists(_.outputsUnsafeRows)
-  override def canProcessUnsafeRows: Boolean = true
-  override def canProcessSafeRows: Boolean = true
 }
 
 /**
@@ -371,10 +342,6 @@ case class MapPartitions[T, U](
     child: SparkPlan) extends UnaryNode {
   override def producedAttributes: AttributeSet = outputSet
 
-  override def canProcessSafeRows: Boolean = true
-  override def canProcessUnsafeRows: Boolean = true
-  override def outputsUnsafeRows: Boolean = true
-
   override protected def doExecute(): RDD[InternalRow] = {
     child.execute().mapPartitionsInternal { iter =>
       val tBoundEncoder = tEncoder.bind(child.output)
@@ -394,11 +361,6 @@ case class AppendColumns[T, U](
     child: SparkPlan) extends UnaryNode {
   override def producedAttributes: AttributeSet = AttributeSet(newColumns)
 
-  // We are using an unsafe combiner.
-  override def canProcessSafeRows: Boolean = false
-  override def canProcessUnsafeRows: Boolean = true
-  override def outputsUnsafeRows: Boolean = true
-
   override def output: Seq[Attribute] = child.output ++ newColumns
 
   override protected def doExecute(): RDD[InternalRow] = {
@@ -428,10 +390,6 @@ case class MapGroups[K, T, U](
     child: SparkPlan) extends UnaryNode {
   override def producedAttributes: AttributeSet = outputSet
 
-  override def canProcessSafeRows: Boolean = true
-  override def canProcessUnsafeRows: Boolean = true
-  override def outputsUnsafeRows: Boolean = true
-
   override def requiredChildDistribution: Seq[Distribution] =
     ClusteredDistribution(groupingAttributes) :: Nil
 
@@ -472,10 +430,6 @@ case class CoGroup[Key, Left, Right, Result](
     right: SparkPlan) extends BinaryNode {
   override def producedAttributes: AttributeSet = outputSet
 
-  override def canProcessSafeRows: Boolean = true
-  override def canProcessUnsafeRows: Boolean = true
-  override def outputsUnsafeRows: Boolean = true
-
   override def requiredChildDistribution: Seq[Distribution] =
     ClusteredDistribution(leftGroup) :: ClusteredDistribution(rightGroup) :: Nil
 

http://git-wip-us.apache.org/repos/asf/spark/blob/0da7bd50/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarTableScan.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarTableScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarTableScan.scala
index aa7a668..d809123 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarTableScan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarTableScan.scala
@@ -27,7 +27,7 @@ import org.apache.spark.sql.catalyst.dsl.expressions._
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Statistics}
 import org.apache.spark.sql.catalyst.plans.physical.Partitioning
-import org.apache.spark.sql.execution.{ConvertToUnsafe, LeafNode, SparkPlan}
+import org.apache.spark.sql.execution.{LeafNode, SparkPlan}
 import org.apache.spark.sql.types.UserDefinedType
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.{Accumulable, Accumulator, Accumulators}
@@ -39,9 +39,7 @@ private[sql] object InMemoryRelation {
       storageLevel: StorageLevel,
       child: SparkPlan,
       tableName: Option[String]): InMemoryRelation =
-    new InMemoryRelation(child.output, useCompression, batchSize, storageLevel,
-      if (child.outputsUnsafeRows) child else ConvertToUnsafe(child),
-      tableName)()
+    new InMemoryRelation(child.output, useCompression, batchSize, storageLevel, child, tableName)()
 }
 
 /**
@@ -226,8 +224,6 @@ private[sql] case class InMemoryColumnarTableScan(
   // The cached version does not change the outputOrdering of the original SparkPlan.
   override def outputOrdering: Seq[SortOrder] = relation.child.outputOrdering
 
-  override def outputsUnsafeRows: Boolean = true
-
   private def statsFor(a: Attribute) = relation.partitionStatistics.forAttribute(a)
 
   // Returned filter predicate should return false iff it is impossible for the input expression

http://git-wip-us.apache.org/repos/asf/spark/blob/0da7bd50/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastNestedLoopJoin.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastNestedLoopJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastNestedLoopJoin.scala
index aab177b..54275c2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastNestedLoopJoin.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastNestedLoopJoin.scala
@@ -46,15 +46,8 @@ case class BroadcastNestedLoopJoin(
     case BuildLeft => (right, left)
   }
 
-  override def outputsUnsafeRows: Boolean = left.outputsUnsafeRows || right.outputsUnsafeRows
-  override def canProcessUnsafeRows: Boolean = true
-
   private[this] def genResultProjection: InternalRow => InternalRow = {
-    if (outputsUnsafeRows) {
       UnsafeProjection.create(schema)
-    } else {
-      identity[InternalRow]
-    }
   }
 
   override def outputPartitioning: Partitioning = streamed.outputPartitioning

http://git-wip-us.apache.org/repos/asf/spark/blob/0da7bd50/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProduct.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProduct.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProduct.scala
index 81bfe4e..d9fa4c6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProduct.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProduct.scala
@@ -81,10 +81,6 @@ class UnsafeCartesianRDD(left : RDD[UnsafeRow], right : RDD[UnsafeRow], numField
 case class CartesianProduct(left: SparkPlan, right: SparkPlan) extends BinaryNode {
   override def output: Seq[Attribute] = left.output ++ right.output
 
-  override def canProcessSafeRows: Boolean = false
-  override def canProcessUnsafeRows: Boolean = true
-  override def outputsUnsafeRows: Boolean = true
-
   override private[sql] lazy val metrics = Map(
     "numLeftRows" -> SQLMetrics.createLongMetric(sparkContext, "number of left rows"),
     "numRightRows" -> SQLMetrics.createLongMetric(sparkContext, "number of right rows"),

http://git-wip-us.apache.org/repos/asf/spark/blob/0da7bd50/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala
index fb961d9..7f9d9da 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala
@@ -44,10 +44,6 @@ trait HashJoin {
 
   override def output: Seq[Attribute] = left.output ++ right.output
 
-  override def outputsUnsafeRows: Boolean = true
-  override def canProcessUnsafeRows: Boolean = true
-  override def canProcessSafeRows: Boolean = false
-
   protected def buildSideKeyGenerator: Projection =
     UnsafeProjection.create(buildKeys, buildPlan.output)
 

http://git-wip-us.apache.org/repos/asf/spark/blob/0da7bd50/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashOuterJoin.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashOuterJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashOuterJoin.scala
index c6e5868..6d464d6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashOuterJoin.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashOuterJoin.scala
@@ -64,10 +64,6 @@ trait HashOuterJoin {
         s"HashOuterJoin should not take $x as the JoinType")
   }
 
-  override def outputsUnsafeRows: Boolean = true
-  override def canProcessUnsafeRows: Boolean = true
-  override def canProcessSafeRows: Boolean = false
-
   protected def buildKeyGenerator: Projection =
     UnsafeProjection.create(buildKeys, buildPlan.output)
 

http://git-wip-us.apache.org/repos/asf/spark/blob/0da7bd50/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashSemiJoin.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashSemiJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashSemiJoin.scala
index f23a183..3e0f74c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashSemiJoin.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashSemiJoin.scala
@@ -33,10 +33,6 @@ trait HashSemiJoin {
 
   override def output: Seq[Attribute] = left.output
 
-  override def outputsUnsafeRows: Boolean = true
-  override def canProcessUnsafeRows: Boolean = true
-  override def canProcessSafeRows: Boolean = false
-
   protected def leftKeyGenerator: Projection =
     UnsafeProjection.create(leftKeys, left.output)
 

http://git-wip-us.apache.org/repos/asf/spark/blob/0da7bd50/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/LeftSemiJoinBNL.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/LeftSemiJoinBNL.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/LeftSemiJoinBNL.scala
index efa7b49..82498ee 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/LeftSemiJoinBNL.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/LeftSemiJoinBNL.scala
@@ -42,9 +42,6 @@ case class LeftSemiJoinBNL(
 
   override def output: Seq[Attribute] = left.output
 
-  override def outputsUnsafeRows: Boolean = streamed.outputsUnsafeRows
-  override def canProcessUnsafeRows: Boolean = true
-
   /** The Streamed Relation */
   override def left: SparkPlan = streamed
 

http://git-wip-us.apache.org/repos/asf/spark/blob/0da7bd50/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoin.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoin.scala
index 4bf7b52..812f881 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoin.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoin.scala
@@ -53,10 +53,6 @@ case class SortMergeJoin(
   override def requiredChildOrdering: Seq[Seq[SortOrder]] =
     requiredOrders(leftKeys) :: requiredOrders(rightKeys) :: Nil
 
-  override def outputsUnsafeRows: Boolean = true
-  override def canProcessUnsafeRows: Boolean = true
-  override def canProcessSafeRows: Boolean = false
-
   private def requiredOrders(keys: Seq[Expression]): Seq[SortOrder] = {
     // This must be ascending in order to agree with the `keyOrdering` defined in `doExecute()`.
     keys.map(SortOrder(_, Ascending))

http://git-wip-us.apache.org/repos/asf/spark/blob/0da7bd50/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeOuterJoin.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeOuterJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeOuterJoin.scala
index 7ce38eb..c3a2bfc 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeOuterJoin.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeOuterJoin.scala
@@ -89,10 +89,6 @@ case class SortMergeOuterJoin(
     keys.map(SortOrder(_, Ascending))
   }
 
-  override def outputsUnsafeRows: Boolean = true
-  override def canProcessUnsafeRows: Boolean = true
-  override def canProcessSafeRows: Boolean = false
-
   private def createLeftKeyGenerator(): Projection =
     UnsafeProjection.create(leftKeys, left.output)
 

http://git-wip-us.apache.org/repos/asf/spark/blob/0da7bd50/sql/core/src/main/scala/org/apache/spark/sql/execution/local/LocalNode.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/local/LocalNode.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/local/LocalNode.scala
index 6a882c9..e462170 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/local/LocalNode.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/local/LocalNode.scala
@@ -69,18 +69,6 @@ abstract class LocalNode(conf: SQLConf) extends QueryPlan[LocalNode] with Loggin
    */
   def close(): Unit
 
-  /** Specifies whether this operator outputs UnsafeRows */
-  def outputsUnsafeRows: Boolean = false
-
-  /** Specifies whether this operator is capable of processing UnsafeRows */
-  def canProcessUnsafeRows: Boolean = false
-
-  /**
-   * Specifies whether this operator is capable of processing Java-object-based Rows (i.e. rows
-   * that are not UnsafeRows).
-   */
-  def canProcessSafeRows: Boolean = true
-
   /**
    * Returns the content through the [[Iterator]] interface.
    */

http://git-wip-us.apache.org/repos/asf/spark/blob/0da7bd50/sql/core/src/main/scala/org/apache/spark/sql/execution/local/NestedLoopJoinNode.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/local/NestedLoopJoinNode.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/local/NestedLoopJoinNode.scala
index 7321fc6..b7fa0c0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/local/NestedLoopJoinNode.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/local/NestedLoopJoinNode.scala
@@ -47,11 +47,7 @@ case class NestedLoopJoinNode(
   }
 
   private[this] def genResultProjection: InternalRow => InternalRow = {
-    if (outputsUnsafeRows) {
-      UnsafeProjection.create(schema)
-    } else {
-      identity[InternalRow]
-    }
+    UnsafeProjection.create(schema)
   }
 
   private[this] var currentRow: InternalRow = _

http://git-wip-us.apache.org/repos/asf/spark/blob/0da7bd50/sql/core/src/main/scala/org/apache/spark/sql/execution/python.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python.scala
index defcec9..efb4b09 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python.scala
@@ -351,10 +351,6 @@ case class BatchPythonEvaluation(udf: PythonUDF, output: Seq[Attribute], child:
 
   def children: Seq[SparkPlan] = child :: Nil
 
-  override def outputsUnsafeRows: Boolean = false
-  override def canProcessUnsafeRows: Boolean = true
-  override def canProcessSafeRows: Boolean = true
-
   protected override def doExecute(): RDD[InternalRow] = {
     val inputRDD = child.execute().map(_.copy())
     val bufferSize = inputRDD.conf.getInt("spark.buffer.size", 65536)
@@ -400,13 +396,14 @@ case class BatchPythonEvaluation(udf: PythonUDF, output: Seq[Attribute], child:
       val unpickle = new Unpickler
       val row = new GenericMutableRow(1)
       val joined = new JoinedRow
+      val resultProj = UnsafeProjection.create(output, output)
 
       outputIterator.flatMap { pickedResult =>
         val unpickledBatch = unpickle.loads(pickedResult)
         unpickledBatch.asInstanceOf[java.util.ArrayList[Any]].asScala
       }.map { result =>
         row(0) = EvaluatePython.fromJava(result, udf.dataType)
-        joined(queue.poll(), row)
+        resultProj(joined(queue.poll(), row))
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/0da7bd50/sql/core/src/main/scala/org/apache/spark/sql/execution/rowFormatConverters.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/rowFormatConverters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/rowFormatConverters.scala
deleted file mode 100644
index 5f8fc2d..0000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/rowFormatConverters.scala
+++ /dev/null
@@ -1,108 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution
-
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.plans.physical.Partitioning
-import org.apache.spark.sql.catalyst.rules.Rule
-
-/**
- * Converts Java-object-based rows into [[UnsafeRow]]s.
- */
-case class ConvertToUnsafe(child: SparkPlan) extends UnaryNode {
-
-  override def output: Seq[Attribute] = child.output
-  override def outputPartitioning: Partitioning = child.outputPartitioning
-  override def outputOrdering: Seq[SortOrder] = child.outputOrdering
-  override def outputsUnsafeRows: Boolean = true
-  override def canProcessUnsafeRows: Boolean = false
-  override def canProcessSafeRows: Boolean = true
-  override protected def doExecute(): RDD[InternalRow] = {
-    child.execute().mapPartitions { iter =>
-      val convertToUnsafe = UnsafeProjection.create(child.schema)
-      iter.map(convertToUnsafe)
-    }
-  }
-}
-
-/**
- * Converts [[UnsafeRow]]s back into Java-object-based rows.
- */
-case class ConvertToSafe(child: SparkPlan) extends UnaryNode {
-  override def output: Seq[Attribute] = child.output
-  override def outputPartitioning: Partitioning = child.outputPartitioning
-  override def outputOrdering: Seq[SortOrder] = child.outputOrdering
-  override def outputsUnsafeRows: Boolean = false
-  override def canProcessUnsafeRows: Boolean = true
-  override def canProcessSafeRows: Boolean = false
-  override protected def doExecute(): RDD[InternalRow] = {
-    child.execute().mapPartitions { iter =>
-      val convertToSafe = FromUnsafeProjection(child.output.map(_.dataType))
-      iter.map(convertToSafe)
-    }
-  }
-}
-
-private[sql] object EnsureRowFormats extends Rule[SparkPlan] {
-
-  private def onlyHandlesSafeRows(operator: SparkPlan): Boolean =
-    operator.canProcessSafeRows && !operator.canProcessUnsafeRows
-
-  private def onlyHandlesUnsafeRows(operator: SparkPlan): Boolean =
-    operator.canProcessUnsafeRows && !operator.canProcessSafeRows
-
-  private def handlesBothSafeAndUnsafeRows(operator: SparkPlan): Boolean =
-    operator.canProcessSafeRows && operator.canProcessUnsafeRows
-
-  override def apply(operator: SparkPlan): SparkPlan = operator.transformUp {
-    case operator: SparkPlan if onlyHandlesSafeRows(operator) =>
-      if (operator.children.exists(_.outputsUnsafeRows)) {
-        operator.withNewChildren {
-          operator.children.map {
-            c => if (c.outputsUnsafeRows) ConvertToSafe(c) else c
-          }
-        }
-      } else {
-        operator
-      }
-    case operator: SparkPlan if onlyHandlesUnsafeRows(operator) =>
-      if (operator.children.exists(!_.outputsUnsafeRows)) {
-        operator.withNewChildren {
-          operator.children.map {
-            c => if (!c.outputsUnsafeRows) ConvertToUnsafe(c) else c
-          }
-        }
-      } else {
-        operator
-      }
-    case operator: SparkPlan if handlesBothSafeAndUnsafeRows(operator) =>
-      if (operator.children.map(_.outputsUnsafeRows).toSet.size != 1) {
-        // If this operator's children produce both unsafe and safe rows,
-        // convert everything unsafe rows.
-        operator.withNewChildren {
-          operator.children.map {
-            c => if (!c.outputsUnsafeRows) ConvertToUnsafe(c) else c
-          }
-        }
-      } else {
-        operator
-      }
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/0da7bd50/sql/core/src/test/scala/org/apache/spark/sql/execution/ExchangeSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ExchangeSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ExchangeSuite.scala
index 911d12e..87bff32 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ExchangeSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ExchangeSuite.scala
@@ -28,7 +28,7 @@ class ExchangeSuite extends SparkPlanTest with SharedSQLContext {
     val input = (1 to 1000).map(Tuple1.apply)
     checkAnswer(
       input.toDF(),
-      plan => ConvertToSafe(Exchange(SinglePartition, ConvertToUnsafe(plan))),
+      plan => Exchange(SinglePartition, plan),
       input.map(Row.fromTuple)
     )
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/0da7bd50/sql/core/src/test/scala/org/apache/spark/sql/execution/ExpandSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ExpandSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ExpandSuite.scala
deleted file mode 100644
index faef76d..0000000
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ExpandSuite.scala
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution
-
-import org.apache.spark.sql.Row
-import org.apache.spark.sql.catalyst.expressions.{AttributeReference, BoundReference, Alias, Literal}
-import org.apache.spark.sql.test.SharedSQLContext
-import org.apache.spark.sql.types.IntegerType
-
-class ExpandSuite extends SparkPlanTest with SharedSQLContext {
-  import testImplicits.localSeqToDataFrameHolder
-
-  private def testExpand(f: SparkPlan => SparkPlan): Unit = {
-    val input = (1 to 1000).map(Tuple1.apply)
-    val projections = Seq.tabulate(2) { i =>
-      Alias(BoundReference(0, IntegerType, false), "id")() :: Alias(Literal(i), "gid")() :: Nil
-    }
-    val attributes = projections.head.map(_.toAttribute)
-    checkAnswer(
-      input.toDF(),
-      plan => Expand(projections, attributes, f(plan)),
-      input.flatMap(i => Seq.tabulate(2)(j => Row(i._1, j)))
-    )
-  }
-
-  test("inheriting child row type") {
-    val exprs = AttributeReference("a", IntegerType, false)() :: Nil
-    val plan = Expand(Seq(exprs), exprs, ConvertToUnsafe(LocalTableScan(exprs, Seq.empty)))
-    assert(plan.outputsUnsafeRows, "Expand should inherits the created row type from its child.")
-  }
-
-  test("expanding UnsafeRows") {
-    testExpand(ConvertToUnsafe)
-  }
-
-  test("expanding SafeRows") {
-    testExpand(identity)
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/0da7bd50/sql/core/src/test/scala/org/apache/spark/sql/execution/RowFormatConvertersSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/RowFormatConvertersSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/RowFormatConvertersSuite.scala
deleted file mode 100644
index 2328899..0000000
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/RowFormatConvertersSuite.scala
+++ /dev/null
@@ -1,164 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution
-
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.{SQLContext, Row}
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Attribute, Literal, IsNull}
-import org.apache.spark.sql.catalyst.util.GenericArrayData
-import org.apache.spark.sql.test.SharedSQLContext
-import org.apache.spark.sql.types.{ArrayType, StringType}
-import org.apache.spark.unsafe.types.UTF8String
-
-class RowFormatConvertersSuite extends SparkPlanTest with SharedSQLContext {
-
-  private def getConverters(plan: SparkPlan): Seq[SparkPlan] = plan.collect {
-    case c: ConvertToUnsafe => c
-    case c: ConvertToSafe => c
-  }
-
-  private val outputsSafe = ReferenceSort(Nil, false, PhysicalRDD(Seq.empty, null, "name"))
-  assert(!outputsSafe.outputsUnsafeRows)
-  private val outputsUnsafe = Sort(Nil, false, PhysicalRDD(Seq.empty, null, "name"))
-  assert(outputsUnsafe.outputsUnsafeRows)
-
-  test("planner should insert unsafe->safe conversions when required") {
-    val plan = Limit(10, outputsUnsafe)
-    val preparedPlan = sqlContext.prepareForExecution.execute(plan)
-    assert(preparedPlan.children.head.isInstanceOf[ConvertToSafe])
-  }
-
-  test("filter can process unsafe rows") {
-    val plan = Filter(IsNull(IsNull(Literal(1))), outputsUnsafe)
-    val preparedPlan = sqlContext.prepareForExecution.execute(plan)
-    assert(getConverters(preparedPlan).size === 1)
-    assert(preparedPlan.outputsUnsafeRows)
-  }
-
-  test("filter can process safe rows") {
-    val plan = Filter(IsNull(IsNull(Literal(1))), outputsSafe)
-    val preparedPlan = sqlContext.prepareForExecution.execute(plan)
-    assert(getConverters(preparedPlan).isEmpty)
-    assert(!preparedPlan.outputsUnsafeRows)
-  }
-
-  test("coalesce can process unsafe rows") {
-    val plan = Coalesce(1, outputsUnsafe)
-    val preparedPlan = sqlContext.prepareForExecution.execute(plan)
-    assert(getConverters(preparedPlan).size === 1)
-    assert(preparedPlan.outputsUnsafeRows)
-  }
-
-  test("except can process unsafe rows") {
-    val plan = Except(outputsUnsafe, outputsUnsafe)
-    val preparedPlan = sqlContext.prepareForExecution.execute(plan)
-    assert(getConverters(preparedPlan).size === 2)
-    assert(preparedPlan.outputsUnsafeRows)
-  }
-
-  test("except requires all of its input rows' formats to agree") {
-    val plan = Except(outputsSafe, outputsUnsafe)
-    assert(plan.canProcessSafeRows && plan.canProcessUnsafeRows)
-    val preparedPlan = sqlContext.prepareForExecution.execute(plan)
-    assert(preparedPlan.outputsUnsafeRows)
-  }
-
-  test("intersect can process unsafe rows") {
-    val plan = Intersect(outputsUnsafe, outputsUnsafe)
-    val preparedPlan = sqlContext.prepareForExecution.execute(plan)
-    assert(getConverters(preparedPlan).size === 2)
-    assert(preparedPlan.outputsUnsafeRows)
-  }
-
-  test("intersect requires all of its input rows' formats to agree") {
-    val plan = Intersect(outputsSafe, outputsUnsafe)
-    assert(plan.canProcessSafeRows && plan.canProcessUnsafeRows)
-    val preparedPlan = sqlContext.prepareForExecution.execute(plan)
-    assert(preparedPlan.outputsUnsafeRows)
-  }
-
-  test("execute() fails an assertion if inputs rows are of different formats") {
-    val e = intercept[AssertionError] {
-      Union(Seq(outputsSafe, outputsUnsafe)).execute()
-    }
-    assert(e.getMessage.contains("format"))
-  }
-
-  test("union requires all of its input rows' formats to agree") {
-    val plan = Union(Seq(outputsSafe, outputsUnsafe))
-    assert(plan.canProcessSafeRows && plan.canProcessUnsafeRows)
-    val preparedPlan = sqlContext.prepareForExecution.execute(plan)
-    assert(preparedPlan.outputsUnsafeRows)
-  }
-
-  test("union can process safe rows") {
-    val plan = Union(Seq(outputsSafe, outputsSafe))
-    val preparedPlan = sqlContext.prepareForExecution.execute(plan)
-    assert(!preparedPlan.outputsUnsafeRows)
-  }
-
-  test("union can process unsafe rows") {
-    val plan = Union(Seq(outputsUnsafe, outputsUnsafe))
-    val preparedPlan = sqlContext.prepareForExecution.execute(plan)
-    assert(preparedPlan.outputsUnsafeRows)
-  }
-
-  test("round trip with ConvertToUnsafe and ConvertToSafe") {
-    val input = Seq(("hello", 1), ("world", 2))
-    checkAnswer(
-      sqlContext.createDataFrame(input),
-      plan => ConvertToSafe(ConvertToUnsafe(plan)),
-      input.map(Row.fromTuple)
-    )
-  }
-
-  test("SPARK-9683: copy UTF8String when convert unsafe array/map to safe") {
-    SQLContext.setActive(sqlContext)
-    val schema = ArrayType(StringType)
-    val rows = (1 to 100).map { i =>
-      InternalRow(new GenericArrayData(Array[Any](UTF8String.fromString(i.toString))))
-    }
-    val relation = LocalTableScan(Seq(AttributeReference("t", schema)()), rows)
-
-    val plan =
-      DummyPlan(
-        ConvertToSafe(
-          ConvertToUnsafe(relation)))
-    assert(plan.execute().collect().map(_.getUTF8String(0).toString) === (1 to 100).map(_.toString))
-  }
-}
-
-case class DummyPlan(child: SparkPlan) extends UnaryNode {
-
-  override protected def doExecute(): RDD[InternalRow] = {
-    child.execute().mapPartitions { iter =>
-      // This `DummyPlan` receives safe rows, so we don't need to copy the values we
-      // hold on to from the incoming rows.
-      // We cache all the strings here to make sure the UTF8Strings inside the incoming
-      // safe InternalRows have been deep-copied.
-      val strings = new scala.collection.mutable.ArrayBuffer[UTF8String]
-      iter.foreach { row =>
-        strings += row.getArray(0).getUTF8String(0)
-      }
-      strings.map(InternalRow(_)).iterator
-    }
-  }
-
-  override def output: Seq[Attribute] = Seq(AttributeReference("a", StringType)())
-}

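With this change every SparkPlan emits UnsafeRow, so the ConvertToSafe /
ConvertToUnsafe operators this suite exercised no longer exist and the file is
removed wholesale. The replacement pattern lives inside the individual
operators themselves; below is a minimal standalone sketch of it, using the
public mapPartitions where the operators use the internal
mapPartitionsInternal:

    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.catalyst.InternalRow
    import org.apache.spark.sql.catalyst.expressions.UnsafeProjection
    import org.apache.spark.sql.types.StructType

    // Convert an operator's safe output to UnsafeRow, one projection per partition.
    def toUnsafe(rdd: RDD[InternalRow], schema: StructType): RDD[InternalRow] =
      rdd.mapPartitions { iter =>
        val proj = UnsafeProjection.create(schema)
        iter.map(proj) // proj reuses a single UnsafeRow buffer across calls
      }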
http://git-wip-us.apache.org/repos/asf/spark/blob/0da7bd50/sql/core/src/test/scala/org/apache/spark/sql/execution/SortSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SortSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SortSuite.scala
index e5d34be..af971df 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SortSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SortSuite.scala
@@ -99,7 +99,7 @@ class SortSuite extends SparkPlanTest with SharedSQLContext {
       )
       checkThatPlansAgree(
         inputDf,
-        p => ConvertToSafe(Sort(sortOrder, global = true, p: SparkPlan, testSpillFrequency = 23)),
+        p => Sort(sortOrder, global = true, p: SparkPlan, testSpillFrequency = 23),
         ReferenceSort(sortOrder, global = true, _: SparkPlan),
         sortAnswers = false
       )

http://git-wip-us.apache.org/repos/asf/spark/blob/0da7bd50/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScan.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScan.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScan.scala
index 8141136..1588728 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScan.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScan.scala
@@ -132,11 +132,17 @@ case class HiveTableScan(
     }
   }
 
-  protected override def doExecute(): RDD[InternalRow] = if (!relation.hiveQlTable.isPartitioned) {
-    hadoopReader.makeRDDForTable(relation.hiveQlTable)
-  } else {
-    hadoopReader.makeRDDForPartitionedTable(
-      prunePartitions(relation.getHiveQlPartitions(partitionPruningPred)))
+  protected override def doExecute(): RDD[InternalRow] = {
+    val rdd = if (!relation.hiveQlTable.isPartitioned) {
+      hadoopReader.makeRDDForTable(relation.hiveQlTable)
+    } else {
+      hadoopReader.makeRDDForPartitionedTable(
+        prunePartitions(relation.getHiveQlPartitions(partitionPruningPred)))
+    }
+    rdd.mapPartitionsInternal { iter =>
+      val proj = UnsafeProjection.create(schema)
+      iter.map(proj)
+    }
   }
 
   override def output: Seq[Attribute] = attributes

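A contract worth keeping in mind when reading the projection above:
UnsafeProjection writes each result into one reused row buffer, so
iter.map(proj) is only safe because downstream operators consume rows one at a
time or copy them before buffering. A minimal sketch of that behavior (the
schema and values here are made up for illustration):

    import org.apache.spark.sql.catalyst.InternalRow
    import org.apache.spark.sql.catalyst.expressions.UnsafeProjection
    import org.apache.spark.sql.types.{LongType, StringType, StructType}
    import org.apache.spark.unsafe.types.UTF8String

    val schema = new StructType().add("id", LongType).add("name", StringType)
    val proj = UnsafeProjection.create(schema)
    val rows = Seq(
      InternalRow(1L, UTF8String.fromString("a")),
      InternalRow(2L, UTF8String.fromString("b")))

    // proj(r) overwrites the same UnsafeRow, so buffered results need copy():
    val buffered = rows.map(r => proj(r).copy())
    assert(buffered.map(_.getLong(0)) == Seq(1L, 2L))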
http://git-wip-us.apache.org/repos/asf/spark/blob/0da7bd50/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
index f936cf5..44dc68e 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
@@ -28,18 +28,17 @@ import org.apache.hadoop.hive.ql.{Context, ErrorMsg}
 import org.apache.hadoop.hive.serde2.Serializer
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption
 import org.apache.hadoop.hive.serde2.objectinspector._
-import org.apache.hadoop.mapred.{FileOutputCommitter, FileOutputFormat, JobConf}
+import org.apache.hadoop.mapred.{FileOutputFormat, JobConf}
 
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.Row
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.Attribute
-import org.apache.spark.sql.execution.{UnaryNode, SparkPlan}
+import org.apache.spark.sql.catalyst.expressions.{FromUnsafeProjection, Attribute}
+import org.apache.spark.sql.execution.{SparkPlan, UnaryNode}
 import org.apache.spark.sql.hive.HiveShim.{ShimFileSinkDesc => FileSinkDesc}
 import org.apache.spark.sql.hive._
 import org.apache.spark.sql.types.DataType
-import org.apache.spark.{SparkException, TaskContext}
 import org.apache.spark.util.SerializableJobConf
+import org.apache.spark.{SparkException, TaskContext}
 
 private[hive]
 case class InsertIntoHiveTable(
@@ -101,15 +100,17 @@ case class InsertIntoHiveTable(
 
       writerContainer.executorSideSetup(context.stageId, context.partitionId, context.attemptNumber)
 
+      val proj = FromUnsafeProjection(child.schema)
       iterator.foreach { row =>
         var i = 0
+        val safeRow = proj(row)
         while (i < fieldOIs.length) {
-          outputData(i) = if (row.isNullAt(i)) null else wrappers(i)(row.get(i, dataTypes(i)))
+          outputData(i) = if (row.isNullAt(i)) null else wrappers(i)(safeRow.get(i, dataTypes(i)))
           i += 1
         }
 
         writerContainer
-          .getLocalFileWriter(row, table.schema)
+          .getLocalFileWriter(safeRow, table.schema)
           .write(serializer.serialize(outputData, standardOI))
       }
 

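FromUnsafeProjection goes in the opposite direction: it converts the
UnsafeRows now arriving from the child plan back into generic rows, once per
row, so the Hive ObjectInspector wrappers keep seeing the safe representation
they were written against. A minimal round-trip sketch (schema and values are
made up):

    import org.apache.spark.sql.catalyst.InternalRow
    import org.apache.spark.sql.catalyst.expressions.{FromUnsafeProjection, UnsafeProjection}
    import org.apache.spark.sql.types.{IntegerType, StringType, StructType}
    import org.apache.spark.unsafe.types.UTF8String

    val schema = new StructType().add("i", IntegerType).add("s", StringType)
    val toUnsafe = UnsafeProjection.create(schema)
    val toSafe = FromUnsafeProjection(schema)

    val unsafeRow = toUnsafe(InternalRow(42, UTF8String.fromString("hive")))
    val safeRow = toSafe(unsafeRow) // a generic row backed by plain JVM objects
    assert(safeRow.getInt(0) == 42)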
http://git-wip-us.apache.org/repos/asf/spark/blob/0da7bd50/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala
index a61e162..6ccd417 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala
@@ -213,7 +213,8 @@ case class ScriptTransformation(
 
     child.execute().mapPartitions { iter =>
       if (iter.hasNext) {
-        processIterator(iter)
+        val proj = UnsafeProjection.create(schema)
+        processIterator(iter).map(proj)
       } else {
         // If the input iterator has no rows then do not launch the external script.
         Iterator.empty

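Note that the projection is created inside mapPartitions, i.e. once per task
on the executor: generated projections are compiled there from the
serializable schema rather than shipped from the driver. A sketch of the
pattern, where `process` is a hypothetical stand-in for processIterator:

    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.catalyst.InternalRow
    import org.apache.spark.sql.catalyst.expressions.UnsafeProjection
    import org.apache.spark.sql.types.StructType

    def runScript(
        input: RDD[InternalRow],
        schema: StructType,
        process: Iterator[InternalRow] => Iterator[InternalRow]): RDD[InternalRow] =
      input.mapPartitions { iter =>
        if (iter.hasNext) {
          val proj = UnsafeProjection.create(schema) // built per task
          process(iter).map(proj)                    // script output -> UnsafeRow
        } else {
          Iterator.empty // never launch the external script for an empty partition
        }
      }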
http://git-wip-us.apache.org/repos/asf/spark/blob/0da7bd50/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
index 665e87e..efbf998 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
@@ -27,7 +27,6 @@ import org.apache.parquet.hadoop.ParquetOutputCommitter
 
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.sql._
-import org.apache.spark.sql.execution.ConvertToUnsafe
 import org.apache.spark.sql.execution.datasources.LogicalRelation
 import org.apache.spark.sql.hive.test.TestHiveSingleton
 import org.apache.spark.sql.test.SQLTestUtils
@@ -689,36 +688,6 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils with Tes
       sqlContext.sparkContext.conf.set("spark.speculation", speculationEnabled.toString)
     }
   }
-
-  test("HadoopFsRelation produces UnsafeRow") {
-    withTempTable("test_unsafe") {
-      withTempPath { dir =>
-        val path = dir.getCanonicalPath
-        sqlContext.range(3).write.format(dataSourceName).save(path)
-        sqlContext.read
-          .format(dataSourceName)
-          .option("dataSchema", new StructType().add("id", LongType, nullable = false).json)
-          .load(path)
-          .registerTempTable("test_unsafe")
-
-        val df = sqlContext.sql(
-          """SELECT COUNT(*)
-            |FROM test_unsafe a JOIN test_unsafe b
-            |WHERE a.id = b.id
-          """.stripMargin)
-
-        val plan = df.queryExecution.executedPlan
-
-        assert(
-          plan.collect { case plan: ConvertToUnsafe => plan }.isEmpty,
-          s"""Query plan shouldn't have ${classOf[ConvertToUnsafe].getSimpleName} node(s):
-             |$plan
-           """.stripMargin)
-
-        checkAnswer(df, Row(3))
-      }
-    }
-  }
 }
 
 // This class is used to test SPARK-8578. We should not use any custom output committer when

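With ConvertToUnsafe gone from the codebase, the plan-shape assertion above can
no longer compile, so the regression test is dropped rather than rewritten. A
hypothetical replacement (not part of this commit) would assert on the rows
themselves, since every operator now emits UnsafeRow; given a DataFrame df like
the one in the removed test:

    import org.apache.spark.sql.catalyst.expressions.UnsafeRow

    // Check per partition, shipping back only Booleans rather than the rows.
    val allUnsafe = df.queryExecution.executedPlan.execute()
      .mapPartitions(iter => Iterator(iter.forall(_.isInstanceOf[UnsafeRow])))
      .collect()
      .forall(identity)
    assert(allUnsafe)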
