Posted to commits@spark.apache.org by li...@apache.org on 2016/02/25 16:08:17 UTC

spark git commit: [SPARK-13457][SQL] Removes DataFrame RDD operations

Repository: spark
Updated Branches:
  refs/heads/master 4460113d4 -> 157fe64f3


[SPARK-13457][SQL] Removes DataFrame RDD operations

## What changes were proposed in this pull request?

This PR removes the RDD operations from `DataFrame`. Call sites are updated to invoke the same methods through `DataFrame.rdd` instead.
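
A minimal migration sketch (assuming a hypothetical `DataFrame` named `df` with a `Double` column named `value`; both names are illustrative, not from this patch):

```scala
// Before this change: RDD operations such as map were available directly on DataFrame.
// val doubled = df.select("value").map(_.getDouble(0) * 2)

// After this change: the same operation is reached explicitly through DataFrame.rdd.
val doubled = df.select("value").rdd.map(_.getDouble(0) * 2)
```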

## How was this patch tested?

No extra tests are added; the existing tests should cover these changes.

Author: Cheng Lian <li...@databricks.com>

Closes #11323 from liancheng/remove-df-rdd-ops.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/157fe64f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/157fe64f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/157fe64f

Branch: refs/heads/master
Commit: 157fe64f3ecbd13b7286560286e50235eecfe30e
Parents: 4460113
Author: Cheng Lian <li...@databricks.com>
Authored: Thu Feb 25 23:07:59 2016 +0800
Committer: Cheng Lian <li...@databricks.com>
Committed: Thu Feb 25 23:07:59 2016 +0800

----------------------------------------------------------------------
 .../spark/examples/ml/DataFrameExample.scala    |  2 +-
 .../spark/examples/ml/DecisionTreeExample.scala |  8 ++--
 .../spark/examples/ml/OneVsRestExample.scala    |  2 +-
 .../spark/examples/mllib/LDAExample.scala       |  1 +
 .../apache/spark/examples/sql/RDDRelation.scala |  2 +-
 .../spark/examples/sql/hive/HiveFromSpark.scala |  2 +-
 .../scala/org/apache/spark/ml/Predictor.scala   |  6 ++-
 .../ml/classification/LogisticRegression.scala  | 13 +++---
 .../spark/ml/clustering/BisectingKMeans.scala   |  4 +-
 .../org/apache/spark/ml/clustering/KMeans.scala |  6 +--
 .../org/apache/spark/ml/clustering/LDA.scala    |  1 +
 .../BinaryClassificationEvaluator.scala         |  9 ++---
 .../MulticlassClassificationEvaluator.scala     |  6 +--
 .../ml/evaluation/RegressionEvaluator.scala     |  3 +-
 .../apache/spark/ml/feature/ChiSqSelector.scala |  2 +-
 .../spark/ml/feature/CountVectorizer.scala      |  2 +-
 .../scala/org/apache/spark/ml/feature/IDF.scala |  2 +-
 .../apache/spark/ml/feature/MinMaxScaler.scala  |  2 +-
 .../apache/spark/ml/feature/OneHotEncoder.scala |  2 +-
 .../scala/org/apache/spark/ml/feature/PCA.scala |  2 +-
 .../spark/ml/feature/StandardScaler.scala       |  2 +-
 .../apache/spark/ml/feature/StringIndexer.scala |  1 +
 .../apache/spark/ml/feature/VectorIndexer.scala |  2 +-
 .../org/apache/spark/ml/feature/Word2Vec.scala  |  2 +-
 .../apache/spark/ml/recommendation/ALS.scala    |  1 +
 .../ml/regression/AFTSurvivalRegression.scala   |  2 +-
 .../ml/regression/IsotonicRegression.scala      |  6 +--
 .../spark/ml/regression/LinearRegression.scala  | 16 +++++---
 .../spark/mllib/api/python/PythonMLLibAPI.scala |  8 ++--
 .../spark/mllib/clustering/KMeansModel.scala    |  2 +-
 .../spark/mllib/clustering/LDAModel.scala       |  4 +-
 .../clustering/PowerIterationClustering.scala   |  2 +-
 .../BinaryClassificationMetrics.scala           |  2 +-
 .../mllib/evaluation/MulticlassMetrics.scala    |  2 +-
 .../mllib/evaluation/MultilabelMetrics.scala    |  4 +-
 .../mllib/evaluation/RegressionMetrics.scala    |  2 +-
 .../spark/mllib/feature/ChiSqSelector.scala     |  2 +-
 .../org/apache/spark/mllib/fpm/FPGrowth.scala   |  2 +-
 .../MatrixFactorizationModel.scala              | 12 +++---
 .../mllib/tree/model/DecisionTreeModel.scala    |  2 +-
 .../mllib/tree/model/treeEnsembleModels.scala   |  2 +-
 .../LogisticRegressionSuite.scala               |  2 +-
 .../MultilayerPerceptronClassifierSuite.scala   |  5 ++-
 .../ml/classification/OneVsRestSuite.scala      |  6 +--
 .../ml/clustering/BisectingKMeansSuite.scala    |  3 +-
 .../spark/ml/clustering/KMeansSuite.scala       |  3 +-
 .../apache/spark/ml/clustering/LDASuite.scala   |  2 +-
 .../spark/ml/feature/OneHotEncoderSuite.scala   |  4 +-
 .../spark/ml/feature/StringIndexerSuite.scala   |  6 +--
 .../spark/ml/feature/VectorIndexerSuite.scala   |  5 ++-
 .../apache/spark/ml/feature/Word2VecSuite.scala |  8 ++--
 .../spark/ml/recommendation/ALSSuite.scala      |  7 ++--
 .../spark/ml/regression/GBTRegressorSuite.scala |  2 +-
 .../ml/regression/IsotonicRegressionSuite.scala |  6 +--
 .../ml/regression/LinearRegressionSuite.scala   | 17 ++++----
 .../scala/org/apache/spark/sql/DataFrame.scala  | 42 --------------------
 .../org/apache/spark/sql/GroupedData.scala      |  1 +
 .../org/apache/spark/sql/api/r/SQLUtils.scala   |  2 +-
 .../execution/datasources/jdbc/JdbcUtils.scala  |  2 +-
 .../spark/sql/DataFrameAggregateSuite.scala     |  2 +-
 .../parquet/ParquetFilterSuite.scala            |  2 +-
 .../datasources/parquet/ParquetIOSuite.scala    |  2 +-
 .../sql/execution/ui/SQLListenerSuite.scala     |  4 +-
 .../spark/sql/hive/HiveSparkSubmitSuite.scala   |  2 +-
 .../sql/hive/execution/HiveQuerySuite.scala     |  2 +
 .../spark/sql/hive/orc/OrcQuerySuite.scala      |  5 ++-
 .../apache/spark/sql/hive/parquetSuites.scala   |  2 +-
 67 files changed, 140 insertions(+), 159 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/157fe64f/examples/src/main/scala/org/apache/spark/examples/ml/DataFrameExample.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/spark/examples/ml/DataFrameExample.scala b/examples/src/main/scala/org/apache/spark/examples/ml/DataFrameExample.scala
index 0a477ab..7e608a2 100644
--- a/examples/src/main/scala/org/apache/spark/examples/ml/DataFrameExample.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/ml/DataFrameExample.scala
@@ -79,7 +79,7 @@ object DataFrameExample {
     labelSummary.show()
 
     // Convert features column to an RDD of vectors.
-    val features = df.select("features").map { case Row(v: Vector) => v }
+    val features = df.select("features").rdd.map { case Row(v: Vector) => v }
     val featureSummary = features.aggregate(new MultivariateOnlineSummarizer())(
       (summary, feat) => summary.add(feat),
       (sum1, sum2) => sum1.merge(sum2))

http://git-wip-us.apache.org/repos/asf/spark/blob/157fe64f/examples/src/main/scala/org/apache/spark/examples/ml/DecisionTreeExample.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/spark/examples/ml/DecisionTreeExample.scala b/examples/src/main/scala/org/apache/spark/examples/ml/DecisionTreeExample.scala
index a37d12a..d2560cc 100644
--- a/examples/src/main/scala/org/apache/spark/examples/ml/DecisionTreeExample.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/ml/DecisionTreeExample.scala
@@ -310,8 +310,8 @@ object DecisionTreeExample {
       data: DataFrame,
       labelColName: String): Unit = {
     val fullPredictions = model.transform(data).cache()
-    val predictions = fullPredictions.select("prediction").map(_.getDouble(0))
-    val labels = fullPredictions.select(labelColName).map(_.getDouble(0))
+    val predictions = fullPredictions.select("prediction").rdd.map(_.getDouble(0))
+    val labels = fullPredictions.select(labelColName).rdd.map(_.getDouble(0))
     // Print number of classes for reference
     val numClasses = MetadataUtils.getNumClasses(fullPredictions.schema(labelColName)) match {
       case Some(n) => n
@@ -335,8 +335,8 @@ object DecisionTreeExample {
       data: DataFrame,
       labelColName: String): Unit = {
     val fullPredictions = model.transform(data).cache()
-    val predictions = fullPredictions.select("prediction").map(_.getDouble(0))
-    val labels = fullPredictions.select(labelColName).map(_.getDouble(0))
+    val predictions = fullPredictions.select("prediction").rdd.map(_.getDouble(0))
+    val labels = fullPredictions.select(labelColName).rdd.map(_.getDouble(0))
     val RMSE = new RegressionMetrics(predictions.zip(labels)).rootMeanSquaredError
     println(s"  Root mean squared error (RMSE): $RMSE")
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/157fe64f/examples/src/main/scala/org/apache/spark/examples/ml/OneVsRestExample.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/spark/examples/ml/OneVsRestExample.scala b/examples/src/main/scala/org/apache/spark/examples/ml/OneVsRestExample.scala
index ccee3b2..a0bb5da 100644
--- a/examples/src/main/scala/org/apache/spark/examples/ml/OneVsRestExample.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/ml/OneVsRestExample.scala
@@ -155,7 +155,7 @@ object OneVsRestExample {
 
     // evaluate the model
     val predictionsAndLabels = predictions.select("prediction", "label")
-      .map(row => (row.getDouble(0), row.getDouble(1)))
+      .rdd.map(row => (row.getDouble(0), row.getDouble(1)))
 
     val metrics = new MulticlassMetrics(predictionsAndLabels)
 

http://git-wip-us.apache.org/repos/asf/spark/blob/157fe64f/examples/src/main/scala/org/apache/spark/examples/mllib/LDAExample.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/spark/examples/mllib/LDAExample.scala b/examples/src/main/scala/org/apache/spark/examples/mllib/LDAExample.scala
index d283235..038b2fe 100644
--- a/examples/src/main/scala/org/apache/spark/examples/mllib/LDAExample.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/mllib/LDAExample.scala
@@ -221,6 +221,7 @@ object LDAExample {
     val model = pipeline.fit(df)
     val documents = model.transform(df)
       .select("features")
+      .rdd
       .map { case Row(features: Vector) => features }
       .zipWithIndex()
       .map(_.swap)

http://git-wip-us.apache.org/repos/asf/spark/blob/157fe64f/examples/src/main/scala/org/apache/spark/examples/sql/RDDRelation.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/spark/examples/sql/RDDRelation.scala b/examples/src/main/scala/org/apache/spark/examples/sql/RDDRelation.scala
index a2f0fcd..620ff07 100644
--- a/examples/src/main/scala/org/apache/spark/examples/sql/RDDRelation.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/sql/RDDRelation.scala
@@ -52,7 +52,7 @@ object RDDRelation {
     val rddFromSql = sqlContext.sql("SELECT key, value FROM records WHERE key < 10")
 
     println("Result of RDD.map:")
-    rddFromSql.map(row => s"Key: ${row(0)}, Value: ${row(1)}").collect().foreach(println)
+    rddFromSql.rdd.map(row => s"Key: ${row(0)}, Value: ${row(1)}").collect().foreach(println)
 
     // Queries can also be written using a LINQ-like Scala DSL.
     df.where($"key" === 1).orderBy($"value".asc).select($"key").collect().foreach(println)

http://git-wip-us.apache.org/repos/asf/spark/blob/157fe64f/examples/src/main/scala/org/apache/spark/examples/sql/hive/HiveFromSpark.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/spark/examples/sql/hive/HiveFromSpark.scala b/examples/src/main/scala/org/apache/spark/examples/sql/hive/HiveFromSpark.scala
index 4e427f5..b654a2c 100644
--- a/examples/src/main/scala/org/apache/spark/examples/sql/hive/HiveFromSpark.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/sql/hive/HiveFromSpark.scala
@@ -63,7 +63,7 @@ object HiveFromSpark {
     val rddFromSql = sql("SELECT key, value FROM src WHERE key < 10 ORDER BY key")
 
     println("Result of RDD.map:")
-    val rddAsStrings = rddFromSql.map {
+    val rddAsStrings = rddFromSql.rdd.map {
       case Row(key: Int, value: String) => s"Key: $key, Value: $value"
     }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/157fe64f/mllib/src/main/scala/org/apache/spark/ml/Predictor.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/ml/Predictor.scala b/mllib/src/main/scala/org/apache/spark/ml/Predictor.scala
index d1388b5..4b27ee6 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/Predictor.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/Predictor.scala
@@ -122,8 +122,10 @@ abstract class Predictor[
    * and put it in an RDD with strong types.
    */
   protected def extractLabeledPoints(dataset: DataFrame): RDD[LabeledPoint] = {
-    dataset.select($(labelCol), $(featuresCol))
-      .map { case Row(label: Double, features: Vector) => LabeledPoint(label, features) }
+    dataset.select($(labelCol), $(featuresCol)).rdd.map {
+      case Row(label: Double, features: Vector) =>
+        LabeledPoint(label, features)
+    }
   }
 }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/157fe64f/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala
index ac01245..0d329d2 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala
@@ -263,10 +263,11 @@ class LogisticRegression @Since("1.2.0") (
   protected[spark] def train(dataset: DataFrame, handlePersistence: Boolean):
       LogisticRegressionModel = {
     val w = if ($(weightCol).isEmpty) lit(1.0) else col($(weightCol))
-    val instances: RDD[Instance] = dataset.select(col($(labelCol)), w, col($(featuresCol))).map {
-      case Row(label: Double, weight: Double, features: Vector) =>
-        Instance(label, weight, features)
-    }
+    val instances: RDD[Instance] =
+      dataset.select(col($(labelCol)), w, col($(featuresCol))).rdd.map {
+        case Row(label: Double, weight: Double, features: Vector) =>
+          Instance(label, weight, features)
+      }
 
     if (handlePersistence) instances.persist(StorageLevel.MEMORY_AND_DISK)
 
@@ -790,6 +791,7 @@ sealed trait LogisticRegressionSummary extends Serializable {
 /**
  * :: Experimental ::
  * Logistic regression training results.
+ *
  * @param predictions dataframe outputted by the model's `transform` method.
  * @param probabilityCol field in "predictions" which gives the calibrated probability of
  *                       each instance as a vector.
@@ -813,6 +815,7 @@ class BinaryLogisticRegressionTrainingSummary private[classification] (
 /**
  * :: Experimental ::
  * Binary Logistic regression results for a given model.
+ *
  * @param predictions dataframe outputted by the model's `transform` method.
  * @param probabilityCol field in "predictions" which gives the calibrated probability of
  *                       each instance.
@@ -837,7 +840,7 @@ class BinaryLogisticRegressionSummary private[classification] (
   // TODO: Allow the user to vary the number of bins using a setBins method in
   // BinaryClassificationMetrics. For now the default is set to 100.
   @transient private val binaryMetrics = new BinaryClassificationMetrics(
-    predictions.select(probabilityCol, labelCol).map {
+    predictions.select(probabilityCol, labelCol).rdd.map {
       case Row(score: Vector, label: Double) => (score(1), label)
     }, 100
   )

http://git-wip-us.apache.org/repos/asf/spark/blob/157fe64f/mllib/src/main/scala/org/apache/spark/ml/clustering/BisectingKMeans.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/ml/clustering/BisectingKMeans.scala b/mllib/src/main/scala/org/apache/spark/ml/clustering/BisectingKMeans.scala
index 45d293b..f014a1d 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/clustering/BisectingKMeans.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/clustering/BisectingKMeans.scala
@@ -112,7 +112,7 @@ class BisectingKMeansModel private[ml] (
   @Since("2.0.0")
   def computeCost(dataset: DataFrame): Double = {
     SchemaUtils.checkColumnType(dataset.schema, $(featuresCol), new VectorUDT)
-    val data = dataset.select(col($(featuresCol))).map { case Row(point: Vector) => point }
+    val data = dataset.select(col($(featuresCol))).rdd.map { case Row(point: Vector) => point }
     parentModel.computeCost(data)
   }
 }
@@ -176,7 +176,7 @@ class BisectingKMeans @Since("2.0.0") (
 
   @Since("2.0.0")
   override def fit(dataset: DataFrame): BisectingKMeansModel = {
-    val rdd = dataset.select(col($(featuresCol))).map { case Row(point: Vector) => point }
+    val rdd = dataset.select(col($(featuresCol))).rdd.map { case Row(point: Vector) => point }
 
     val bkm = new MLlibBisectingKMeans()
       .setK($(k))

http://git-wip-us.apache.org/repos/asf/spark/blob/157fe64f/mllib/src/main/scala/org/apache/spark/ml/clustering/KMeans.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/ml/clustering/KMeans.scala b/mllib/src/main/scala/org/apache/spark/ml/clustering/KMeans.scala
index c6a3eac..79332b0 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/clustering/KMeans.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/clustering/KMeans.scala
@@ -130,7 +130,7 @@ class KMeansModel private[ml] (
   @Since("1.6.0")
   def computeCost(dataset: DataFrame): Double = {
     SchemaUtils.checkColumnType(dataset.schema, $(featuresCol), new VectorUDT)
-    val data = dataset.select(col($(featuresCol))).map { case Row(point: Vector) => point }
+    val data = dataset.select(col($(featuresCol))).rdd.map { case Row(point: Vector) => point }
     parentModel.computeCost(data)
   }
 
@@ -260,7 +260,7 @@ class KMeans @Since("1.5.0") (
 
   @Since("1.5.0")
   override def fit(dataset: DataFrame): KMeansModel = {
-    val rdd = dataset.select(col($(featuresCol))).map { case Row(point: Vector) => point }
+    val rdd = dataset.select(col($(featuresCol))).rdd.map { case Row(point: Vector) => point }
 
     val algo = new MLlibKMeans()
       .setK($(k))
@@ -303,7 +303,7 @@ class KMeansSummary private[clustering] (
    * Size of each cluster.
    */
   @Since("2.0.0")
-  lazy val size: Array[Int] = cluster.map {
+  lazy val size: Array[Int] = cluster.rdd.map {
     case Row(clusterIdx: Int) => (clusterIdx, 1)
   }.reduceByKey(_ + _).collect().sortBy(_._1).map(_._2)
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/157fe64f/mllib/src/main/scala/org/apache/spark/ml/clustering/LDA.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/ml/clustering/LDA.scala b/mllib/src/main/scala/org/apache/spark/ml/clustering/LDA.scala
index 99383e7..6304b20 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/clustering/LDA.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/clustering/LDA.scala
@@ -803,6 +803,7 @@ private[clustering] object LDA extends DefaultParamsReadable[LDA] {
     dataset
       .withColumn("docId", monotonicallyIncreasingId())
       .select("docId", featuresCol)
+      .rdd
       .map { case Row(docId: Long, features: Vector) =>
         (docId, features)
       }

http://git-wip-us.apache.org/repos/asf/spark/blob/157fe64f/mllib/src/main/scala/org/apache/spark/ml/evaluation/BinaryClassificationEvaluator.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/ml/evaluation/BinaryClassificationEvaluator.scala b/mllib/src/main/scala/org/apache/spark/ml/evaluation/BinaryClassificationEvaluator.scala
index a1d36c4..00f3125 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/evaluation/BinaryClassificationEvaluator.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/evaluation/BinaryClassificationEvaluator.scala
@@ -84,11 +84,10 @@ class BinaryClassificationEvaluator @Since("1.4.0") (@Since("1.4.0") override va
     SchemaUtils.checkColumnType(schema, $(labelCol), DoubleType)
 
     // TODO: When dataset metadata has been implemented, check rawPredictionCol vector length = 2.
-    val scoreAndLabels = dataset.select($(rawPredictionCol), $(labelCol))
-      .map {
-        case Row(rawPrediction: Vector, label: Double) => (rawPrediction(1), label)
-        case Row(rawPrediction: Double, label: Double) => (rawPrediction, label)
-      }
+    val scoreAndLabels = dataset.select($(rawPredictionCol), $(labelCol)).rdd.map {
+      case Row(rawPrediction: Vector, label: Double) => (rawPrediction(1), label)
+      case Row(rawPrediction: Double, label: Double) => (rawPrediction, label)
+    }
     val metrics = new BinaryClassificationMetrics(scoreAndLabels)
     val metric = $(metricName) match {
       case "areaUnderROC" => metrics.areaUnderROC()

http://git-wip-us.apache.org/repos/asf/spark/blob/157fe64f/mllib/src/main/scala/org/apache/spark/ml/evaluation/MulticlassClassificationEvaluator.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/ml/evaluation/MulticlassClassificationEvaluator.scala b/mllib/src/main/scala/org/apache/spark/ml/evaluation/MulticlassClassificationEvaluator.scala
index a921153..55ff443 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/evaluation/MulticlassClassificationEvaluator.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/evaluation/MulticlassClassificationEvaluator.scala
@@ -74,9 +74,9 @@ class MulticlassClassificationEvaluator @Since("1.5.0") (@Since("1.5.0") overrid
     SchemaUtils.checkColumnType(schema, $(predictionCol), DoubleType)
     SchemaUtils.checkColumnType(schema, $(labelCol), DoubleType)
 
-    val predictionAndLabels = dataset.select($(predictionCol), $(labelCol))
-      .map { case Row(prediction: Double, label: Double) =>
-      (prediction, label)
+    val predictionAndLabels = dataset.select($(predictionCol), $(labelCol)).rdd.map {
+      case Row(prediction: Double, label: Double) =>
+        (prediction, label)
     }
     val metrics = new MulticlassMetrics(predictionAndLabels)
     val metric = $(metricName) match {

http://git-wip-us.apache.org/repos/asf/spark/blob/157fe64f/mllib/src/main/scala/org/apache/spark/ml/evaluation/RegressionEvaluator.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/ml/evaluation/RegressionEvaluator.scala b/mllib/src/main/scala/org/apache/spark/ml/evaluation/RegressionEvaluator.scala
index b6b25ec..adee61e 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/evaluation/RegressionEvaluator.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/evaluation/RegressionEvaluator.scala
@@ -85,7 +85,8 @@ final class RegressionEvaluator @Since("1.4.0") (@Since("1.4.0") override val ui
 
     val predictionAndLabels = dataset
       .select(col($(predictionCol)).cast(DoubleType), col($(labelCol)).cast(DoubleType))
-      .map { case Row(prediction: Double, label: Double) =>
+      .rdd.
+      map { case Row(prediction: Double, label: Double) =>
         (prediction, label)
       }
     val metrics = new RegressionMetrics(predictionAndLabels)

http://git-wip-us.apache.org/repos/asf/spark/blob/157fe64f/mllib/src/main/scala/org/apache/spark/ml/feature/ChiSqSelector.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/ChiSqSelector.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/ChiSqSelector.scala
index 7b565ef..4abc459 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/feature/ChiSqSelector.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/feature/ChiSqSelector.scala
@@ -79,7 +79,7 @@ final class ChiSqSelector(override val uid: String)
 
   override def fit(dataset: DataFrame): ChiSqSelectorModel = {
     transformSchema(dataset.schema, logging = true)
-    val input = dataset.select($(labelCol), $(featuresCol)).map {
+    val input = dataset.select($(labelCol), $(featuresCol)).rdd.map {
       case Row(label: Double, features: Vector) =>
         LabeledPoint(label, features)
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/157fe64f/mllib/src/main/scala/org/apache/spark/ml/feature/CountVectorizer.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/CountVectorizer.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/CountVectorizer.scala
index a6dfe58..cf15145 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/feature/CountVectorizer.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/feature/CountVectorizer.scala
@@ -133,7 +133,7 @@ class CountVectorizer(override val uid: String)
   override def fit(dataset: DataFrame): CountVectorizerModel = {
     transformSchema(dataset.schema, logging = true)
     val vocSize = $(vocabSize)
-    val input = dataset.select($(inputCol)).map(_.getAs[Seq[String]](0))
+    val input = dataset.select($(inputCol)).rdd.map(_.getAs[Seq[String]](0))
     val minDf = if ($(minDF) >= 1.0) {
       $(minDF)
     } else {

http://git-wip-us.apache.org/repos/asf/spark/blob/157fe64f/mllib/src/main/scala/org/apache/spark/ml/feature/IDF.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/IDF.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/IDF.scala
index 9e7eee4..cebbe5c 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/feature/IDF.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/feature/IDF.scala
@@ -79,7 +79,7 @@ final class IDF(override val uid: String) extends Estimator[IDFModel] with IDFBa
 
   override def fit(dataset: DataFrame): IDFModel = {
     transformSchema(dataset.schema, logging = true)
-    val input = dataset.select($(inputCol)).map { case Row(v: Vector) => v }
+    val input = dataset.select($(inputCol)).rdd.map { case Row(v: Vector) => v }
     val idf = new feature.IDF($(minDocFreq)).fit(input)
     copyValues(new IDFModel(uid, idf).setParent(this))
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/157fe64f/mllib/src/main/scala/org/apache/spark/ml/feature/MinMaxScaler.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/MinMaxScaler.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/MinMaxScaler.scala
index ad0458d..18be5c0 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/feature/MinMaxScaler.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/feature/MinMaxScaler.scala
@@ -108,7 +108,7 @@ class MinMaxScaler(override val uid: String)
 
   override def fit(dataset: DataFrame): MinMaxScalerModel = {
     transformSchema(dataset.schema, logging = true)
-    val input = dataset.select($(inputCol)).map { case Row(v: Vector) => v }
+    val input = dataset.select($(inputCol)).rdd.map { case Row(v: Vector) => v }
     val summary = Statistics.colStats(input)
     copyValues(new MinMaxScalerModel(uid, summary.min, summary.max).setParent(this))
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/157fe64f/mllib/src/main/scala/org/apache/spark/ml/feature/OneHotEncoder.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/OneHotEncoder.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/OneHotEncoder.scala
index 3425404..e9df161 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/feature/OneHotEncoder.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/feature/OneHotEncoder.scala
@@ -130,7 +130,7 @@ class OneHotEncoder(override val uid: String) extends Transformer
       transformSchema(dataset.schema)(outputColName))
     if (outputAttrGroup.size < 0) {
       // If the number of attributes is unknown, we check the values from the input column.
-      val numAttrs = dataset.select(col(inputColName).cast(DoubleType)).map(_.getDouble(0))
+      val numAttrs = dataset.select(col(inputColName).cast(DoubleType)).rdd.map(_.getDouble(0))
         .aggregate(0.0)(
           (m, x) => {
             assert(x >=0.0 && x == x.toInt,

http://git-wip-us.apache.org/repos/asf/spark/blob/157fe64f/mllib/src/main/scala/org/apache/spark/ml/feature/PCA.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/PCA.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/PCA.scala
index 0e07dfa..80b124f 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/feature/PCA.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/feature/PCA.scala
@@ -70,7 +70,7 @@ class PCA (override val uid: String) extends Estimator[PCAModel] with PCAParams
    */
   override def fit(dataset: DataFrame): PCAModel = {
     transformSchema(dataset.schema, logging = true)
-    val input = dataset.select($(inputCol)).map { case Row(v: Vector) => v}
+    val input = dataset.select($(inputCol)).rdd.map { case Row(v: Vector) => v}
     val pca = new feature.PCA(k = $(k))
     val pcaModel = pca.fit(input)
     copyValues(new PCAModel(uid, pcaModel.pc, pcaModel.explainedVariance).setParent(this))

http://git-wip-us.apache.org/repos/asf/spark/blob/157fe64f/mllib/src/main/scala/org/apache/spark/ml/feature/StandardScaler.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/StandardScaler.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/StandardScaler.scala
index 6a0b6c2..9952d3b 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/feature/StandardScaler.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/feature/StandardScaler.scala
@@ -87,7 +87,7 @@ class StandardScaler(override val uid: String) extends Estimator[StandardScalerM
 
   override def fit(dataset: DataFrame): StandardScalerModel = {
     transformSchema(dataset.schema, logging = true)
-    val input = dataset.select($(inputCol)).map { case Row(v: Vector) => v }
+    val input = dataset.select($(inputCol)).rdd.map { case Row(v: Vector) => v }
     val scaler = new feature.StandardScaler(withMean = $(withMean), withStd = $(withStd))
     val scalerModel = scaler.fit(input)
     copyValues(new StandardScalerModel(uid, scalerModel.std, scalerModel.mean).setParent(this))

http://git-wip-us.apache.org/repos/asf/spark/blob/157fe64f/mllib/src/main/scala/org/apache/spark/ml/feature/StringIndexer.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/StringIndexer.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/StringIndexer.scala
index 912bd95..e8b617d 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/feature/StringIndexer.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/feature/StringIndexer.scala
@@ -83,6 +83,7 @@ class StringIndexer(override val uid: String) extends Estimator[StringIndexerMod
 
   override def fit(dataset: DataFrame): StringIndexerModel = {
     val counts = dataset.select(col($(inputCol)).cast(StringType))
+      .rdd
       .map(_.getString(0))
       .countByValue()
     val labels = counts.toSeq.sortBy(-_._2).map(_._1).toArray

http://git-wip-us.apache.org/repos/asf/spark/blob/157fe64f/mllib/src/main/scala/org/apache/spark/ml/feature/VectorIndexer.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/VectorIndexer.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/VectorIndexer.scala
index 2a52684..5c11760 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/feature/VectorIndexer.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/feature/VectorIndexer.scala
@@ -113,7 +113,7 @@ class VectorIndexer(override val uid: String) extends Estimator[VectorIndexerMod
     val firstRow = dataset.select($(inputCol)).take(1)
     require(firstRow.length == 1, s"VectorIndexer cannot be fit on an empty dataset.")
     val numFeatures = firstRow(0).getAs[Vector](0).size
-    val vectorDataset = dataset.select($(inputCol)).map { case Row(v: Vector) => v }
+    val vectorDataset = dataset.select($(inputCol)).rdd.map { case Row(v: Vector) => v }
     val maxCats = $(maxCategories)
     val categoryStats: VectorIndexer.CategoryStats = vectorDataset.mapPartitions { iter =>
       val localCatStats = new VectorIndexer.CategoryStats(numFeatures, maxCats)

http://git-wip-us.apache.org/repos/asf/spark/blob/157fe64f/mllib/src/main/scala/org/apache/spark/ml/feature/Word2Vec.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/Word2Vec.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/Word2Vec.scala
index 2b6b3c3..a4c3d27 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/feature/Word2Vec.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/feature/Word2Vec.scala
@@ -138,7 +138,7 @@ final class Word2Vec(override val uid: String) extends Estimator[Word2VecModel]
 
   override def fit(dataset: DataFrame): Word2VecModel = {
     transformSchema(dataset.schema, logging = true)
-    val input = dataset.select($(inputCol)).map(_.getAs[Seq[String]](0))
+    val input = dataset.select($(inputCol)).rdd.map(_.getAs[Seq[String]](0))
     val wordVectors = new feature.Word2Vec()
       .setLearningRate($(stepSize))
       .setMinCount($(minCount))

http://git-wip-us.apache.org/repos/asf/spark/blob/157fe64f/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala b/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala
index 4be4d6a..dacdac9 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala
@@ -392,6 +392,7 @@ class ALS(@Since("1.4.0") override val uid: String) extends Estimator[ALSModel]
     val r = if ($(ratingCol) != "") col($(ratingCol)).cast(FloatType) else lit(1.0f)
     val ratings = dataset
       .select(col($(userCol)).cast(IntegerType), col($(itemCol)).cast(IntegerType), r)
+      .rdd
       .map { row =>
         Rating(row.getInt(0), row.getInt(1), row.getFloat(2))
       }

http://git-wip-us.apache.org/repos/asf/spark/blob/157fe64f/mllib/src/main/scala/org/apache/spark/ml/regression/AFTSurvivalRegression.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/ml/regression/AFTSurvivalRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/regression/AFTSurvivalRegression.scala
index 1e5b4cb..e4339d6 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/regression/AFTSurvivalRegression.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/regression/AFTSurvivalRegression.scala
@@ -184,7 +184,7 @@ class AFTSurvivalRegression @Since("1.6.0") (@Since("1.6.0") override val uid: S
    * and put it in an RDD with strong types.
    */
   protected[ml] def extractAFTPoints(dataset: DataFrame): RDD[AFTPoint] = {
-    dataset.select($(featuresCol), $(labelCol), $(censorCol)).map {
+    dataset.select($(featuresCol), $(labelCol), $(censorCol)).rdd.map {
       case Row(features: Vector, label: Double, censor: Double) =>
         AFTPoint(features, label, censor)
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/157fe64f/mllib/src/main/scala/org/apache/spark/ml/regression/IsotonicRegression.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/ml/regression/IsotonicRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/regression/IsotonicRegression.scala
index 1573bb4..36b006c 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/regression/IsotonicRegression.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/regression/IsotonicRegression.scala
@@ -90,9 +90,9 @@ private[regression] trait IsotonicRegressionBase extends Params with HasFeatures
     } else {
       lit(1.0)
     }
-    dataset.select(col($(labelCol)), f, w)
-      .map { case Row(label: Double, feature: Double, weight: Double) =>
-      (label, feature, weight)
+    dataset.select(col($(labelCol)), f, w).rdd.map {
+      case Row(label: Double, feature: Double, weight: Double) =>
+        (label, feature, weight)
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/157fe64f/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala
index ccfb5c4..8f78fd1 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala
@@ -158,7 +158,7 @@ class LinearRegression @Since("1.3.0") (@Since("1.3.0") override val uid: String
 
   override protected def train(dataset: DataFrame): LinearRegressionModel = {
     // Extract the number of features before deciding optimization solver.
-    val numFeatures = dataset.select(col($(featuresCol))).limit(1).map {
+    val numFeatures = dataset.select(col($(featuresCol))).limit(1).rdd.map {
       case Row(features: Vector) => features.size
     }.first()
     val w = if ($(weightCol).isEmpty) lit(1.0) else col($(weightCol))
@@ -170,7 +170,7 @@ class LinearRegression @Since("1.3.0") (@Since("1.3.0") override val uid: String
       // For low dimensional data, WeightedLeastSquares is more efficiently since the
       // training algorithm only requires one pass through the data. (SPARK-10668)
       val instances: RDD[Instance] = dataset.select(
-        col($(labelCol)), w, col($(featuresCol))).map {
+        col($(labelCol)), w, col($(featuresCol))).rdd.map {
           case Row(label: Double, weight: Double, features: Vector) =>
             Instance(label, weight, features)
       }
@@ -196,10 +196,11 @@ class LinearRegression @Since("1.3.0") (@Since("1.3.0") override val uid: String
       return lrModel.setSummary(trainingSummary)
     }
 
-    val instances: RDD[Instance] = dataset.select(col($(labelCol)), w, col($(featuresCol))).map {
-      case Row(label: Double, weight: Double, features: Vector) =>
-        Instance(label, weight, features)
-    }
+    val instances: RDD[Instance] =
+      dataset.select(col($(labelCol)), w, col($(featuresCol))).rdd.map {
+        case Row(label: Double, weight: Double, features: Vector) =>
+          Instance(label, weight, features)
+      }
 
     val handlePersistence = dataset.rdd.getStorageLevel == StorageLevel.NONE
     if (handlePersistence) instances.persist(StorageLevel.MEMORY_AND_DISK)
@@ -513,6 +514,7 @@ object LinearRegressionModel extends MLReadable[LinearRegressionModel] {
  * :: Experimental ::
  * Linear regression training results. Currently, the training summary ignores the
  * training coefficients except for the objective trace.
+ *
  * @param predictions predictions outputted by the model's `transform` method.
  * @param objectiveHistory objective function (scaled loss + regularization) at each iteration.
  */
@@ -537,6 +539,7 @@ class LinearRegressionTrainingSummary private[regression] (
 /**
  * :: Experimental ::
  * Linear regression results evaluated on a dataset.
+ *
  * @param predictions predictions outputted by the model's `transform` method.
  */
 @Since("1.5.0")
@@ -551,6 +554,7 @@ class LinearRegressionSummary private[regression] (
   @transient private val metrics = new RegressionMetrics(
     predictions
       .select(predictionCol, labelCol)
+      .rdd
       .map { case Row(pred: Double, label: Double) => (pred, label) },
     !model.getFitIntercept)
 

http://git-wip-us.apache.org/repos/asf/spark/blob/157fe64f/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
index 93cf16e..ca0ed95 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
@@ -1052,7 +1052,7 @@ private[python] class PythonMLLibAPI extends Serializable {
    * Java stub for the constructor of Python mllib RankingMetrics
    */
   def newRankingMetrics(predictionAndLabels: DataFrame): RankingMetrics[Any] = {
-    new RankingMetrics(predictionAndLabels.map(
+    new RankingMetrics(predictionAndLabels.rdd.map(
       r => (r.getSeq(0).toArray[Any], r.getSeq(1).toArray[Any])))
   }
 
@@ -1135,7 +1135,7 @@ private[python] class PythonMLLibAPI extends Serializable {
   def createIndexedRowMatrix(rows: DataFrame, numRows: Long, numCols: Int): IndexedRowMatrix = {
     // We use DataFrames for serialization of IndexedRows from Python,
     // so map each Row in the DataFrame back to an IndexedRow.
-    val indexedRows = rows.map {
+    val indexedRows = rows.rdd.map {
       case Row(index: Long, vector: Vector) => IndexedRow(index, vector)
     }
     new IndexedRowMatrix(indexedRows, numRows, numCols)
@@ -1147,7 +1147,7 @@ private[python] class PythonMLLibAPI extends Serializable {
   def createCoordinateMatrix(rows: DataFrame, numRows: Long, numCols: Long): CoordinateMatrix = {
     // We use DataFrames for serialization of MatrixEntry entries from
     // Python, so map each Row in the DataFrame back to a MatrixEntry.
-    val entries = rows.map {
+    val entries = rows.rdd.map {
       case Row(i: Long, j: Long, value: Double) => MatrixEntry(i, j, value)
     }
     new CoordinateMatrix(entries, numRows, numCols)
@@ -1161,7 +1161,7 @@ private[python] class PythonMLLibAPI extends Serializable {
     // We use DataFrames for serialization of sub-matrix blocks from
     // Python, so map each Row in the DataFrame back to a
     // ((blockRowIndex, blockColIndex), sub-matrix) tuple.
-    val blockTuples = blocks.map {
+    val blockTuples = blocks.rdd.map {
       case Row(Row(blockRowIndex: Long, blockColIndex: Long), subMatrix: Matrix) =>
         ((blockRowIndex.toInt, blockColIndex.toInt), subMatrix)
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/157fe64f/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansModel.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansModel.scala
index 26c6235..3b91fe8 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansModel.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansModel.scala
@@ -143,7 +143,7 @@ object KMeansModel extends Loader[KMeansModel] {
       val k = (metadata \ "k").extract[Int]
       val centroids = sqlContext.read.parquet(Loader.dataPath(path))
       Loader.checkSchema[Cluster](centroids.schema)
-      val localCentroids = centroids.map(Cluster.apply).collect()
+      val localCentroids = centroids.rdd.map(Cluster.apply).collect()
       assert(k == localCentroids.size)
       new KMeansModel(localCentroids.sortBy(_.id).map(_.point))
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/157fe64f/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAModel.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAModel.scala
index b30ecb8..25d67a3 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAModel.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAModel.scala
@@ -896,11 +896,11 @@ object DistributedLDAModel extends Loader[DistributedLDAModel] {
       Loader.checkSchema[EdgeData](edgeDataFrame.schema)
       val globalTopicTotals: LDA.TopicCounts =
         dataFrame.first().getAs[Vector](0).toBreeze.toDenseVector
-      val vertices: RDD[(VertexId, LDA.TopicCounts)] = vertexDataFrame.map {
+      val vertices: RDD[(VertexId, LDA.TopicCounts)] = vertexDataFrame.rdd.map {
         case Row(ind: Long, vec: Vector) => (ind, vec.toBreeze.toDenseVector)
       }
 
-      val edges: RDD[Edge[LDA.TokenCount]] = edgeDataFrame.map {
+      val edges: RDD[Edge[LDA.TokenCount]] = edgeDataFrame.rdd.map {
         case Row(srcId: Long, dstId: Long, prop: Double) => Edge(srcId, dstId, prop)
       }
       val graph: Graph[LDA.TopicCounts, LDA.TokenCount] = Graph(vertices, edges)

http://git-wip-us.apache.org/repos/asf/spark/blob/157fe64f/mllib/src/main/scala/org/apache/spark/mllib/clustering/PowerIterationClustering.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/PowerIterationClustering.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/PowerIterationClustering.scala
index feacafe..9732dfa 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/PowerIterationClustering.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/PowerIterationClustering.scala
@@ -93,7 +93,7 @@ object PowerIterationClusteringModel extends Loader[PowerIterationClusteringMode
       val assignments = sqlContext.read.parquet(Loader.dataPath(path))
       Loader.checkSchema[PowerIterationClustering.Assignment](assignments.schema)
 
-      val assignmentsRDD = assignments.map {
+      val assignmentsRDD = assignments.rdd.map {
         case Row(id: Long, cluster: Int) => PowerIterationClustering.Assignment(id, cluster)
       }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/157fe64f/mllib/src/main/scala/org/apache/spark/mllib/evaluation/BinaryClassificationMetrics.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/evaluation/BinaryClassificationMetrics.scala b/mllib/src/main/scala/org/apache/spark/mllib/evaluation/BinaryClassificationMetrics.scala
index 12cf220..319c547 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/evaluation/BinaryClassificationMetrics.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/evaluation/BinaryClassificationMetrics.scala
@@ -58,7 +58,7 @@ class BinaryClassificationMetrics @Since("1.3.0") (
    * @param scoreAndLabels a DataFrame with two double columns: score and label
    */
   private[mllib] def this(scoreAndLabels: DataFrame) =
-    this(scoreAndLabels.map(r => (r.getDouble(0), r.getDouble(1))))
+    this(scoreAndLabels.rdd.map(r => (r.getDouble(0), r.getDouble(1))))
 
   /**
    * Unpersist intermediate RDDs used in the computation.

http://git-wip-us.apache.org/repos/asf/spark/blob/157fe64f/mllib/src/main/scala/org/apache/spark/mllib/evaluation/MulticlassMetrics.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/evaluation/MulticlassMetrics.scala b/mllib/src/main/scala/org/apache/spark/mllib/evaluation/MulticlassMetrics.scala
index c510496..3029b15 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/evaluation/MulticlassMetrics.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/evaluation/MulticlassMetrics.scala
@@ -38,7 +38,7 @@ class MulticlassMetrics @Since("1.1.0") (predictionAndLabels: RDD[(Double, Doubl
    * @param predictionAndLabels a DataFrame with two double columns: prediction and label
    */
   private[mllib] def this(predictionAndLabels: DataFrame) =
-    this(predictionAndLabels.map(r => (r.getDouble(0), r.getDouble(1))))
+    this(predictionAndLabels.rdd.map(r => (r.getDouble(0), r.getDouble(1))))
 
   private lazy val labelCountByClass: Map[Double, Long] = predictionAndLabels.values.countByValue()
   private lazy val labelCount: Long = labelCountByClass.values.sum

http://git-wip-us.apache.org/repos/asf/spark/blob/157fe64f/mllib/src/main/scala/org/apache/spark/mllib/evaluation/MultilabelMetrics.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/evaluation/MultilabelMetrics.scala b/mllib/src/main/scala/org/apache/spark/mllib/evaluation/MultilabelMetrics.scala
index c100b3c..daf6ff4 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/evaluation/MultilabelMetrics.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/evaluation/MultilabelMetrics.scala
@@ -35,7 +35,9 @@ class MultilabelMetrics @Since("1.2.0") (predictionAndLabels: RDD[(Array[Double]
    * @param predictionAndLabels a DataFrame with two double array columns: prediction and label
    */
   private[mllib] def this(predictionAndLabels: DataFrame) =
-    this(predictionAndLabels.map(r => (r.getSeq[Double](0).toArray, r.getSeq[Double](1).toArray)))
+    this(predictionAndLabels.rdd.map { r =>
+      (r.getSeq[Double](0).toArray, r.getSeq[Double](1).toArray)
+    })
 
   private lazy val numDocs: Long = predictionAndLabels.count()
 

http://git-wip-us.apache.org/repos/asf/spark/blob/157fe64f/mllib/src/main/scala/org/apache/spark/mllib/evaluation/RegressionMetrics.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/evaluation/RegressionMetrics.scala b/mllib/src/main/scala/org/apache/spark/mllib/evaluation/RegressionMetrics.scala
index 18c90b2..0f4c97e 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/evaluation/RegressionMetrics.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/evaluation/RegressionMetrics.scala
@@ -46,7 +46,7 @@ class RegressionMetrics @Since("2.0.0") (
    *                                  prediction and observation
    */
   private[mllib] def this(predictionAndObservations: DataFrame) =
-    this(predictionAndObservations.map(r => (r.getDouble(0), r.getDouble(1))))
+    this(predictionAndObservations.rdd.map(r => (r.getDouble(0), r.getDouble(1))))
 
   /**
    * Use MultivariateOnlineSummarizer to calculate summary statistics of observations and errors.

http://git-wip-us.apache.org/repos/asf/spark/blob/157fe64f/mllib/src/main/scala/org/apache/spark/mllib/feature/ChiSqSelector.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/feature/ChiSqSelector.scala b/mllib/src/main/scala/org/apache/spark/mllib/feature/ChiSqSelector.scala
index 33728bf..4f0e13f 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/feature/ChiSqSelector.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/feature/ChiSqSelector.scala
@@ -161,7 +161,7 @@ object ChiSqSelectorModel extends Loader[ChiSqSelectorModel] {
       // Check schema explicitly since erasure makes it hard to use match-case for checking.
       Loader.checkSchema[Data](dataFrame.schema)
 
-      val features = dataArray.map {
+      val features = dataArray.rdd.map {
         case Row(feature: Int) => (feature)
       }.collect()
 

http://git-wip-us.apache.org/repos/asf/spark/blob/157fe64f/mllib/src/main/scala/org/apache/spark/mllib/fpm/FPGrowth.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/fpm/FPGrowth.scala b/mllib/src/main/scala/org/apache/spark/mllib/fpm/FPGrowth.scala
index 85d6093..b35d721 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/fpm/FPGrowth.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/fpm/FPGrowth.scala
@@ -134,7 +134,7 @@ object FPGrowthModel extends Loader[FPGrowthModel[_]] {
     }
 
     def loadImpl[Item: ClassTag](freqItemsets: DataFrame, sample: Item): FPGrowthModel[Item] = {
-      val freqItemsetsRDD = freqItemsets.select("items", "freq").map { x =>
+      val freqItemsetsRDD = freqItemsets.select("items", "freq").rdd.map { x =>
         val items = x.getAs[Seq[Item]](0).toArray
         val freq = x.getLong(1)
         new FreqItemset(items, freq)

http://git-wip-us.apache.org/repos/asf/spark/blob/157fe64f/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala
index 628cf1d..c91729a 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala
@@ -369,13 +369,13 @@ object MatrixFactorizationModel extends Loader[MatrixFactorizationModel] {
       assert(className == thisClassName)
       assert(formatVersion == thisFormatVersion)
       val rank = (metadata \ "rank").extract[Int]
-      val userFeatures = sqlContext.read.parquet(userPath(path))
-        .map { case Row(id: Int, features: Seq[_]) =>
+      val userFeatures = sqlContext.read.parquet(userPath(path)).rdd.map {
+        case Row(id: Int, features: Seq[_]) =>
+          (id, features.asInstanceOf[Seq[Double]].toArray)
+      }
+      val productFeatures = sqlContext.read.parquet(productPath(path)).rdd.map {
+        case Row(id: Int, features: Seq[_]) =>
           (id, features.asInstanceOf[Seq[Double]].toArray)
-        }
-      val productFeatures = sqlContext.read.parquet(productPath(path))
-        .map { case Row(id: Int, features: Seq[_]) =>
-        (id, features.asInstanceOf[Seq[Double]].toArray)
       }
       new MatrixFactorizationModel(rank, userFeatures, productFeatures)
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/157fe64f/mllib/src/main/scala/org/apache/spark/mllib/tree/model/DecisionTreeModel.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/tree/model/DecisionTreeModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/tree/model/DecisionTreeModel.scala
index 89c470d..ec5d7b9 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/tree/model/DecisionTreeModel.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/tree/model/DecisionTreeModel.scala
@@ -247,7 +247,7 @@ object DecisionTreeModel extends Loader[DecisionTreeModel] with Logging {
       val dataRDD = sqlContext.read.parquet(datapath)
       // Check schema explicitly since erasure makes it hard to use match-case for checking.
       Loader.checkSchema[NodeData](dataRDD.schema)
-      val nodes = dataRDD.map(NodeData.apply)
+      val nodes = dataRDD.rdd.map(NodeData.apply)
       // Build node data into a tree.
       val trees = constructTrees(nodes)
       assert(trees.size == 1,

http://git-wip-us.apache.org/repos/asf/spark/blob/157fe64f/mllib/src/main/scala/org/apache/spark/mllib/tree/model/treeEnsembleModels.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/tree/model/treeEnsembleModels.scala b/mllib/src/main/scala/org/apache/spark/mllib/tree/model/treeEnsembleModels.scala
index feabcee..59713c3 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/tree/model/treeEnsembleModels.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/tree/model/treeEnsembleModels.scala
@@ -473,7 +473,7 @@ private[tree] object TreeEnsembleModel extends Logging {
         treeAlgo: String): Array[DecisionTreeModel] = {
       val datapath = Loader.dataPath(path)
       val sqlContext = SQLContext.getOrCreate(sc)
-      val nodes = sqlContext.read.parquet(datapath).map(NodeData.apply)
+      val nodes = sqlContext.read.parquet(datapath).rdd.map(NodeData.apply)
       val trees = constructTrees(nodes)
       trees.map(new DecisionTreeModel(_, Algo.fromString(treeAlgo)))
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/157fe64f/mllib/src/test/scala/org/apache/spark/ml/classification/LogisticRegressionSuite.scala
----------------------------------------------------------------------
diff --git a/mllib/src/test/scala/org/apache/spark/ml/classification/LogisticRegressionSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/classification/LogisticRegressionSuite.scala
index 972c086..cfb9bbf 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/classification/LogisticRegressionSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/classification/LogisticRegressionSuite.scala
@@ -735,7 +735,7 @@ class LogisticRegressionSuite
     val model1 = trainer1.fit(binaryDataset)
     val model2 = trainer2.fit(binaryDataset)
 
-    val histogram = binaryDataset.map { case Row(label: Double, features: Vector) => label }
+    val histogram = binaryDataset.rdd.map { case Row(label: Double, features: Vector) => label }
       .treeAggregate(new MultiClassSummarizer)(
         seqOp = (c, v) => (c, v) match {
           case (classSummarizer: MultiClassSummarizer, label: Double) => classSummarizer.add(label)
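
Extracting the label column into an RDD is what makes RDD-only aggregations such as `treeAggregate` available again. A simpler sketch of the same histogram idea, assuming the test's `binaryDataset` with its (label: Double, features: Vector) schema and substituting a plain `countByValue` for `MultiClassSummarizer`:

    import org.apache.spark.mllib.linalg.Vector
    import org.apache.spark.sql.Row

    // Label histogram computed on the extracted RDD rather than on the DataFrame.
    val labelHistogram: Map[Double, Long] = binaryDataset.rdd
      .map { case Row(label: Double, features: Vector) => label }
      .countByValue()
      .toMap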

http://git-wip-us.apache.org/repos/asf/spark/blob/157fe64f/mllib/src/test/scala/org/apache/spark/ml/classification/MultilayerPerceptronClassifierSuite.scala
----------------------------------------------------------------------
diff --git a/mllib/src/test/scala/org/apache/spark/ml/classification/MultilayerPerceptronClassifierSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/classification/MultilayerPerceptronClassifierSuite.scala
index a326432..602b5a8 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/classification/MultilayerPerceptronClassifierSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/classification/MultilayerPerceptronClassifierSuite.scala
@@ -76,8 +76,9 @@ class MultilayerPerceptronClassifierSuite extends SparkFunSuite with MLlibTestSp
     val model = trainer.fit(dataFrame)
     val numFeatures = dataFrame.select("features").first().getAs[Vector](0).size
     assert(model.numFeatures === numFeatures)
-    val mlpPredictionAndLabels = model.transform(dataFrame).select("prediction", "label")
-      .map { case Row(p: Double, l: Double) => (p, l) }
+    val mlpPredictionAndLabels = model.transform(dataFrame).select("prediction", "label").rdd.map {
+      case Row(p: Double, l: Double) => (p, l)
+    }
     // train multinomial logistic regression
     val lr = new LogisticRegressionWithLBFGS()
       .setIntercept(true)

http://git-wip-us.apache.org/repos/asf/spark/blob/157fe64f/mllib/src/test/scala/org/apache/spark/ml/classification/OneVsRestSuite.scala
----------------------------------------------------------------------
diff --git a/mllib/src/test/scala/org/apache/spark/ml/classification/OneVsRestSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/classification/OneVsRestSuite.scala
index 445e50d..2ae74a2 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/classification/OneVsRestSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/classification/OneVsRestSuite.scala
@@ -81,9 +81,9 @@ class OneVsRestSuite extends SparkFunSuite with MLlibTestSparkContext {
     val predictionColSchema = transformedDataset.schema(ovaModel.getPredictionCol)
     assert(MetadataUtils.getNumClasses(predictionColSchema) === Some(3))
 
-    val ovaResults = transformedDataset
-      .select("prediction", "label")
-      .map(row => (row.getDouble(0), row.getDouble(1)))
+    val ovaResults = transformedDataset.select("prediction", "label").rdd.map {
+      row => (row.getDouble(0), row.getDouble(1))
+    }
 
     val lr = new LogisticRegressionWithLBFGS().setIntercept(true).setNumClasses(numClasses)
     lr.optimizer.setRegParam(0.1).setNumIterations(100)

http://git-wip-us.apache.org/repos/asf/spark/blob/157fe64f/mllib/src/test/scala/org/apache/spark/ml/clustering/BisectingKMeansSuite.scala
----------------------------------------------------------------------
diff --git a/mllib/src/test/scala/org/apache/spark/ml/clustering/BisectingKMeansSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/clustering/BisectingKMeansSuite.scala
index fc4a4ad..b719a8c 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/clustering/BisectingKMeansSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/clustering/BisectingKMeansSuite.scala
@@ -77,7 +77,8 @@ class BisectingKMeansSuite extends SparkFunSuite with MLlibTestSparkContext {
     expectedColumns.foreach { column =>
       assert(transformed.columns.contains(column))
     }
-    val clusters = transformed.select(predictionColName).map(_.getInt(0)).distinct().collect().toSet
+    val clusters =
+      transformed.select(predictionColName).rdd.map(_.getInt(0)).distinct().collect().toSet
     assert(clusters.size === k)
     assert(clusters === Set(0, 1, 2, 3, 4))
     assert(model.computeCost(dataset) < 0.1)

http://git-wip-us.apache.org/repos/asf/spark/blob/157fe64f/mllib/src/test/scala/org/apache/spark/ml/clustering/KMeansSuite.scala
----------------------------------------------------------------------
diff --git a/mllib/src/test/scala/org/apache/spark/ml/clustering/KMeansSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/clustering/KMeansSuite.scala
index e5357ba..c684bc1 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/clustering/KMeansSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/clustering/KMeansSuite.scala
@@ -93,7 +93,8 @@ class KMeansSuite extends SparkFunSuite with MLlibTestSparkContext with DefaultR
     expectedColumns.foreach { column =>
       assert(transformed.columns.contains(column))
     }
-    val clusters = transformed.select(predictionColName).map(_.getInt(0)).distinct().collect().toSet
+    val clusters =
+      transformed.select(predictionColName).rdd.map(_.getInt(0)).distinct().collect().toSet
     assert(clusters.size === k)
     assert(clusters === Set(0, 1, 2, 3, 4))
     assert(model.computeCost(dataset) < 0.1)

http://git-wip-us.apache.org/repos/asf/spark/blob/157fe64f/mllib/src/test/scala/org/apache/spark/ml/clustering/LDASuite.scala
----------------------------------------------------------------------
diff --git a/mllib/src/test/scala/org/apache/spark/ml/clustering/LDASuite.scala b/mllib/src/test/scala/org/apache/spark/ml/clustering/LDASuite.scala
index 97dbfd9..0327040 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/clustering/LDASuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/clustering/LDASuite.scala
@@ -199,7 +199,7 @@ class LDASuite extends SparkFunSuite with MLlibTestSparkContext with DefaultRead
     // describeTopics
     val topics = model.describeTopics(3)
     assert(topics.count() === k)
-    assert(topics.select("topic").map(_.getInt(0)).collect().toSet === Range(0, k).toSet)
+    assert(topics.select("topic").rdd.map(_.getInt(0)).collect().toSet === Range(0, k).toSet)
     topics.select("termIndices").collect().foreach { case r: Row =>
       val termIndices = r.getAs[Seq[Int]](0)
       assert(termIndices.length === 3 && termIndices.toSet.size === 3)

http://git-wip-us.apache.org/repos/asf/spark/blob/157fe64f/mllib/src/test/scala/org/apache/spark/ml/feature/OneHotEncoderSuite.scala
----------------------------------------------------------------------
diff --git a/mllib/src/test/scala/org/apache/spark/ml/feature/OneHotEncoderSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/feature/OneHotEncoderSuite.scala
index 76d1205..e238b33 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/feature/OneHotEncoderSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/feature/OneHotEncoderSuite.scala
@@ -51,7 +51,7 @@ class OneHotEncoderSuite
       .setDropLast(false)
     val encoded = encoder.transform(transformed)
 
-    val output = encoded.select("id", "labelVec").map { r =>
+    val output = encoded.select("id", "labelVec").rdd.map { r =>
       val vec = r.getAs[Vector](1)
       (r.getInt(0), vec(0), vec(1), vec(2))
     }.collect().toSet
@@ -68,7 +68,7 @@ class OneHotEncoderSuite
       .setOutputCol("labelVec")
     val encoded = encoder.transform(transformed)
 
-    val output = encoded.select("id", "labelVec").map { r =>
+    val output = encoded.select("id", "labelVec").rdd.map { r =>
       val vec = r.getAs[Vector](1)
       (r.getInt(0), vec(0), vec(1))
     }.collect().toSet

http://git-wip-us.apache.org/repos/asf/spark/blob/157fe64f/mllib/src/test/scala/org/apache/spark/ml/feature/StringIndexerSuite.scala
----------------------------------------------------------------------
diff --git a/mllib/src/test/scala/org/apache/spark/ml/feature/StringIndexerSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/feature/StringIndexerSuite.scala
index 5d199ca..dea6164 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/feature/StringIndexerSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/feature/StringIndexerSuite.scala
@@ -52,7 +52,7 @@ class StringIndexerSuite
     val attr = Attribute.fromStructField(transformed.schema("labelIndex"))
       .asInstanceOf[NominalAttribute]
     assert(attr.values.get === Array("a", "c", "b"))
-    val output = transformed.select("id", "labelIndex").map { r =>
+    val output = transformed.select("id", "labelIndex").rdd.map { r =>
       (r.getInt(0), r.getDouble(1))
     }.collect().toSet
     // a -> 0, b -> 2, c -> 1
@@ -83,7 +83,7 @@ class StringIndexerSuite
     val attr = Attribute.fromStructField(transformed.schema("labelIndex"))
       .asInstanceOf[NominalAttribute]
     assert(attr.values.get === Array("b", "a"))
-    val output = transformed.select("id", "labelIndex").map { r =>
+    val output = transformed.select("id", "labelIndex").rdd.map { r =>
       (r.getInt(0), r.getDouble(1))
     }.collect().toSet
     // a -> 1, b -> 0
@@ -102,7 +102,7 @@ class StringIndexerSuite
     val attr = Attribute.fromStructField(transformed.schema("labelIndex"))
       .asInstanceOf[NominalAttribute]
     assert(attr.values.get === Array("100", "300", "200"))
-    val output = transformed.select("id", "labelIndex").map { r =>
+    val output = transformed.select("id", "labelIndex").rdd.map { r =>
       (r.getInt(0), r.getDouble(1))
     }.collect().toSet
     // 100 -> 0, 200 -> 2, 300 -> 1

http://git-wip-us.apache.org/repos/asf/spark/blob/157fe64f/mllib/src/test/scala/org/apache/spark/ml/feature/VectorIndexerSuite.scala
----------------------------------------------------------------------
diff --git a/mllib/src/test/scala/org/apache/spark/ml/feature/VectorIndexerSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/feature/VectorIndexerSuite.scala
index 67817fa..d4f836e 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/feature/VectorIndexerSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/feature/VectorIndexerSuite.scala
@@ -159,7 +159,7 @@ class VectorIndexerSuite extends SparkFunSuite with MLlibTestSparkContext
         // Chose correct categorical features
         assert(categoryMaps.keys.toSet === categoricalFeatures)
         val transformed = model.transform(data).select("indexed")
-        val indexedRDD: RDD[Vector] = transformed.map(_.getAs[Vector](0))
+        val indexedRDD: RDD[Vector] = transformed.rdd.map(_.getAs[Vector](0))
         val featureAttrs = AttributeGroup.fromStructField(transformed.schema("indexed"))
         assert(featureAttrs.name === "indexed")
         assert(featureAttrs.attributes.get.length === model.numFeatures)
@@ -216,7 +216,8 @@ class VectorIndexerSuite extends SparkFunSuite with MLlibTestSparkContext
       val points = data.collect().map(_.getAs[Vector](0))
       val vectorIndexer = getIndexer.setMaxCategories(maxCategories)
       val model = vectorIndexer.fit(data)
-      val indexedPoints = model.transform(data).select("indexed").map(_.getAs[Vector](0)).collect()
+      val indexedPoints =
+        model.transform(data).select("indexed").rdd.map(_.getAs[Vector](0)).collect()
       points.zip(indexedPoints).foreach {
         case (orig: SparseVector, indexed: SparseVector) =>
           assert(orig.indices.length == indexed.indices.length)

http://git-wip-us.apache.org/repos/asf/spark/blob/157fe64f/mllib/src/test/scala/org/apache/spark/ml/feature/Word2VecSuite.scala
----------------------------------------------------------------------
diff --git a/mllib/src/test/scala/org/apache/spark/ml/feature/Word2VecSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/feature/Word2VecSuite.scala
index f094c55..1671fb6 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/feature/Word2VecSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/feature/Word2VecSuite.scala
@@ -100,7 +100,7 @@ class Word2VecSuite extends SparkFunSuite with MLlibTestSparkContext with Defaul
       .setSeed(42L)
       .fit(docDF)
 
-    val realVectors = model.getVectors.sort("word").select("vector").map {
+    val realVectors = model.getVectors.sort("word").select("vector").rdd.map {
       case Row(v: Vector) => v
     }.collect()
     // These expectations are just magic values, characterizing the current
@@ -134,7 +134,7 @@ class Word2VecSuite extends SparkFunSuite with MLlibTestSparkContext with Defaul
       .fit(docDF)
 
     val expectedSimilarity = Array(0.2608488929093532, -0.8271274846926078)
-    val (synonyms, similarity) = model.findSynonyms("a", 2).map {
+    val (synonyms, similarity) = model.findSynonyms("a", 2).rdd.map {
       case Row(w: String, sim: Double) => (w, sim)
     }.collect().unzip
 
@@ -161,7 +161,7 @@ class Word2VecSuite extends SparkFunSuite with MLlibTestSparkContext with Defaul
       .setSeed(42L)
       .fit(docDF)
 
-    val (synonyms, similarity) = model.findSynonyms("a", 6).map {
+    val (synonyms, similarity) = model.findSynonyms("a", 6).rdd.map {
       case Row(w: String, sim: Double) => (w, sim)
     }.collect().unzip
 
@@ -174,7 +174,7 @@ class Word2VecSuite extends SparkFunSuite with MLlibTestSparkContext with Defaul
       .setWindowSize(10)
       .fit(docDF)
 
-    val (synonymsLarger, similarityLarger) = model.findSynonyms("a", 6).map {
+    val (synonymsLarger, similarityLarger) = model.findSynonyms("a", 6).rdd.map {
       case Row(w: String, sim: Double) => (w, sim)
     }.collect().unzip
     // The similarity score should be very different with the larger window
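
Each `findSynonyms` call above returns a two-column (word, similarity) DataFrame, so the tests hop to the row RDD before destructuring and unzipping. A sketch of the pattern, assuming the test's fitted `model` and query word:

    import org.apache.spark.sql.Row

    // Collect synonyms and their similarity scores as two parallel arrays.
    val (synonyms, similarities) = model.findSynonyms("a", 2).rdd.map {
      case Row(word: String, sim: Double) => (word, sim)
    }.collect().unzip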

http://git-wip-us.apache.org/repos/asf/spark/blob/157fe64f/mllib/src/test/scala/org/apache/spark/ml/recommendation/ALSSuite.scala
----------------------------------------------------------------------
diff --git a/mllib/src/test/scala/org/apache/spark/ml/recommendation/ALSSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/recommendation/ALSSuite.scala
index ff0d8f5..2bedd70 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/recommendation/ALSSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/recommendation/ALSSuite.scala
@@ -342,11 +342,10 @@ class ALSSuite
       .setSeed(0)
     val alpha = als.getAlpha
     val model = als.fit(training.toDF())
-    val predictions = model.transform(test.toDF())
-      .select("rating", "prediction")
-      .map { case Row(rating: Float, prediction: Float) =>
+    val predictions = model.transform(test.toDF()).select("rating", "prediction").rdd.map {
+      case Row(rating: Float, prediction: Float) =>
         (rating.toDouble, prediction.toDouble)
-      }
+    }
     val rmse =
       if (implicitPrefs) {
         // TODO: Use a better (rank-based?) evaluation metric for implicit feedback.
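
With the (rating, prediction) pairs in an RDD[(Double, Double)], the explicit-feedback branch of the test reduces to a textbook RMSE. A sketch, assuming the `predictions` RDD built just above:

    // Root-mean-square error over (rating, prediction) pairs.
    val rmse = math.sqrt(
      predictions.map { case (rating, prediction) =>
        val err = rating - prediction
        err * err
      }.mean())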

http://git-wip-us.apache.org/repos/asf/spark/blob/157fe64f/mllib/src/test/scala/org/apache/spark/ml/regression/GBTRegressorSuite.scala
----------------------------------------------------------------------
diff --git a/mllib/src/test/scala/org/apache/spark/ml/regression/GBTRegressorSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/regression/GBTRegressorSuite.scala
index 0932660..244db86 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/regression/GBTRegressorSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/regression/GBTRegressorSuite.scala
@@ -87,7 +87,7 @@ class GBTRegressorSuite extends SparkFunSuite with MLlibTestSparkContext {
     // copied model must have the same parent.
     MLTestingUtils.checkCopy(model)
     val preds = model.transform(df)
-    val predictions = preds.select("prediction").map(_.getDouble(0))
+    val predictions = preds.select("prediction").rdd.map(_.getDouble(0))
     // Checks based on SPARK-8736 (to ensure it is not doing classification)
     assert(predictions.max() > 2)
     assert(predictions.min() < -1)

http://git-wip-us.apache.org/repos/asf/spark/blob/157fe64f/mllib/src/test/scala/org/apache/spark/ml/regression/IsotonicRegressionSuite.scala
----------------------------------------------------------------------
diff --git a/mllib/src/test/scala/org/apache/spark/ml/regression/IsotonicRegressionSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/regression/IsotonicRegressionSuite.scala
index f067c29..b8874b4 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/regression/IsotonicRegressionSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/regression/IsotonicRegressionSuite.scala
@@ -46,7 +46,7 @@ class IsotonicRegressionSuite
 
     val predictions = model
       .transform(dataset)
-      .select("prediction").map { case Row(pred) =>
+      .select("prediction").rdd.map { case Row(pred) =>
         pred
       }.collect()
 
@@ -66,7 +66,7 @@ class IsotonicRegressionSuite
 
     val predictions = model
       .transform(features)
-      .select("prediction").map {
+      .select("prediction").rdd.map {
         case Row(pred) => pred
       }.collect()
 
@@ -160,7 +160,7 @@ class IsotonicRegressionSuite
 
     val predictions = model
       .transform(features)
-      .select("prediction").map {
+      .select("prediction").rdd.map {
       case Row(pred) => pred
     }.collect()
 

http://git-wip-us.apache.org/repos/asf/spark/blob/157fe64f/mllib/src/test/scala/org/apache/spark/ml/regression/LinearRegressionSuite.scala
----------------------------------------------------------------------
diff --git a/mllib/src/test/scala/org/apache/spark/ml/regression/LinearRegressionSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/regression/LinearRegressionSuite.scala
index 3ae108d..9dee04c 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/regression/LinearRegressionSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/regression/LinearRegressionSuite.scala
@@ -686,17 +686,18 @@ class LinearRegressionSuite
 
       // Residuals in [[LinearRegressionResults]] should equal those manually computed
       val expectedResiduals = datasetWithDenseFeature.select("features", "label")
+        .rdd
         .map { case Row(features: DenseVector, label: Double) =>
-        val prediction =
-          features(0) * model.coefficients(0) + features(1) * model.coefficients(1) +
-            model.intercept
-        label - prediction
-      }
-        .zip(model.summary.residuals.map(_.getDouble(0)))
+          val prediction =
+            features(0) * model.coefficients(0) + features(1) * model.coefficients(1) +
+              model.intercept
+          label - prediction
+        }
+        .zip(model.summary.residuals.rdd.map(_.getDouble(0)))
         .collect()
         .foreach { case (manualResidual: Double, resultResidual: Double) =>
-        assert(manualResidual ~== resultResidual relTol 1E-5)
-      }
+          assert(manualResidual ~== resultResidual relTol 1E-5)
+        }
 
       /*
          # Use the following R code to generate model training results.

http://git-wip-us.apache.org/repos/asf/spark/blob/157fe64f/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
index abb8fe5..be902d6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
@@ -1427,48 +1427,6 @@ class DataFrame private[sql](
   def transform[U](t: DataFrame => DataFrame): DataFrame = t(this)
 
   /**
-   * Returns a new RDD by applying a function to all rows of this DataFrame.
-   * @group rdd
-   * @since 1.3.0
-   */
-  def map[R: ClassTag](f: Row => R): RDD[R] = rdd.map(f)
-
-  /**
-   * Returns a new RDD by first applying a function to all rows of this [[DataFrame]],
-   * and then flattening the results.
-   * @group rdd
-   * @since 1.3.0
-   */
-  def flatMap[R: ClassTag](f: Row => TraversableOnce[R]): RDD[R] = rdd.flatMap(f)
-
-  /**
-   * Returns a new RDD by applying a function to each partition of this DataFrame.
-   * @group rdd
-   * @since 1.3.0
-   */
-  def mapPartitions[R: ClassTag](f: Iterator[Row] => Iterator[R]): RDD[R] = {
-    rdd.mapPartitions(f)
-  }
-
-  /**
-   * Applies a function `f` to all rows.
-   * @group rdd
-   * @since 1.3.0
-   */
-  def foreach(f: Row => Unit): Unit = withNewExecutionId {
-    rdd.foreach(f)
-  }
-
-  /**
-   * Applies a function f to each partition of this [[DataFrame]].
-   * @group rdd
-   * @since 1.3.0
-   */
-  def foreachPartition(f: Iterator[Row] => Unit): Unit = withNewExecutionId {
-    rdd.foreachPartition(f)
-  }
-
-  /**
    * Returns the first `n` rows in the [[DataFrame]].
    *
    * Running take requires moving data into the application's driver process, and doing so with
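
Every removed method above was a one-line delegation to `rdd`, which is why the migration throughout this commit is mechanical. A summary sketch of the rewrites (`df` stands for any DataFrame; the closures are placeholders). One behavioral caveat taken from the removed code itself: `foreach` and `foreachPartition` also wrapped the action in `withNewExecutionId`, and the plain `df.rdd` call sites no longer do.

    import org.apache.spark.sql.{DataFrame, Row}

    // Mechanical rewrites applied throughout this commit.
    def migrated(df: DataFrame): Unit = {
      val mapped  = df.rdd.map(_.getAs[Any](0))             // was df.map(...)
      val flat    = df.rdd.flatMap(_.toSeq)                 // was df.flatMap(...)
      val perPart = df.rdd.mapPartitions(it => it.take(1))  // was df.mapPartitions(...)
      df.rdd.foreach(_ => ())                               // was df.foreach(...)
      df.rdd.foreachPartition(_ => ())                      // was df.foreachPartition(...)
    }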

http://git-wip-us.apache.org/repos/asf/spark/blob/157fe64f/sql/core/src/main/scala/org/apache/spark/sql/GroupedData.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/GroupedData.scala b/sql/core/src/main/scala/org/apache/spark/sql/GroupedData.scala
index f06d161..a7258d7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/GroupedData.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/GroupedData.scala
@@ -306,6 +306,7 @@ class GroupedData protected[sql](
     val values = df.select(pivotColumn)
       .distinct()
       .sort(pivotColumn)  // ensure that the output columns are in a consistent logical order
+      .rdd
       .map(_.get(0))
       .take(maxValues + 1)
       .toSeq
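
Here `.rdd` lands mid-chain: `distinct()` and `sort()` stay on the DataFrame so the SQL engine computes the candidate pivot values, and only the final projection drops to the RDD because `map` is no longer available on DataFrame. A sketch of the value-gathering step under assumptions (hypothetical `df` with a "course" column; the cap is an illustrative stand-in for the real config value):

    import org.apache.spark.sql.DataFrame

    // Collect up to maxValues distinct pivot values; take one extra to detect overflow.
    def pivotValues(df: DataFrame, maxValues: Int = 10000): Seq[Any] = {
      val values = df.select("course").distinct().sort("course")
        .rdd.map(_.get(0)).take(maxValues + 1).toSeq
      require(values.length <= maxValues,
        s"The pivot column has more than $maxValues distinct values")
      values
    }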

http://git-wip-us.apache.org/repos/asf/spark/blob/157fe64f/sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala
index d912aeb..68a2517 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala
@@ -100,7 +100,7 @@ private[r] object SQLUtils {
   }
 
   def dfToRowRDD(df: DataFrame): JavaRDD[Array[Byte]] = {
-    df.map(r => rowToRBytes(r))
+    df.rdd.map(r => rowToRBytes(r))
   }
 
   private[this] def doConversion(data: Object, dataType: DataType): Object = {

http://git-wip-us.apache.org/repos/asf/spark/blob/157fe64f/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
index e295722..d7111a6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
@@ -276,7 +276,7 @@ object JdbcUtils extends Logging {
     val rddSchema = df.schema
     val getConnection: () => Connection = createConnectionFactory(url, properties)
     val batchSize = properties.getProperty("batchsize", "1000").toInt
-    df.foreachPartition { iterator =>
+    df.rdd.foreachPartition { iterator =>
       savePartition(getConnection, table, iterator, rddSchema, nullTypes, batchSize, dialect)
     }
   }
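
Going through `df.rdd.foreachPartition` preserves the one-connection-per-partition write pattern. A heavily reduced sketch of batched inserts (the URL, table, and single-column schema are assumptions; the real `savePartition` additionally handles null types, dialects, and the configured batch size):

    import java.sql.DriverManager
    import org.apache.spark.sql.DataFrame

    // One JDBC connection per partition; rows are batched into one statement.
    def writeOneColumn(df: DataFrame, url: String, table: String): Unit =
      df.rdd.foreachPartition { rows =>
        val conn = DriverManager.getConnection(url)
        try {
          val stmt = conn.prepareStatement(s"INSERT INTO $table VALUES (?)")
          rows.foreach { row =>
            stmt.setObject(1, row.get(0).asInstanceOf[AnyRef])
            stmt.addBatch()
          }
          stmt.executeBatch()
        } finally {
          conn.close()
        }
      }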

http://git-wip-us.apache.org/repos/asf/spark/blob/157fe64f/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala
index f54bff9..7d96ef6 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala
@@ -257,7 +257,7 @@ class DataFrameAggregateSuite extends QueryTest with SharedSQLContext {
   }
 
   test("count") {
-    assert(testData2.count() === testData2.map(_ => 1).count())
+    assert(testData2.count() === testData2.rdd.map(_ => 1).count())
 
     checkAnswer(
       testData2.agg(count('a), sumDistinct('a)), // non-partial

http://git-wip-us.apache.org/repos/asf/spark/blob/157fe64f/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
index fbffe86..bd51154 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
@@ -101,7 +101,7 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
       (implicit df: DataFrame): Unit = {
     def checkBinaryAnswer(df: DataFrame, expected: Seq[Row]) = {
       assertResult(expected.map(_.getAs[Array[Byte]](0).mkString(",")).sorted) {
-        df.map(_.getAs[Array[Byte]](0).mkString(",")).collect().toSeq.sorted
+        df.rdd.map(_.getAs[Array[Byte]](0).mkString(",")).collect().toSeq.sorted
       }
     }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/157fe64f/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
index 3c74464..c85eedd 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
@@ -599,7 +599,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext {
   test("null and non-null strings") {
     // Create a dataset where the first values are NULL and then some non-null values. The
     // number of non-nulls needs to be bigger than the ParquetReader batch size.
-    val data = sqlContext.range(200).map { i =>
+    val data = sqlContext.range(200).rdd.map { i =>
       if (i.getLong(0) < 150) Row(None)
       else Row("a")
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/157fe64f/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala
index 085e4a4..39920d8 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala
@@ -330,7 +330,7 @@ class SQLListenerSuite extends SparkFunSuite with SharedSQLContext {
     // listener should ignore the non SQL stage
     assert(sqlContext.listener.stageIdToStageMetrics.size == previousStageNumber)
 
-    sqlContext.sparkContext.parallelize(1 to 10).toDF().foreach(i => ())
+    sqlContext.sparkContext.parallelize(1 to 10).toDF().rdd.foreach(i => ())
     sqlContext.sparkContext.listenerBus.waitUntilEmpty(10000)
     // listener should save the SQL stage
     assert(sqlContext.listener.stageIdToStageMetrics.size == previousStageNumber + 1)
@@ -398,7 +398,7 @@ class SQLListenerMemoryLeakSuite extends SparkFunSuite {
           ).toDF()
           df.collect()
           try {
-            df.foreach(_ => throw new RuntimeException("Oops"))
+            df.rdd.foreach(_ => throw new RuntimeException("Oops"))
           } catch {
             case e: SparkException => // This is expected for a failed job
           }
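
Both hunks above swap a removed `DataFrame.foreach` for an action on the row RDD; the failure-propagation idiom in the second hunk is unchanged by the move. A compact sketch of that idiom (assuming any `df: DataFrame` on a live context):

    import org.apache.spark.SparkException
    import org.apache.spark.sql.DataFrame

    // A throwing task fails the RDD job and surfaces as SparkException on the driver.
    def expectJobFailure(df: DataFrame): Unit =
      try {
        df.rdd.foreach(_ => throw new RuntimeException("Oops"))
        sys.error("expected the job to fail")
      } catch {
        case _: SparkException => () // expected
      }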

http://git-wip-us.apache.org/repos/asf/spark/blob/157fe64f/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala
index f141a9b..12a5542 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala
@@ -210,7 +210,7 @@ object SparkSubmitClassLoaderTest extends Logging {
     }
     // Second, we load classes at the executor side.
     logInfo("Testing load classes at the executor side.")
-    val result = df.mapPartitions { x =>
+    val result = df.rdd.mapPartitions { x =>
       var exception: String = null
       try {
         Utils.classForName(args(0))
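
`mapPartitions` on the row RDD is how the test probes class loading on the executors. A self-contained sketch of the probe (the class name is a hypothetical placeholder, `Class.forName` stands in for the test's `Utils.classForName`, and `df` is any DataFrame):

    // One Boolean per partition: could the executor load the class?
    val loadedEverywhere: Boolean = df.rdd.mapPartitions { _ =>
      val ok =
        try { Class.forName("com.example.SomeUdf"); true } // hypothetical class name
        catch { case _: ClassNotFoundException => false }
      Iterator.single(ok)
    }.collect().forall(identity)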

http://git-wip-us.apache.org/repos/asf/spark/blob/157fe64f/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
index 3208ebc..1002487 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
@@ -664,11 +664,13 @@ class HiveQuerySuite extends HiveComparisonTest with BeforeAndAfter {
 
   test("implement identity function using case statement") {
     val actual = sql("SELECT (CASE key WHEN key THEN key END) FROM src")
+      .rdd
       .map { case Row(i: Int) => i }
       .collect()
       .toSet
 
     val expected = sql("SELECT key FROM src")
+      .rdd
       .map { case Row(i: Int) => i }
       .collect()
       .toSet

http://git-wip-us.apache.org/repos/asf/spark/blob/157fe64f/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala
index b11d1d9..6824951 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala
@@ -119,6 +119,7 @@ class OrcQuerySuite extends QueryTest with BeforeAndAfterAll with OrcTest {
       // expr = (not leaf-0)
       assertResult(10) {
         sql("SELECT name, contacts FROM t where age > 5")
+          .rdd
           .flatMap(_.getAs[Seq[_]]("contacts"))
           .count()
       }
@@ -131,7 +132,7 @@ class OrcQuerySuite extends QueryTest with BeforeAndAfterAll with OrcTest {
         val df = sql("SELECT name, contacts FROM t WHERE age > 5 AND age < 8")
         assert(df.count() === 2)
         assertResult(4) {
-          df.flatMap(_.getAs[Seq[_]]("contacts")).count()
+          df.rdd.flatMap(_.getAs[Seq[_]]("contacts")).count()
         }
       }
 
@@ -143,7 +144,7 @@ class OrcQuerySuite extends QueryTest with BeforeAndAfterAll with OrcTest {
         val df = sql("SELECT name, contacts FROM t WHERE age < 2 OR age > 8")
         assert(df.count() === 3)
         assertResult(6) {
-          df.flatMap(_.getAs[Seq[_]]("contacts")).count()
+          df.rdd.flatMap(_.getAs[Seq[_]]("contacts")).count()
         }
       }
     }
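
All three hunks above route a `flatMap` over the nested `contacts` array column through `.rdd`. A one-line sketch of the counting idiom, assuming a DataFrame `df` whose `contacts` column holds string arrays as in this suite:

    // Total number of contact entries across all rows that survived the filter.
    val contactCount: Long = df.rdd.flatMap(_.getAs[Seq[String]]("contacts")).count()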

http://git-wip-us.apache.org/repos/asf/spark/blob/157fe64f/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
index 68d5c7d..a127cf6 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
@@ -854,7 +854,7 @@ abstract class ParquetPartitioningTest extends QueryTest with SQLTestUtils with
     test(s"hive udfs $table") {
       checkAnswer(
         sql(s"SELECT concat(stringField, stringField) FROM $table"),
-        sql(s"SELECT stringField FROM $table").map {
+        sql(s"SELECT stringField FROM $table").rdd.map {
           case Row(s: String) => Row(s + s)
         }.collect().toSeq)
     }

