You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by rx...@apache.org on 2016/01/03 07:39:52 UTC

spark git commit: Revert "Revert "[SPARK-12286][SPARK-12290][SPARK-12294][SPARK-12284][SQL] always output UnsafeRow""

Repository: spark
Updated Branches:
  refs/heads/master 513e3b092 -> 6c5bbd628


Revert "Revert "[SPARK-12286][SPARK-12290][SPARK-12294][SPARK-12284][SQL] always output UnsafeRow""

This reverts commit 44ee920fd49d35b421ae562ea99bcc8f2b98ced6.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6c5bbd62
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6c5bbd62
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6c5bbd62

Branch: refs/heads/master
Commit: 6c5bbd628aaedb6efb44c15f816fea8fb600decc
Parents: 513e3b0
Author: Reynold Xin <rx...@databricks.com>
Authored: Sat Jan 2 22:39:25 2016 -0800
Committer: Reynold Xin <rx...@databricks.com>
Committed: Sat Jan 2 22:39:25 2016 -0800

----------------------------------------------------------------------
 .../scala/org/apache/spark/sql/SQLContext.scala |   3 +-
 .../apache/spark/sql/execution/Exchange.scala   |  25 +--
 .../spark/sql/execution/ExistingRDD.scala       |  15 +-
 .../org/apache/spark/sql/execution/Expand.scala |  13 +-
 .../apache/spark/sql/execution/Generate.scala   |   8 +-
 .../spark/sql/execution/LocalTableScan.scala    |  13 +-
 .../org/apache/spark/sql/execution/Sort.scala   |   4 -
 .../apache/spark/sql/execution/SparkPlan.scala  |  23 ---
 .../org/apache/spark/sql/execution/Window.scala |   8 +-
 .../aggregate/SortBasedAggregate.scala          |   4 -
 .../SortBasedAggregationIterator.scala          |   8 +-
 .../execution/aggregate/TungstenAggregate.scala |   4 -
 .../spark/sql/execution/basicOperators.scala    |  58 +------
 .../columnar/InMemoryColumnarTableScan.scala    |   8 +-
 .../joins/BroadcastNestedLoopJoin.scala         |   7 -
 .../sql/execution/joins/CartesianProduct.scala  |   4 -
 .../spark/sql/execution/joins/HashJoin.scala    |   4 -
 .../sql/execution/joins/HashOuterJoin.scala     |   4 -
 .../sql/execution/joins/HashSemiJoin.scala      |   4 -
 .../sql/execution/joins/LeftSemiJoinBNL.scala   |   3 -
 .../sql/execution/joins/SortMergeJoin.scala     |   4 -
 .../execution/joins/SortMergeOuterJoin.scala    |   4 -
 .../spark/sql/execution/local/LocalNode.scala   |  12 --
 .../execution/local/NestedLoopJoinNode.scala    |   6 +-
 .../org/apache/spark/sql/execution/python.scala |   7 +-
 .../sql/execution/rowFormatConverters.scala     | 108 ------------
 .../spark/sql/execution/ExchangeSuite.scala     |   2 +-
 .../spark/sql/execution/ExpandSuite.scala       |  54 ------
 .../execution/RowFormatConvertersSuite.scala    | 164 -------------------
 .../apache/spark/sql/execution/SortSuite.scala  |   2 +-
 .../sql/hive/execution/HiveTableScan.scala      |  16 +-
 .../hive/execution/InsertIntoHiveTable.scala    |  15 +-
 .../hive/execution/ScriptTransformation.scala   |   3 +-
 .../sql/sources/hadoopFsRelationSuites.scala    |  31 ----
 34 files changed, 74 insertions(+), 574 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/6c5bbd62/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index eadf5cb..0223032 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -904,8 +904,7 @@ class SQLContext private[sql](
   @transient
   protected[sql] val prepareForExecution = new RuleExecutor[SparkPlan] {
     val batches = Seq(
-      Batch("Add exchange", Once, EnsureRequirements(self)),
-      Batch("Add row converters", Once, EnsureRowFormats)
+      Batch("Add exchange", Once, EnsureRequirements(self))
     )
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/6c5bbd62/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
index 62cbc51..7b41619 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
@@ -28,7 +28,6 @@ import org.apache.spark.sql.SQLContext
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.errors.attachTree
 import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
 import org.apache.spark.sql.catalyst.plans.physical._
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.util.MutablePair
@@ -50,26 +49,14 @@ case class Exchange(
       case None => ""
     }
 
-    val simpleNodeName = if (tungstenMode) "TungstenExchange" else "Exchange"
+    val simpleNodeName = "Exchange"
     s"$simpleNodeName$extraInfo"
   }
 
-  /**
-   * Returns true iff we can support the data type, and we are not doing range partitioning.
-   */
-  private lazy val tungstenMode: Boolean = !newPartitioning.isInstanceOf[RangePartitioning]
-
   override def outputPartitioning: Partitioning = newPartitioning
 
   override def output: Seq[Attribute] = child.output
 
-  // This setting is somewhat counterintuitive:
-  // If the schema works with UnsafeRow, then we tell the planner that we don't support safe row,
-  // so the planner inserts a converter to convert data into UnsafeRow if needed.
-  override def outputsUnsafeRows: Boolean = tungstenMode
-  override def canProcessSafeRows: Boolean = !tungstenMode
-  override def canProcessUnsafeRows: Boolean = tungstenMode
-
   /**
    * Determines whether records must be defensively copied before being sent to the shuffle.
    * Several of Spark's shuffle components will buffer deserialized Java objects in memory. The
@@ -130,15 +117,7 @@ case class Exchange(
     }
   }
 
-  @transient private lazy val sparkConf = child.sqlContext.sparkContext.getConf
-
-  private val serializer: Serializer = {
-    if (tungstenMode) {
-      new UnsafeRowSerializer(child.output.size)
-    } else {
-      new SparkSqlSerializer(sparkConf)
-    }
-  }
+  private val serializer: Serializer = new UnsafeRowSerializer(child.output.size)
 
   override protected def doPrepare(): Unit = {
     // If an ExchangeCoordinator is needed, we register this Exchange operator

http://git-wip-us.apache.org/repos/asf/spark/blob/6c5bbd62/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
index 5c01af0..fc508bf 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.execution
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
 import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
-import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSet, GenericMutableRow}
+import org.apache.spark.sql.catalyst.expressions.{UnsafeProjection, Attribute, AttributeSet, GenericMutableRow}
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Statistics}
 import org.apache.spark.sql.sources.{BaseRelation, HadoopFsRelation}
 import org.apache.spark.sql.types.DataType
@@ -99,10 +99,19 @@ private[sql] case class PhysicalRDD(
     rdd: RDD[InternalRow],
     override val nodeName: String,
     override val metadata: Map[String, String] = Map.empty,
-    override val outputsUnsafeRows: Boolean = false)
+    isUnsafeRow: Boolean = false)
   extends LeafNode {
 
-  protected override def doExecute(): RDD[InternalRow] = rdd
+  protected override def doExecute(): RDD[InternalRow] = {
+    if (isUnsafeRow) {
+      rdd
+    } else {
+      rdd.mapPartitionsInternal { iter =>
+        val proj = UnsafeProjection.create(schema)
+        iter.map(proj)
+      }
+    }
+  }
 
   override def simpleString: String = {
     val metadataEntries = for ((key, value) <- metadata.toSeq.sorted) yield s"$key: $value"

http://git-wip-us.apache.org/repos/asf/spark/blob/6c5bbd62/sql/core/src/main/scala/org/apache/spark/sql/execution/Expand.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Expand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Expand.scala
index 91530bd..c3683cc 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Expand.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Expand.scala
@@ -41,20 +41,11 @@ case class Expand(
   // as UNKNOWN partitioning
   override def outputPartitioning: Partitioning = UnknownPartitioning(0)
 
-  override def outputsUnsafeRows: Boolean = child.outputsUnsafeRows
-  override def canProcessUnsafeRows: Boolean = true
-  override def canProcessSafeRows: Boolean = true
-
   override def references: AttributeSet =
     AttributeSet(projections.flatten.flatMap(_.references))
 
-  private[this] val projection = {
-    if (outputsUnsafeRows) {
-      (exprs: Seq[Expression]) => UnsafeProjection.create(exprs, child.output)
-    } else {
-      (exprs: Seq[Expression]) => newMutableProjection(exprs, child.output)()
-    }
-  }
+  private[this] val projection =
+    (exprs: Seq[Expression]) => UnsafeProjection.create(exprs, child.output)
 
   protected override def doExecute(): RDD[InternalRow] = attachTree(this, "execute") {
     child.execute().mapPartitions { iter =>

http://git-wip-us.apache.org/repos/asf/spark/blob/6c5bbd62/sql/core/src/main/scala/org/apache/spark/sql/execution/Generate.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Generate.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Generate.scala
index 0c613e9..4db88a0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Generate.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Generate.scala
@@ -64,6 +64,7 @@ case class Generate(
       child.execute().mapPartitionsInternal { iter =>
         val generatorNullRow = InternalRow.fromSeq(Seq.fill[Any](generator.elementTypes.size)(null))
         val joinedRow = new JoinedRow
+        val proj = UnsafeProjection.create(output, output)
 
         iter.flatMap { row =>
           // we should always set the left (child output)
@@ -77,13 +78,14 @@ case class Generate(
         } ++ LazyIterator(() => boundGenerator.terminate()).map { row =>
           // we leave the left side as the last element of its child output
           // keep it the same as Hive does
-          joinedRow.withRight(row)
+          proj(joinedRow.withRight(row))
         }
       }
     } else {
       child.execute().mapPartitionsInternal { iter =>
-        iter.flatMap(row => boundGenerator.eval(row)) ++
-        LazyIterator(() => boundGenerator.terminate())
+        val proj = UnsafeProjection.create(output, output)
+        (iter.flatMap(row => boundGenerator.eval(row)) ++
+          LazyIterator(() => boundGenerator.terminate())).map(proj)
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/6c5bbd62/sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScan.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScan.scala
index ba7f628..59057bf 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScan.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.execution
 
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.expressions.{Attribute, UnsafeProjection}
 
 
 /**
@@ -29,15 +29,20 @@ private[sql] case class LocalTableScan(
     output: Seq[Attribute],
     rows: Seq[InternalRow]) extends LeafNode {
 
-  private lazy val rdd = sqlContext.sparkContext.parallelize(rows)
+  private val unsafeRows: Array[InternalRow] = {
+    val proj = UnsafeProjection.create(output, output)
+    rows.map(r => proj(r).copy()).toArray
+  }
+
+  private lazy val rdd = sqlContext.sparkContext.parallelize(unsafeRows)
 
   protected override def doExecute(): RDD[InternalRow] = rdd
 
   override def executeCollect(): Array[InternalRow] = {
-    rows.toArray
+    unsafeRows
   }
 
   override def executeTake(limit: Int): Array[InternalRow] = {
-    rows.take(limit).toArray
+    unsafeRows.take(limit)
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/6c5bbd62/sql/core/src/main/scala/org/apache/spark/sql/execution/Sort.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Sort.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Sort.scala
index 24207cb..73dc8cb 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Sort.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Sort.scala
@@ -39,10 +39,6 @@ case class Sort(
     testSpillFrequency: Int = 0)
   extends UnaryNode {
 
-  override def outputsUnsafeRows: Boolean = true
-  override def canProcessUnsafeRows: Boolean = true
-  override def canProcessSafeRows: Boolean = false
-
   override def output: Seq[Attribute] = child.output
 
   override def outputOrdering: Seq[SortOrder] = sortOrder

http://git-wip-us.apache.org/repos/asf/spark/blob/6c5bbd62/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
index fe9b2ad..f20f32a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
@@ -97,17 +97,6 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
   /** Specifies sort order for each partition requirements on the input data for this operator. */
   def requiredChildOrdering: Seq[Seq[SortOrder]] = Seq.fill(children.size)(Nil)
 
-  /** Specifies whether this operator outputs UnsafeRows */
-  def outputsUnsafeRows: Boolean = false
-
-  /** Specifies whether this operator is capable of processing UnsafeRows */
-  def canProcessUnsafeRows: Boolean = false
-
-  /**
-   * Specifies whether this operator is capable of processing Java-object-based Rows (i.e. rows
-   * that are not UnsafeRows).
-   */
-  def canProcessSafeRows: Boolean = true
 
   /**
    * Returns the result of this query as an RDD[InternalRow] by delegating to doExecute
@@ -115,18 +104,6 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
    * Concrete implementations of SparkPlan should override doExecute instead.
    */
   final def execute(): RDD[InternalRow] = {
-    if (children.nonEmpty) {
-      val hasUnsafeInputs = children.exists(_.outputsUnsafeRows)
-      val hasSafeInputs = children.exists(!_.outputsUnsafeRows)
-      assert(!(hasSafeInputs && hasUnsafeInputs),
-        "Child operators should output rows in the same format")
-      assert(canProcessSafeRows || canProcessUnsafeRows,
-        "Operator must be able to process at least one row format")
-      assert(!hasSafeInputs || canProcessSafeRows,
-        "Operator will receive safe rows as input but cannot process safe rows")
-      assert(!hasUnsafeInputs || canProcessUnsafeRows,
-        "Operator will receive unsafe rows as input but cannot process unsafe rows")
-    }
     RDDOperationScope.withScope(sparkContext, nodeName, false, true) {
       prepare()
       doExecute()

http://git-wip-us.apache.org/repos/asf/spark/blob/6c5bbd62/sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala
index c941d67..b79d93d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala
@@ -100,8 +100,6 @@ case class Window(
 
   override def outputOrdering: Seq[SortOrder] = child.outputOrdering
 
-  override def canProcessUnsafeRows: Boolean = true
-
   /**
    * Create a bound ordering object for a given frame type and offset. A bound ordering object is
    * used to determine which input row lies within the frame boundaries of an output row.
@@ -259,16 +257,16 @@ case class Window(
    * @return the final resulting projection.
    */
   private[this] def createResultProjection(
-      expressions: Seq[Expression]): MutableProjection = {
+      expressions: Seq[Expression]): UnsafeProjection = {
     val references = expressions.zipWithIndex.map{ case (e, i) =>
       // Results of window expressions will be on the right side of child's output
       BoundReference(child.output.size + i, e.dataType, e.nullable)
     }
     val unboundToRefMap = expressions.zip(references).toMap
     val patchedWindowExpression = windowExpression.map(_.transform(unboundToRefMap))
-    newMutableProjection(
+    UnsafeProjection.create(
       projectList ++ patchedWindowExpression,
-      child.output)()
+      child.output)
   }
 
   protected override def doExecute(): RDD[InternalRow] = {

http://git-wip-us.apache.org/repos/asf/spark/blob/6c5bbd62/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortBasedAggregate.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortBasedAggregate.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortBasedAggregate.scala
index c4587ba..01d0766 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortBasedAggregate.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortBasedAggregate.scala
@@ -49,10 +49,6 @@ case class SortBasedAggregate(
     "numInputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of input rows"),
     "numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows"))
 
-  override def outputsUnsafeRows: Boolean = true
-  override def canProcessUnsafeRows: Boolean = false
-  override def canProcessSafeRows: Boolean = true
-
   override def output: Seq[Attribute] = resultExpressions.map(_.toAttribute)
 
   override def requiredChildDistribution: List[Distribution] = {

http://git-wip-us.apache.org/repos/asf/spark/blob/6c5bbd62/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortBasedAggregationIterator.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortBasedAggregationIterator.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortBasedAggregationIterator.scala
index ac920aa..6501634 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortBasedAggregationIterator.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortBasedAggregationIterator.scala
@@ -87,6 +87,10 @@ class SortBasedAggregationIterator(
   // The aggregation buffer used by the sort-based aggregation.
   private[this] val sortBasedAggregationBuffer: MutableRow = newBuffer
 
+  // An SafeProjection to turn UnsafeRow into GenericInternalRow, because UnsafeRow can't be
+  // compared to MutableRow (aggregation buffer) directly.
+  private[this] val safeProj: Projection = FromUnsafeProjection(valueAttributes.map(_.dataType))
+
   protected def initialize(): Unit = {
     if (inputIterator.hasNext) {
       initializeBuffer(sortBasedAggregationBuffer)
@@ -110,7 +114,7 @@ class SortBasedAggregationIterator(
     // We create a variable to track if we see the next group.
     var findNextPartition = false
     // firstRowInNextGroup is the first row of this group. We first process it.
-    processRow(sortBasedAggregationBuffer, firstRowInNextGroup)
+    processRow(sortBasedAggregationBuffer, safeProj(firstRowInNextGroup))
 
     // The search will stop when we see the next group or there is no
     // input row left in the iter.
@@ -122,7 +126,7 @@ class SortBasedAggregationIterator(
 
       // Check if the current row belongs the current input row.
       if (currentGroupingKey == groupingKey) {
-        processRow(sortBasedAggregationBuffer, currentRow)
+        processRow(sortBasedAggregationBuffer, safeProj(currentRow))
       } else {
         // We find a new group.
         findNextPartition = true

http://git-wip-us.apache.org/repos/asf/spark/blob/6c5bbd62/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala
index 9d758eb..999ebb7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala
@@ -49,10 +49,6 @@ case class TungstenAggregate(
     "dataSize" -> SQLMetrics.createSizeMetric(sparkContext, "data size"),
     "spillSize" -> SQLMetrics.createSizeMetric(sparkContext, "spill size"))
 
-  override def outputsUnsafeRows: Boolean = true
-  override def canProcessUnsafeRows: Boolean = true
-  override def canProcessSafeRows: Boolean = true
-
   override def output: Seq[Attribute] = resultExpressions.map(_.toAttribute)
 
   override def producedAttributes: AttributeSet =

http://git-wip-us.apache.org/repos/asf/spark/blob/6c5bbd62/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
index f19d72f..af7237e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
@@ -36,10 +36,6 @@ case class Project(projectList: Seq[NamedExpression], child: SparkPlan) extends
   override private[sql] lazy val metrics = Map(
     "numRows" -> SQLMetrics.createLongMetric(sparkContext, "number of rows"))
 
-  override def outputsUnsafeRows: Boolean = true
-  override def canProcessUnsafeRows: Boolean = true
-  override def canProcessSafeRows: Boolean = true
-
   override def output: Seq[Attribute] = projectList.map(_.toAttribute)
 
   protected override def doExecute(): RDD[InternalRow] = {
@@ -80,12 +76,6 @@ case class Filter(condition: Expression, child: SparkPlan) extends UnaryNode {
   }
 
   override def outputOrdering: Seq[SortOrder] = child.outputOrdering
-
-  override def outputsUnsafeRows: Boolean = child.outputsUnsafeRows
-
-  override def canProcessUnsafeRows: Boolean = true
-
-  override def canProcessSafeRows: Boolean = true
 }
 
 /**
@@ -108,10 +98,6 @@ case class Sample(
 {
   override def output: Seq[Attribute] = child.output
 
-  override def outputsUnsafeRows: Boolean = child.outputsUnsafeRows
-  override def canProcessUnsafeRows: Boolean = true
-  override def canProcessSafeRows: Boolean = true
-
   protected override def doExecute(): RDD[InternalRow] = {
     if (withReplacement) {
       // Disable gap sampling since the gap sampling method buffers two rows internally,
@@ -135,8 +121,6 @@ case class Range(
     output: Seq[Attribute])
   extends LeafNode {
 
-  override def outputsUnsafeRows: Boolean = true
-
   protected override def doExecute(): RDD[InternalRow] = {
     sqlContext
       .sparkContext
@@ -199,9 +183,6 @@ case class Union(children: Seq[SparkPlan]) extends SparkPlan {
       }
     }
   }
-  override def outputsUnsafeRows: Boolean = children.exists(_.outputsUnsafeRows)
-  override def canProcessUnsafeRows: Boolean = true
-  override def canProcessSafeRows: Boolean = true
   protected override def doExecute(): RDD[InternalRow] =
     sparkContext.union(children.map(_.execute()))
 }
@@ -268,12 +249,14 @@ case class TakeOrderedAndProject(
   // and this ordering needs to be created on the driver in order to be passed into Spark core code.
   private val ord: InterpretedOrdering = new InterpretedOrdering(sortOrder, child.output)
 
-  // TODO: remove @transient after figure out how to clean closure at InsertIntoHiveTable.
-  @transient private val projection = projectList.map(new InterpretedProjection(_, child.output))
-
   private def collectData(): Array[InternalRow] = {
     val data = child.execute().map(_.copy()).takeOrdered(limit)(ord)
-    projection.map(data.map(_)).getOrElse(data)
+    if (projectList.isDefined) {
+      val proj = UnsafeProjection.create(projectList.get, child.output)
+      data.map(r => proj(r).copy())
+    } else {
+      data
+    }
   }
 
   override def executeCollect(): Array[InternalRow] = {
@@ -311,10 +294,6 @@ case class Coalesce(numPartitions: Int, child: SparkPlan) extends UnaryNode {
   protected override def doExecute(): RDD[InternalRow] = {
     child.execute().coalesce(numPartitions, shuffle = false)
   }
-
-  override def outputsUnsafeRows: Boolean = child.outputsUnsafeRows
-  override def canProcessUnsafeRows: Boolean = true
-  override def canProcessSafeRows: Boolean = true
 }
 
 /**
@@ -327,10 +306,6 @@ case class Except(left: SparkPlan, right: SparkPlan) extends BinaryNode {
   protected override def doExecute(): RDD[InternalRow] = {
     left.execute().map(_.copy()).subtract(right.execute().map(_.copy()))
   }
-
-  override def outputsUnsafeRows: Boolean = children.exists(_.outputsUnsafeRows)
-  override def canProcessUnsafeRows: Boolean = true
-  override def canProcessSafeRows: Boolean = true
 }
 
 /**
@@ -343,10 +318,6 @@ case class Intersect(left: SparkPlan, right: SparkPlan) extends BinaryNode {
   protected override def doExecute(): RDD[InternalRow] = {
     left.execute().map(_.copy()).intersection(right.execute().map(_.copy()))
   }
-
-  override def outputsUnsafeRows: Boolean = children.exists(_.outputsUnsafeRows)
-  override def canProcessUnsafeRows: Boolean = true
-  override def canProcessSafeRows: Boolean = true
 }
 
 /**
@@ -371,10 +342,6 @@ case class MapPartitions[T, U](
     child: SparkPlan) extends UnaryNode {
   override def producedAttributes: AttributeSet = outputSet
 
-  override def canProcessSafeRows: Boolean = true
-  override def canProcessUnsafeRows: Boolean = true
-  override def outputsUnsafeRows: Boolean = true
-
   override protected def doExecute(): RDD[InternalRow] = {
     child.execute().mapPartitionsInternal { iter =>
       val tBoundEncoder = tEncoder.bind(child.output)
@@ -394,11 +361,6 @@ case class AppendColumns[T, U](
     child: SparkPlan) extends UnaryNode {
   override def producedAttributes: AttributeSet = AttributeSet(newColumns)
 
-  // We are using an unsafe combiner.
-  override def canProcessSafeRows: Boolean = false
-  override def canProcessUnsafeRows: Boolean = true
-  override def outputsUnsafeRows: Boolean = true
-
   override def output: Seq[Attribute] = child.output ++ newColumns
 
   override protected def doExecute(): RDD[InternalRow] = {
@@ -428,10 +390,6 @@ case class MapGroups[K, T, U](
     child: SparkPlan) extends UnaryNode {
   override def producedAttributes: AttributeSet = outputSet
 
-  override def canProcessSafeRows: Boolean = true
-  override def canProcessUnsafeRows: Boolean = true
-  override def outputsUnsafeRows: Boolean = true
-
   override def requiredChildDistribution: Seq[Distribution] =
     ClusteredDistribution(groupingAttributes) :: Nil
 
@@ -472,10 +430,6 @@ case class CoGroup[Key, Left, Right, Result](
     right: SparkPlan) extends BinaryNode {
   override def producedAttributes: AttributeSet = outputSet
 
-  override def canProcessSafeRows: Boolean = true
-  override def canProcessUnsafeRows: Boolean = true
-  override def outputsUnsafeRows: Boolean = true
-
   override def requiredChildDistribution: Seq[Distribution] =
     ClusteredDistribution(leftGroup) :: ClusteredDistribution(rightGroup) :: Nil
 

http://git-wip-us.apache.org/repos/asf/spark/blob/6c5bbd62/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarTableScan.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarTableScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarTableScan.scala
index aa7a668..d809123 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarTableScan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarTableScan.scala
@@ -27,7 +27,7 @@ import org.apache.spark.sql.catalyst.dsl.expressions._
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Statistics}
 import org.apache.spark.sql.catalyst.plans.physical.Partitioning
-import org.apache.spark.sql.execution.{ConvertToUnsafe, LeafNode, SparkPlan}
+import org.apache.spark.sql.execution.{LeafNode, SparkPlan}
 import org.apache.spark.sql.types.UserDefinedType
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.{Accumulable, Accumulator, Accumulators}
@@ -39,9 +39,7 @@ private[sql] object InMemoryRelation {
       storageLevel: StorageLevel,
       child: SparkPlan,
       tableName: Option[String]): InMemoryRelation =
-    new InMemoryRelation(child.output, useCompression, batchSize, storageLevel,
-      if (child.outputsUnsafeRows) child else ConvertToUnsafe(child),
-      tableName)()
+    new InMemoryRelation(child.output, useCompression, batchSize, storageLevel, child, tableName)()
 }
 
 /**
@@ -226,8 +224,6 @@ private[sql] case class InMemoryColumnarTableScan(
   // The cached version does not change the outputOrdering of the original SparkPlan.
   override def outputOrdering: Seq[SortOrder] = relation.child.outputOrdering
 
-  override def outputsUnsafeRows: Boolean = true
-
   private def statsFor(a: Attribute) = relation.partitionStatistics.forAttribute(a)
 
   // Returned filter predicate should return false iff it is impossible for the input expression

http://git-wip-us.apache.org/repos/asf/spark/blob/6c5bbd62/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastNestedLoopJoin.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastNestedLoopJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastNestedLoopJoin.scala
index aab177b..54275c2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastNestedLoopJoin.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastNestedLoopJoin.scala
@@ -46,15 +46,8 @@ case class BroadcastNestedLoopJoin(
     case BuildLeft => (right, left)
   }
 
-  override def outputsUnsafeRows: Boolean = left.outputsUnsafeRows || right.outputsUnsafeRows
-  override def canProcessUnsafeRows: Boolean = true
-
   private[this] def genResultProjection: InternalRow => InternalRow = {
-    if (outputsUnsafeRows) {
       UnsafeProjection.create(schema)
-    } else {
-      identity[InternalRow]
-    }
   }
 
   override def outputPartitioning: Partitioning = streamed.outputPartitioning

http://git-wip-us.apache.org/repos/asf/spark/blob/6c5bbd62/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProduct.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProduct.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProduct.scala
index 81bfe4e..d9fa4c6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProduct.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProduct.scala
@@ -81,10 +81,6 @@ class UnsafeCartesianRDD(left : RDD[UnsafeRow], right : RDD[UnsafeRow], numField
 case class CartesianProduct(left: SparkPlan, right: SparkPlan) extends BinaryNode {
   override def output: Seq[Attribute] = left.output ++ right.output
 
-  override def canProcessSafeRows: Boolean = false
-  override def canProcessUnsafeRows: Boolean = true
-  override def outputsUnsafeRows: Boolean = true
-
   override private[sql] lazy val metrics = Map(
     "numLeftRows" -> SQLMetrics.createLongMetric(sparkContext, "number of left rows"),
     "numRightRows" -> SQLMetrics.createLongMetric(sparkContext, "number of right rows"),

http://git-wip-us.apache.org/repos/asf/spark/blob/6c5bbd62/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala
index fb961d9..7f9d9da 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala
@@ -44,10 +44,6 @@ trait HashJoin {
 
   override def output: Seq[Attribute] = left.output ++ right.output
 
-  override def outputsUnsafeRows: Boolean = true
-  override def canProcessUnsafeRows: Boolean = true
-  override def canProcessSafeRows: Boolean = false
-
   protected def buildSideKeyGenerator: Projection =
     UnsafeProjection.create(buildKeys, buildPlan.output)
 

http://git-wip-us.apache.org/repos/asf/spark/blob/6c5bbd62/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashOuterJoin.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashOuterJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashOuterJoin.scala
index c6e5868..6d464d6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashOuterJoin.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashOuterJoin.scala
@@ -64,10 +64,6 @@ trait HashOuterJoin {
         s"HashOuterJoin should not take $x as the JoinType")
   }
 
-  override def outputsUnsafeRows: Boolean = true
-  override def canProcessUnsafeRows: Boolean = true
-  override def canProcessSafeRows: Boolean = false
-
   protected def buildKeyGenerator: Projection =
     UnsafeProjection.create(buildKeys, buildPlan.output)
 

http://git-wip-us.apache.org/repos/asf/spark/blob/6c5bbd62/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashSemiJoin.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashSemiJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashSemiJoin.scala
index f23a183..3e0f74c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashSemiJoin.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashSemiJoin.scala
@@ -33,10 +33,6 @@ trait HashSemiJoin {
 
   override def output: Seq[Attribute] = left.output
 
-  override def outputsUnsafeRows: Boolean = true
-  override def canProcessUnsafeRows: Boolean = true
-  override def canProcessSafeRows: Boolean = false
-
   protected def leftKeyGenerator: Projection =
     UnsafeProjection.create(leftKeys, left.output)
 

http://git-wip-us.apache.org/repos/asf/spark/blob/6c5bbd62/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/LeftSemiJoinBNL.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/LeftSemiJoinBNL.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/LeftSemiJoinBNL.scala
index efa7b49..82498ee 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/LeftSemiJoinBNL.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/LeftSemiJoinBNL.scala
@@ -42,9 +42,6 @@ case class LeftSemiJoinBNL(
 
   override def output: Seq[Attribute] = left.output
 
-  override def outputsUnsafeRows: Boolean = streamed.outputsUnsafeRows
-  override def canProcessUnsafeRows: Boolean = true
-
   /** The Streamed Relation */
   override def left: SparkPlan = streamed
 

http://git-wip-us.apache.org/repos/asf/spark/blob/6c5bbd62/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoin.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoin.scala
index 4bf7b52..812f881 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoin.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoin.scala
@@ -53,10 +53,6 @@ case class SortMergeJoin(
   override def requiredChildOrdering: Seq[Seq[SortOrder]] =
     requiredOrders(leftKeys) :: requiredOrders(rightKeys) :: Nil
 
-  override def outputsUnsafeRows: Boolean = true
-  override def canProcessUnsafeRows: Boolean = true
-  override def canProcessSafeRows: Boolean = false
-
   private def requiredOrders(keys: Seq[Expression]): Seq[SortOrder] = {
     // This must be ascending in order to agree with the `keyOrdering` defined in `doExecute()`.
     keys.map(SortOrder(_, Ascending))

http://git-wip-us.apache.org/repos/asf/spark/blob/6c5bbd62/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeOuterJoin.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeOuterJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeOuterJoin.scala
index 7ce38eb..c3a2bfc 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeOuterJoin.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeOuterJoin.scala
@@ -89,10 +89,6 @@ case class SortMergeOuterJoin(
     keys.map(SortOrder(_, Ascending))
   }
 
-  override def outputsUnsafeRows: Boolean = true
-  override def canProcessUnsafeRows: Boolean = true
-  override def canProcessSafeRows: Boolean = false
-
   private def createLeftKeyGenerator(): Projection =
     UnsafeProjection.create(leftKeys, left.output)
 

http://git-wip-us.apache.org/repos/asf/spark/blob/6c5bbd62/sql/core/src/main/scala/org/apache/spark/sql/execution/local/LocalNode.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/local/LocalNode.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/local/LocalNode.scala
index 6a882c9..e462170 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/local/LocalNode.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/local/LocalNode.scala
@@ -69,18 +69,6 @@ abstract class LocalNode(conf: SQLConf) extends QueryPlan[LocalNode] with Loggin
    */
   def close(): Unit
 
-  /** Specifies whether this operator outputs UnsafeRows */
-  def outputsUnsafeRows: Boolean = false
-
-  /** Specifies whether this operator is capable of processing UnsafeRows */
-  def canProcessUnsafeRows: Boolean = false
-
-  /**
-   * Specifies whether this operator is capable of processing Java-object-based Rows (i.e. rows
-   * that are not UnsafeRows).
-   */
-  def canProcessSafeRows: Boolean = true
-
   /**
    * Returns the content through the [[Iterator]] interface.
    */

http://git-wip-us.apache.org/repos/asf/spark/blob/6c5bbd62/sql/core/src/main/scala/org/apache/spark/sql/execution/local/NestedLoopJoinNode.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/local/NestedLoopJoinNode.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/local/NestedLoopJoinNode.scala
index 7321fc6..b7fa0c0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/local/NestedLoopJoinNode.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/local/NestedLoopJoinNode.scala
@@ -47,11 +47,7 @@ case class NestedLoopJoinNode(
   }
 
   private[this] def genResultProjection: InternalRow => InternalRow = {
-    if (outputsUnsafeRows) {
-      UnsafeProjection.create(schema)
-    } else {
-      identity[InternalRow]
-    }
+    UnsafeProjection.create(schema)
   }
 
   private[this] var currentRow: InternalRow = _

http://git-wip-us.apache.org/repos/asf/spark/blob/6c5bbd62/sql/core/src/main/scala/org/apache/spark/sql/execution/python.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python.scala
index defcec9..efb4b09 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python.scala
@@ -351,10 +351,6 @@ case class BatchPythonEvaluation(udf: PythonUDF, output: Seq[Attribute], child:
 
   def children: Seq[SparkPlan] = child :: Nil
 
-  override def outputsUnsafeRows: Boolean = false
-  override def canProcessUnsafeRows: Boolean = true
-  override def canProcessSafeRows: Boolean = true
-
   protected override def doExecute(): RDD[InternalRow] = {
     val inputRDD = child.execute().map(_.copy())
     val bufferSize = inputRDD.conf.getInt("spark.buffer.size", 65536)
@@ -400,13 +396,14 @@ case class BatchPythonEvaluation(udf: PythonUDF, output: Seq[Attribute], child:
       val unpickle = new Unpickler
       val row = new GenericMutableRow(1)
       val joined = new JoinedRow
+      val resultProj = UnsafeProjection.create(output, output)
 
       outputIterator.flatMap { pickedResult =>
         val unpickledBatch = unpickle.loads(pickedResult)
         unpickledBatch.asInstanceOf[java.util.ArrayList[Any]].asScala
       }.map { result =>
         row(0) = EvaluatePython.fromJava(result, udf.dataType)
-        joined(queue.poll(), row)
+        resultProj(joined(queue.poll(), row))
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/6c5bbd62/sql/core/src/main/scala/org/apache/spark/sql/execution/rowFormatConverters.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/rowFormatConverters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/rowFormatConverters.scala
deleted file mode 100644
index 5f8fc2d..0000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/rowFormatConverters.scala
+++ /dev/null
@@ -1,108 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution
-
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.plans.physical.Partitioning
-import org.apache.spark.sql.catalyst.rules.Rule
-
-/**
- * Converts Java-object-based rows into [[UnsafeRow]]s.
- */
-case class ConvertToUnsafe(child: SparkPlan) extends UnaryNode {
-
-  override def output: Seq[Attribute] = child.output
-  override def outputPartitioning: Partitioning = child.outputPartitioning
-  override def outputOrdering: Seq[SortOrder] = child.outputOrdering
-  override def outputsUnsafeRows: Boolean = true
-  override def canProcessUnsafeRows: Boolean = false
-  override def canProcessSafeRows: Boolean = true
-  override protected def doExecute(): RDD[InternalRow] = {
-    child.execute().mapPartitions { iter =>
-      val convertToUnsafe = UnsafeProjection.create(child.schema)
-      iter.map(convertToUnsafe)
-    }
-  }
-}
-
-/**
- * Converts [[UnsafeRow]]s back into Java-object-based rows.
- */
-case class ConvertToSafe(child: SparkPlan) extends UnaryNode {
-  override def output: Seq[Attribute] = child.output
-  override def outputPartitioning: Partitioning = child.outputPartitioning
-  override def outputOrdering: Seq[SortOrder] = child.outputOrdering
-  override def outputsUnsafeRows: Boolean = false
-  override def canProcessUnsafeRows: Boolean = true
-  override def canProcessSafeRows: Boolean = false
-  override protected def doExecute(): RDD[InternalRow] = {
-    child.execute().mapPartitions { iter =>
-      val convertToSafe = FromUnsafeProjection(child.output.map(_.dataType))
-      iter.map(convertToSafe)
-    }
-  }
-}
-
-private[sql] object EnsureRowFormats extends Rule[SparkPlan] {
-
-  private def onlyHandlesSafeRows(operator: SparkPlan): Boolean =
-    operator.canProcessSafeRows && !operator.canProcessUnsafeRows
-
-  private def onlyHandlesUnsafeRows(operator: SparkPlan): Boolean =
-    operator.canProcessUnsafeRows && !operator.canProcessSafeRows
-
-  private def handlesBothSafeAndUnsafeRows(operator: SparkPlan): Boolean =
-    operator.canProcessSafeRows && operator.canProcessUnsafeRows
-
-  override def apply(operator: SparkPlan): SparkPlan = operator.transformUp {
-    case operator: SparkPlan if onlyHandlesSafeRows(operator) =>
-      if (operator.children.exists(_.outputsUnsafeRows)) {
-        operator.withNewChildren {
-          operator.children.map {
-            c => if (c.outputsUnsafeRows) ConvertToSafe(c) else c
-          }
-        }
-      } else {
-        operator
-      }
-    case operator: SparkPlan if onlyHandlesUnsafeRows(operator) =>
-      if (operator.children.exists(!_.outputsUnsafeRows)) {
-        operator.withNewChildren {
-          operator.children.map {
-            c => if (!c.outputsUnsafeRows) ConvertToUnsafe(c) else c
-          }
-        }
-      } else {
-        operator
-      }
-    case operator: SparkPlan if handlesBothSafeAndUnsafeRows(operator) =>
-      if (operator.children.map(_.outputsUnsafeRows).toSet.size != 1) {
-        // If this operator's children produce both unsafe and safe rows,
-        // convert everything unsafe rows.
-        operator.withNewChildren {
-          operator.children.map {
-            c => if (!c.outputsUnsafeRows) ConvertToUnsafe(c) else c
-          }
-        }
-      } else {
-        operator
-      }
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/6c5bbd62/sql/core/src/test/scala/org/apache/spark/sql/execution/ExchangeSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ExchangeSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ExchangeSuite.scala
index 911d12e..87bff32 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ExchangeSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ExchangeSuite.scala
@@ -28,7 +28,7 @@ class ExchangeSuite extends SparkPlanTest with SharedSQLContext {
     val input = (1 to 1000).map(Tuple1.apply)
     checkAnswer(
       input.toDF(),
-      plan => ConvertToSafe(Exchange(SinglePartition, ConvertToUnsafe(plan))),
+      plan => Exchange(SinglePartition, plan),
       input.map(Row.fromTuple)
     )
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/6c5bbd62/sql/core/src/test/scala/org/apache/spark/sql/execution/ExpandSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ExpandSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ExpandSuite.scala
deleted file mode 100644
index faef76d..0000000
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ExpandSuite.scala
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution
-
-import org.apache.spark.sql.Row
-import org.apache.spark.sql.catalyst.expressions.{AttributeReference, BoundReference, Alias, Literal}
-import org.apache.spark.sql.test.SharedSQLContext
-import org.apache.spark.sql.types.IntegerType
-
-class ExpandSuite extends SparkPlanTest with SharedSQLContext {
-  import testImplicits.localSeqToDataFrameHolder
-
-  private def testExpand(f: SparkPlan => SparkPlan): Unit = {
-    val input = (1 to 1000).map(Tuple1.apply)
-    val projections = Seq.tabulate(2) { i =>
-      Alias(BoundReference(0, IntegerType, false), "id")() :: Alias(Literal(i), "gid")() :: Nil
-    }
-    val attributes = projections.head.map(_.toAttribute)
-    checkAnswer(
-      input.toDF(),
-      plan => Expand(projections, attributes, f(plan)),
-      input.flatMap(i => Seq.tabulate(2)(j => Row(i._1, j)))
-    )
-  }
-
-  test("inheriting child row type") {
-    val exprs = AttributeReference("a", IntegerType, false)() :: Nil
-    val plan = Expand(Seq(exprs), exprs, ConvertToUnsafe(LocalTableScan(exprs, Seq.empty)))
-    assert(plan.outputsUnsafeRows, "Expand should inherits the created row type from its child.")
-  }
-
-  test("expanding UnsafeRows") {
-    testExpand(ConvertToUnsafe)
-  }
-
-  test("expanding SafeRows") {
-    testExpand(identity)
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/6c5bbd62/sql/core/src/test/scala/org/apache/spark/sql/execution/RowFormatConvertersSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/RowFormatConvertersSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/RowFormatConvertersSuite.scala
deleted file mode 100644
index 2328899..0000000
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/RowFormatConvertersSuite.scala
+++ /dev/null
@@ -1,164 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution
-
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.{SQLContext, Row}
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Attribute, Literal, IsNull}
-import org.apache.spark.sql.catalyst.util.GenericArrayData
-import org.apache.spark.sql.test.SharedSQLContext
-import org.apache.spark.sql.types.{ArrayType, StringType}
-import org.apache.spark.unsafe.types.UTF8String
-
-class RowFormatConvertersSuite extends SparkPlanTest with SharedSQLContext {
-
-  private def getConverters(plan: SparkPlan): Seq[SparkPlan] = plan.collect {
-    case c: ConvertToUnsafe => c
-    case c: ConvertToSafe => c
-  }
-
-  private val outputsSafe = ReferenceSort(Nil, false, PhysicalRDD(Seq.empty, null, "name"))
-  assert(!outputsSafe.outputsUnsafeRows)
-  private val outputsUnsafe = Sort(Nil, false, PhysicalRDD(Seq.empty, null, "name"))
-  assert(outputsUnsafe.outputsUnsafeRows)
-
-  test("planner should insert unsafe->safe conversions when required") {
-    val plan = Limit(10, outputsUnsafe)
-    val preparedPlan = sqlContext.prepareForExecution.execute(plan)
-    assert(preparedPlan.children.head.isInstanceOf[ConvertToSafe])
-  }
-
-  test("filter can process unsafe rows") {
-    val plan = Filter(IsNull(IsNull(Literal(1))), outputsUnsafe)
-    val preparedPlan = sqlContext.prepareForExecution.execute(plan)
-    assert(getConverters(preparedPlan).size === 1)
-    assert(preparedPlan.outputsUnsafeRows)
-  }
-
-  test("filter can process safe rows") {
-    val plan = Filter(IsNull(IsNull(Literal(1))), outputsSafe)
-    val preparedPlan = sqlContext.prepareForExecution.execute(plan)
-    assert(getConverters(preparedPlan).isEmpty)
-    assert(!preparedPlan.outputsUnsafeRows)
-  }
-
-  test("coalesce can process unsafe rows") {
-    val plan = Coalesce(1, outputsUnsafe)
-    val preparedPlan = sqlContext.prepareForExecution.execute(plan)
-    assert(getConverters(preparedPlan).size === 1)
-    assert(preparedPlan.outputsUnsafeRows)
-  }
-
-  test("except can process unsafe rows") {
-    val plan = Except(outputsUnsafe, outputsUnsafe)
-    val preparedPlan = sqlContext.prepareForExecution.execute(plan)
-    assert(getConverters(preparedPlan).size === 2)
-    assert(preparedPlan.outputsUnsafeRows)
-  }
-
-  test("except requires all of its input rows' formats to agree") {
-    val plan = Except(outputsSafe, outputsUnsafe)
-    assert(plan.canProcessSafeRows && plan.canProcessUnsafeRows)
-    val preparedPlan = sqlContext.prepareForExecution.execute(plan)
-    assert(preparedPlan.outputsUnsafeRows)
-  }
-
-  test("intersect can process unsafe rows") {
-    val plan = Intersect(outputsUnsafe, outputsUnsafe)
-    val preparedPlan = sqlContext.prepareForExecution.execute(plan)
-    assert(getConverters(preparedPlan).size === 2)
-    assert(preparedPlan.outputsUnsafeRows)
-  }
-
-  test("intersect requires all of its input rows' formats to agree") {
-    val plan = Intersect(outputsSafe, outputsUnsafe)
-    assert(plan.canProcessSafeRows && plan.canProcessUnsafeRows)
-    val preparedPlan = sqlContext.prepareForExecution.execute(plan)
-    assert(preparedPlan.outputsUnsafeRows)
-  }
-
-  test("execute() fails an assertion if inputs rows are of different formats") {
-    val e = intercept[AssertionError] {
-      Union(Seq(outputsSafe, outputsUnsafe)).execute()
-    }
-    assert(e.getMessage.contains("format"))
-  }
-
-  test("union requires all of its input rows' formats to agree") {
-    val plan = Union(Seq(outputsSafe, outputsUnsafe))
-    assert(plan.canProcessSafeRows && plan.canProcessUnsafeRows)
-    val preparedPlan = sqlContext.prepareForExecution.execute(plan)
-    assert(preparedPlan.outputsUnsafeRows)
-  }
-
-  test("union can process safe rows") {
-    val plan = Union(Seq(outputsSafe, outputsSafe))
-    val preparedPlan = sqlContext.prepareForExecution.execute(plan)
-    assert(!preparedPlan.outputsUnsafeRows)
-  }
-
-  test("union can process unsafe rows") {
-    val plan = Union(Seq(outputsUnsafe, outputsUnsafe))
-    val preparedPlan = sqlContext.prepareForExecution.execute(plan)
-    assert(preparedPlan.outputsUnsafeRows)
-  }
-
-  test("round trip with ConvertToUnsafe and ConvertToSafe") {
-    val input = Seq(("hello", 1), ("world", 2))
-    checkAnswer(
-      sqlContext.createDataFrame(input),
-      plan => ConvertToSafe(ConvertToUnsafe(plan)),
-      input.map(Row.fromTuple)
-    )
-  }
-
-  test("SPARK-9683: copy UTF8String when convert unsafe array/map to safe") {
-    SQLContext.setActive(sqlContext)
-    val schema = ArrayType(StringType)
-    val rows = (1 to 100).map { i =>
-      InternalRow(new GenericArrayData(Array[Any](UTF8String.fromString(i.toString))))
-    }
-    val relation = LocalTableScan(Seq(AttributeReference("t", schema)()), rows)
-
-    val plan =
-      DummyPlan(
-        ConvertToSafe(
-          ConvertToUnsafe(relation)))
-    assert(plan.execute().collect().map(_.getUTF8String(0).toString) === (1 to 100).map(_.toString))
-  }
-}
-
-case class DummyPlan(child: SparkPlan) extends UnaryNode {
-
-  override protected def doExecute(): RDD[InternalRow] = {
-    child.execute().mapPartitions { iter =>
-      // This `DummyPlan` is in safe mode, so we don't need to do copy even we hold some
-      // values gotten from the incoming rows.
-      // we cache all strings here to make sure we have deep copied UTF8String inside incoming
-      // safe InternalRow.
-      val strings = new scala.collection.mutable.ArrayBuffer[UTF8String]
-      iter.foreach { row =>
-        strings += row.getArray(0).getUTF8String(0)
-      }
-      strings.map(InternalRow(_)).iterator
-    }
-  }
-
-  override def output: Seq[Attribute] = Seq(AttributeReference("a", StringType)())
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/6c5bbd62/sql/core/src/test/scala/org/apache/spark/sql/execution/SortSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SortSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SortSuite.scala
index e5d34be..af971df 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SortSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SortSuite.scala
@@ -99,7 +99,7 @@ class SortSuite extends SparkPlanTest with SharedSQLContext {
       )
       checkThatPlansAgree(
         inputDf,
-        p => ConvertToSafe(Sort(sortOrder, global = true, p: SparkPlan, testSpillFrequency = 23)),
+        p => Sort(sortOrder, global = true, p: SparkPlan, testSpillFrequency = 23),
         ReferenceSort(sortOrder, global = true, _: SparkPlan),
         sortAnswers = false
       )

http://git-wip-us.apache.org/repos/asf/spark/blob/6c5bbd62/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScan.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScan.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScan.scala
index 8141136..1588728 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScan.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScan.scala
@@ -132,11 +132,17 @@ case class HiveTableScan(
     }
   }
 
-  protected override def doExecute(): RDD[InternalRow] = if (!relation.hiveQlTable.isPartitioned) {
-    hadoopReader.makeRDDForTable(relation.hiveQlTable)
-  } else {
-    hadoopReader.makeRDDForPartitionedTable(
-      prunePartitions(relation.getHiveQlPartitions(partitionPruningPred)))
+  protected override def doExecute(): RDD[InternalRow] = {
+    val rdd = if (!relation.hiveQlTable.isPartitioned) {
+      hadoopReader.makeRDDForTable(relation.hiveQlTable)
+    } else {
+      hadoopReader.makeRDDForPartitionedTable(
+        prunePartitions(relation.getHiveQlPartitions(partitionPruningPred)))
+    }
+    rdd.mapPartitionsInternal { iter =>
+      val proj = UnsafeProjection.create(schema)
+      iter.map(proj)
+    }
   }
 
   override def output: Seq[Attribute] = attributes

http://git-wip-us.apache.org/repos/asf/spark/blob/6c5bbd62/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
index f936cf5..44dc68e 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
@@ -28,18 +28,17 @@ import org.apache.hadoop.hive.ql.{Context, ErrorMsg}
 import org.apache.hadoop.hive.serde2.Serializer
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption
 import org.apache.hadoop.hive.serde2.objectinspector._
-import org.apache.hadoop.mapred.{FileOutputCommitter, FileOutputFormat, JobConf}
+import org.apache.hadoop.mapred.{FileOutputFormat, JobConf}
 
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.Row
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.Attribute
-import org.apache.spark.sql.execution.{UnaryNode, SparkPlan}
+import org.apache.spark.sql.catalyst.expressions.{FromUnsafeProjection, Attribute}
+import org.apache.spark.sql.execution.{SparkPlan, UnaryNode}
 import org.apache.spark.sql.hive.HiveShim.{ShimFileSinkDesc => FileSinkDesc}
 import org.apache.spark.sql.hive._
 import org.apache.spark.sql.types.DataType
-import org.apache.spark.{SparkException, TaskContext}
 import org.apache.spark.util.SerializableJobConf
+import org.apache.spark.{SparkException, TaskContext}
 
 private[hive]
 case class InsertIntoHiveTable(
@@ -101,15 +100,17 @@ case class InsertIntoHiveTable(
 
       writerContainer.executorSideSetup(context.stageId, context.partitionId, context.attemptNumber)
 
+      val proj = FromUnsafeProjection(child.schema)
       iterator.foreach { row =>
         var i = 0
+        val safeRow = proj(row)
         while (i < fieldOIs.length) {
-          outputData(i) = if (row.isNullAt(i)) null else wrappers(i)(row.get(i, dataTypes(i)))
+          outputData(i) = if (row.isNullAt(i)) null else wrappers(i)(safeRow.get(i, dataTypes(i)))
           i += 1
         }
 
         writerContainer
-          .getLocalFileWriter(row, table.schema)
+          .getLocalFileWriter(safeRow, table.schema)
           .write(serializer.serialize(outputData, standardOI))
       }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/6c5bbd62/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala
index a61e162..6ccd417 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala
@@ -213,7 +213,8 @@ case class ScriptTransformation(
 
     child.execute().mapPartitions { iter =>
       if (iter.hasNext) {
-        processIterator(iter)
+        val proj = UnsafeProjection.create(schema)
+        processIterator(iter).map(proj)
       } else {
         // If the input iterator has no rows then do not launch the external script.
         Iterator.empty

http://git-wip-us.apache.org/repos/asf/spark/blob/6c5bbd62/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
index 665e87e..efbf998 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
@@ -27,7 +27,6 @@ import org.apache.parquet.hadoop.ParquetOutputCommitter
 
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.sql._
-import org.apache.spark.sql.execution.ConvertToUnsafe
 import org.apache.spark.sql.execution.datasources.LogicalRelation
 import org.apache.spark.sql.hive.test.TestHiveSingleton
 import org.apache.spark.sql.test.SQLTestUtils
@@ -689,36 +688,6 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils with Tes
       sqlContext.sparkContext.conf.set("spark.speculation", speculationEnabled.toString)
     }
   }
-
-  test("HadoopFsRelation produces UnsafeRow") {
-    withTempTable("test_unsafe") {
-      withTempPath { dir =>
-        val path = dir.getCanonicalPath
-        sqlContext.range(3).write.format(dataSourceName).save(path)
-        sqlContext.read
-          .format(dataSourceName)
-          .option("dataSchema", new StructType().add("id", LongType, nullable = false).json)
-          .load(path)
-          .registerTempTable("test_unsafe")
-
-        val df = sqlContext.sql(
-          """SELECT COUNT(*)
-            |FROM test_unsafe a JOIN test_unsafe b
-            |WHERE a.id = b.id
-          """.stripMargin)
-
-        val plan = df.queryExecution.executedPlan
-
-        assert(
-          plan.collect { case plan: ConvertToUnsafe => plan }.isEmpty,
-          s"""Query plan shouldn't have ${classOf[ConvertToUnsafe].getSimpleName} node(s):
-             |$plan
-           """.stripMargin)
-
-        checkAnswer(df, Row(3))
-      }
-    }
-  }
 }
 
 // This class is used to test SPARK-8578. We should not use any custom output committer when


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org