Posted to commits@spark.apache.org by rx...@apache.org on 2016/03/16 19:20:19 UTC

spark git commit: [SPARK-13894][SQL] Change SQLContext.range return type from DataFrame to Dataset

Repository: spark
Updated Branches:
  refs/heads/master d9e8f26d0 -> d9670f847


[SPARK-13894][SQL] Change SQLContext.range return type from DataFrame to Dataset

## What changes were proposed in this pull request?
https://issues.apache.org/jira/browse/SPARK-13894
Change the return type of the `SQLContext.range` API from the untyped `DataFrame` to the typed `Dataset[Long]`. Call sites that still need a `DataFrame` now convert explicitly with `.toDF()`.
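
A minimal before/after sketch (illustration only, not part of the patch; assumes a `SQLContext` in scope as `sqlContext` and `import sqlContext.implicits._` for the encoders):

    // Before this change: range returned an untyped DataFrame
    val df: DataFrame = sqlContext.range(0, 10)

    // After this change: range returns a typed Dataset[Long], so
    // functional operators work on Long values directly
    val ds: Dataset[Long] = sqlContext.range(0, 10)
    val doubled: Dataset[Long] = ds.map(_ * 2)

    // Call sites that still expect a DataFrame convert explicitly
    val backToDf: DataFrame = sqlContext.range(0, 10).toDF()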

## How was this patch tested?
No new unit tests were added; existing tests were updated to compile against the new return type (mostly by appending `.toDF()` where a `DataFrame` is still expected).

Author: Cheng Hao <ha...@intel.com>

Closes #11730 from chenghao-intel/range.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d9670f84
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d9670f84
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d9670f84

Branch: refs/heads/master
Commit: d9670f84739b0840501b19b8cb0e851850edb8c1
Parents: d9e8f26
Author: Cheng Hao <ha...@intel.com>
Authored: Wed Mar 16 11:20:15 2016 -0700
Committer: Reynold Xin <rx...@databricks.com>
Committed: Wed Mar 16 11:20:15 2016 -0700

----------------------------------------------------------------------
 .../spark/ml/feature/StringIndexerSuite.scala   |  2 +-
 .../scala/org/apache/spark/sql/SQLContext.scala | 33 ++++++++++----------
 .../apache/spark/sql/JavaDataFrameSuite.java    |  4 +--
 .../org/apache/spark/sql/DataFrameSuite.scala   |  4 +--
 .../org/apache/spark/sql/SQLQuerySuite.scala    |  2 +-
 .../sql/execution/WholeStageCodegenSuite.scala  |  2 +-
 .../datasources/parquet/ParquetIOSuite.scala    | 10 +++---
 .../sql/execution/metric/SQLMetricsSuite.scala  |  8 ++---
 .../spark/sql/hive/MultiDatabaseSuite.scala     |  2 +-
 .../sql/hive/execution/SQLQuerySuite.scala      |  4 +--
 .../sources/ParquetHadoopFsRelationSuite.scala  |  2 +-
 .../sql/sources/hadoopFsRelationSuites.scala    |  2 +-
 12 files changed, 38 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/d9670f84/mllib/src/test/scala/org/apache/spark/ml/feature/StringIndexerSuite.scala
----------------------------------------------------------------------
diff --git a/mllib/src/test/scala/org/apache/spark/ml/feature/StringIndexerSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/feature/StringIndexerSuite.scala
index b9533f8..d40e69d 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/feature/StringIndexerSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/feature/StringIndexerSuite.scala
@@ -114,7 +114,7 @@ class StringIndexerSuite
     val indexerModel = new StringIndexerModel("indexer", Array("a", "b", "c"))
       .setInputCol("label")
       .setOutputCol("labelIndex")
-    val df = sqlContext.range(0L, 10L)
+    val df = sqlContext.range(0L, 10L).toDF()
     assert(indexerModel.transform(df).eq(df))
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/d9670f84/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index 177d78c..e4d9308 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -60,6 +60,7 @@ import org.apache.spark.util.Utils
  * @groupname specificdata Specific Data Sources
  * @groupname config Configuration
  * @groupname dataframes Custom DataFrame Creation
+ * @groupname dataset Custom Dataset Creation
  * @groupname Ungrouped Support functions for language integrated queries
  * @since 1.0.0
  */
@@ -716,53 +717,53 @@ class SQLContext private[sql](
 
   /**
    * :: Experimental ::
-   * Creates a [[DataFrame]] with a single [[LongType]] column named `id`, containing elements
+   * Creates a [[Dataset]] with a single [[LongType]] column named `id`, containing elements
    * in a range from 0 to `end` (exclusive) with step value 1.
    *
-   * @since 1.4.1
-   * @group dataframe
+   * @since 2.0.0
+   * @group dataset
    */
   @Experimental
-  def range(end: Long): DataFrame = range(0, end)
+  def range(end: Long): Dataset[Long] = range(0, end)
 
   /**
    * :: Experimental ::
-   * Creates a [[DataFrame]] with a single [[LongType]] column named `id`, containing elements
+   * Creates a [[Dataset]] with a single [[LongType]] column named `id`, containing elements
    * in a range from `start` to `end` (exclusive) with step value 1.
    *
-   * @since 1.4.0
-   * @group dataframe
+   * @since 2.0.0
+   * @group dataset
    */
   @Experimental
-  def range(start: Long, end: Long): DataFrame = {
+  def range(start: Long, end: Long): Dataset[Long] = {
     range(start, end, step = 1, numPartitions = sparkContext.defaultParallelism)
   }
 
   /**
     * :: Experimental ::
-    * Creates a [[DataFrame]] with a single [[LongType]] column named `id`, containing elements
+    * Creates a [[Dataset]] with a single [[LongType]] column named `id`, containing elements
     * in a range from `start` to `end` (exclusive) with a step value.
     *
     * @since 2.0.0
-    * @group dataframe
+    * @group dataset
     */
   @Experimental
-  def range(start: Long, end: Long, step: Long): DataFrame = {
+  def range(start: Long, end: Long, step: Long): Dataset[Long] = {
     range(start, end, step, numPartitions = sparkContext.defaultParallelism)
   }
 
   /**
    * :: Experimental ::
-   * Creates a [[DataFrame]] with a single [[LongType]] column named `id`, containing elements
+   * Creates a [[Dataset]] with a single [[LongType]] column named `id`, containing elements
    * in a range from `start` to `end` (exclusive) with a step value, with the number of
    * partitions specified.
    *
-   * @since 1.4.0
-   * @group dataframe
+   * @since 2.0.0
+   * @group dataset
    */
   @Experimental
-  def range(start: Long, end: Long, step: Long, numPartitions: Int): DataFrame = {
-    Dataset.newDataFrame(this, Range(start, end, step, numPartitions))
+  def range(start: Long, end: Long, step: Long, numPartitions: Int): Dataset[Long] = {
+    new Dataset(this, Range(start, end, step, numPartitions), implicits.newLongEncoder)
   }
 
   /**
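
The core of the change is the final hunk above: `range` now constructs its result directly as a `Dataset[Long]` with the built-in `Long` encoder (`implicits.newLongEncoder`) instead of going through `Dataset.newDataFrame`. A short sketch of what the typed result allows (assumes `import sqlContext.implicits._`; not part of the patch):

    // id values come back as Longs, with no Row unpacking
    val plusOne: Array[Long] = sqlContext.range(0, 100).map(_ + 1).collect()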

http://git-wip-us.apache.org/repos/asf/spark/blob/d9670f84/sql/core/src/test/java/test/org/apache/spark/sql/JavaDataFrameSuite.java
----------------------------------------------------------------------
diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/JavaDataFrameSuite.java b/sql/core/src/test/java/test/org/apache/spark/sql/JavaDataFrameSuite.java
index 7fe17e0..f3c5a86 100644
--- a/sql/core/src/test/java/test/org/apache/spark/sql/JavaDataFrameSuite.java
+++ b/sql/core/src/test/java/test/org/apache/spark/sql/JavaDataFrameSuite.java
@@ -328,7 +328,7 @@ public class JavaDataFrameSuite {
 
   @Test
   public void testCountMinSketch() {
-    Dataset<Row> df = context.range(1000);
+    Dataset df = context.range(1000);
 
     CountMinSketch sketch1 = df.stat().countMinSketch("id", 10, 20, 42);
     Assert.assertEquals(sketch1.totalCount(), 1000);
@@ -353,7 +353,7 @@ public class JavaDataFrameSuite {
 
   @Test
   public void testBloomFilter() {
-    Dataset<Row> df = context.range(1000);
+    Dataset df = context.range(1000);
 
     BloomFilter filter1 = df.stat().bloomFilter("id", 1000, 0.03);
     Assert.assertTrue(filter1.expectedFpp() - 0.03 < 1e-3);

http://git-wip-us.apache.org/repos/asf/spark/blob/d9670f84/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
index 2333fa2..199e138 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
@@ -344,7 +344,7 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {
 
     // SPARK-12340: overstep the bounds of Int in SparkPlan.executeTake
     checkAnswer(
-      sqlContext.range(2).limit(2147483638),
+      sqlContext.range(2).toDF().limit(2147483638),
       Row(0) :: Row(1) :: Nil
     )
   }
@@ -1312,7 +1312,7 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {
 
   test("reuse exchange") {
     withSQLConf("spark.sql.autoBroadcastJoinThreshold" -> "2") {
-      val df = sqlContext.range(100)
+      val df = sqlContext.range(100).toDF()
       val join = df.join(df, "id")
       val plan = join.queryExecution.executedPlan
       checkAnswer(join, df)

http://git-wip-us.apache.org/repos/asf/spark/blob/d9670f84/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index 836fb1c..3efe984 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -1718,7 +1718,7 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
   }
 
   test("run sql directly on files") {
-    val df = sqlContext.range(100)
+    val df = sqlContext.range(100).toDF()
     withTempPath(f => {
       df.write.json(f.getCanonicalPath)
       checkAnswer(sql(s"select id from json.`${f.getCanonicalPath}`"),

http://git-wip-us.apache.org/repos/asf/spark/blob/d9670f84/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala
index e00c762..716c367 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala
@@ -64,7 +64,7 @@ class WholeStageCodegenSuite extends SparkPlanTest with SharedSQLContext {
   }
 
   test("Sort should be included in WholeStageCodegen") {
-    val df = sqlContext.range(3, 0, -1).sort(col("id"))
+    val df = sqlContext.range(3, 0, -1).toDF().sort(col("id"))
     val plan = df.queryExecution.executedPlan
     assert(plan.find(p =>
       p.isInstanceOf[WholeStageCodegen] &&

http://git-wip-us.apache.org/repos/asf/spark/blob/d9670f84/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
index b7834d7..4d9a8d7 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
@@ -599,11 +599,11 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext {
   test("null and non-null strings") {
     // Create a dataset where the first values are NULL and then some non-null values. The
     // number of non-nulls needs to be bigger than the ParquetReader batch size.
-    val data = sqlContext.range(200).rdd.map { i =>
-      if (i.getLong(0) < 150) Row(None)
-      else Row("a")
-    }
-    val df = sqlContext.createDataFrame(data, StructType(StructField("col", StringType) :: Nil))
+    val data: Dataset[String] = sqlContext.range(200).map(i =>
+      if (i < 150) null
+      else "a"
+    )
+    val df = data.toDF("col")
     assert(df.agg("col" -> "count").collect().head.getLong(0) == 50)
 
     withTempPath { dir =>
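
The hunk above also shows the migration pattern for RDD-based construction: instead of mapping `Row`s out of `range(...).rdd` and rebuilding a `DataFrame` through `createDataFrame` with an explicit schema, the typed `Dataset[Long]` is mapped directly and the column named via `toDF`. A condensed sketch of the same idea (assumes `import sqlContext.implicits._`; not part of the patch):

    // 150 nulls followed by 50 non-null strings, as one typed pipeline
    val df = sqlContext.range(200)
      .map(i => if (i < 150) null else "a")
      .toDF("col")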

http://git-wip-us.apache.org/repos/asf/spark/blob/d9670f84/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
index d7bd215..988852a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
@@ -128,8 +128,8 @@ class SQLMetricsSuite extends SparkFunSuite with SharedSQLContext {
     // Assume the execution plan is
     // WholeStageCodegen(nodeId = 0, Range(nodeId = 2) -> Filter(nodeId = 1))
     // TODO: update metrics in generated operators
-    val df = sqlContext.range(10).filter('id < 5)
-    testSparkPlanMetrics(df, 1, Map.empty)
+    val ds = sqlContext.range(10).filter('id < 5)
+    testSparkPlanMetrics(ds.toDF(), 1, Map.empty)
   }
 
   test("TungstenAggregate metrics") {
@@ -157,8 +157,8 @@ class SQLMetricsSuite extends SparkFunSuite with SharedSQLContext {
   test("Sort metrics") {
     // Assume the execution plan is
     // WholeStageCodegen(nodeId = 0, Range(nodeId = 2) -> Sort(nodeId = 1))
-    val df = sqlContext.range(10).sort('id)
-    testSparkPlanMetrics(df, 2, Map.empty)
+    val ds = sqlContext.range(10).sort('id)
+    testSparkPlanMetrics(ds.toDF(), 2, Map.empty)
   }
 
   test("SortMergeJoin metrics") {

http://git-wip-us.apache.org/repos/asf/spark/blob/d9670f84/sql/hive/src/test/scala/org/apache/spark/sql/hive/MultiDatabaseSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MultiDatabaseSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MultiDatabaseSuite.scala
index e2effef..d275190 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MultiDatabaseSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MultiDatabaseSuite.scala
@@ -22,7 +22,7 @@ import org.apache.spark.sql.hive.test.TestHiveSingleton
 import org.apache.spark.sql.test.SQLTestUtils
 
 class MultiDatabaseSuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
-  private lazy val df = sqlContext.range(10).coalesce(1)
+  private lazy val df = sqlContext.range(10).coalesce(1).toDF()
 
   private def checkTablePath(dbName: String, tableName: String): Unit = {
     val metastoreTable = hiveContext.sessionState.catalog.client.getTable(dbName, tableName)

http://git-wip-us.apache.org/repos/asf/spark/blob/d9670f84/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index d6c10d6..9667b53 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -1425,7 +1425,7 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
   }
 
   test("run sql directly on files") {
-    val df = sqlContext.range(100)
+    val df = sqlContext.range(100).toDF()
     withTempPath(f => {
       df.write.parquet(f.getCanonicalPath)
       checkAnswer(sql(s"select id from parquet.`${f.getCanonicalPath}`"),
@@ -1582,7 +1582,7 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
         withView("v") {
           sql("CREATE VIEW v AS SELECT * FROM add_col")
           sqlContext.range(10).select('id, 'id as 'a).write.mode("overwrite").saveAsTable("add_col")
-          checkAnswer(sql("SELECT * FROM v"), sqlContext.range(10))
+          checkAnswer(sql("SELECT * FROM v"), sqlContext.range(10).toDF())
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/d9670f84/sql/hive/src/test/scala/org/apache/spark/sql/sources/ParquetHadoopFsRelationSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/ParquetHadoopFsRelationSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/ParquetHadoopFsRelationSuite.scala
index 8856148..1e5dbd9 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/ParquetHadoopFsRelationSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/ParquetHadoopFsRelationSuite.scala
@@ -126,7 +126,7 @@ class ParquetHadoopFsRelationSuite extends HadoopFsRelationTest {
   test("SPARK-8604: Parquet data source should write summary file while doing appending") {
     withTempPath { dir =>
       val path = dir.getCanonicalPath
-      val df = sqlContext.range(0, 5)
+      val df = sqlContext.range(0, 5).toDF()
       df.write.mode(SaveMode.Overwrite).parquet(path)
 
       val summaryPath = new Path(path, "_metadata")

http://git-wip-us.apache.org/repos/asf/spark/blob/d9670f84/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
index 7e09616..7e5506e 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
@@ -673,7 +673,7 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils with Tes
           classOf[AlwaysFailOutputCommitter].getName)
 
         // Code below shouldn't throw since customized output committer should be disabled.
-        val df = sqlContext.range(10).coalesce(1)
+        val df = sqlContext.range(10).toDF().coalesce(1)
         df.write.format(dataSourceName).save(dir.getCanonicalPath)
         checkAnswer(
           sqlContext


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org