You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2017/02/27 22:33:06 UTC

spark git commit: [SPARK-15615][SQL][BUILD][FOLLOW-UP] Replace deprecated usage of json(RDD[String]) API

Repository: spark
Updated Branches:
  refs/heads/master 4ba9c6c45 -> 8a5a58506


[SPARK-15615][SQL][BUILD][FOLLOW-UP] Replace deprecated usage of json(RDD[String]) API

## What changes were proposed in this pull request?

This PR proposes to replace the deprecated `json(RDD[String])` usage to `json(Dataset[String])`.

This currently produces so many warnings.

## How was this patch tested?

Fixed tests.

Author: hyukjinkwon <gu...@gmail.com>

Closes #17071 from HyukjinKwon/SPARK-15615-followup.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8a5a5850
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8a5a5850
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8a5a5850

Branch: refs/heads/master
Commit: 8a5a58506c35f35f41cd1366ee693abec2916153
Parents: 4ba9c6c
Author: hyukjinkwon <gu...@gmail.com>
Authored: Mon Feb 27 14:33:02 2017 -0800
Committer: Wenchen Fan <we...@databricks.com>
Committed: Mon Feb 27 14:33:02 2017 -0800

----------------------------------------------------------------------
 .../examples/sql/JavaSQLDataSourceExample.java  |   9 +-
 .../examples/sql/SQLDataSourceExample.scala     |  10 +-
 .../apache/spark/sql/JavaApplySchemaSuite.java  |   9 +-
 .../apache/spark/sql/JavaDataFrameSuite.java    |   9 ++
 .../org/apache/spark/sql/JavaSaveLoadSuite.java |   8 +-
 .../org/apache/spark/sql/DataFrameSuite.scala   |  12 +-
 .../org/apache/spark/sql/SQLQuerySuite.scala    |  41 +++---
 .../apache/spark/sql/UserDefinedTypeSuite.scala |   6 +-
 .../benchmark/WideSchemaBenchmark.scala         |   8 +-
 .../json/JsonParsingOptionsSuite.scala          |  37 ++----
 .../execution/datasources/json/JsonSuite.scala  |  38 +++---
 .../datasources/json/TestJsonData.scala         | 126 ++++++++++---------
 .../sql/sources/CreateTableAsSelectSuite.scala  |   5 +-
 .../apache/spark/sql/sources/InsertSuite.scala  |  10 +-
 .../spark/sql/sources/SaveLoadSuite.scala       |   6 +-
 .../spark/sql/hive/JavaDataFrameSuite.java      |   6 +-
 .../sql/hive/JavaMetastoreDataSourcesSuite.java |   6 +-
 .../sql/hive/MetastoreDataSourcesSuite.scala    |   4 +-
 .../sql/hive/execution/HiveExplainSuite.scala   |   5 +-
 .../hive/execution/HiveResolutionSuite.scala    |   8 +-
 .../sql/hive/execution/SQLQuerySuite.scala      |  27 ++--
 21 files changed, 189 insertions(+), 201 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/8a5a5850/examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java b/examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java
index adb96dd..82bb284 100644
--- a/examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java
+++ b/examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java
@@ -25,8 +25,6 @@ import java.util.List;
 import java.util.Properties;
 
 // $example on:basic_parquet_example$
-import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.api.java.function.MapFunction;
 import org.apache.spark.sql.Encoders;
 // $example on:schema_merging$
@@ -217,12 +215,11 @@ public class JavaSQLDataSourceExample {
     // +------+
 
     // Alternatively, a DataFrame can be created for a JSON dataset represented by
-    // an RDD[String] storing one JSON object per string.
+    // an Dataset[String] storing one JSON object per string.
     List<String> jsonData = Arrays.asList(
             "{\"name\":\"Yin\",\"address\":{\"city\":\"Columbus\",\"state\":\"Ohio\"}}");
-    JavaRDD<String> anotherPeopleRDD =
-            new JavaSparkContext(spark.sparkContext()).parallelize(jsonData);
-    Dataset<Row> anotherPeople = spark.read().json(anotherPeopleRDD);
+    Dataset<String> anotherPeopleDataset = spark.createDataset(jsonData, Encoders.STRING());
+    Dataset<Row> anotherPeople = spark.read().json(anotherPeopleDataset);
     anotherPeople.show();
     // +---------------+----+
     // |        address|name|

http://git-wip-us.apache.org/repos/asf/spark/blob/8a5a5850/examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala b/examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala
index 66f7cb1..381e69c 100644
--- a/examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala
@@ -111,6 +111,10 @@ object SQLDataSourceExample {
 
   private def runJsonDatasetExample(spark: SparkSession): Unit = {
     // $example on:json_dataset$
+    // Primitive types (Int, String, etc) and Product types (case classes) encoders are
+    // supported by importing this when creating a Dataset.
+    import spark.implicits._
+
     // A JSON dataset is pointed to by path.
     // The path can be either a single text file or a directory storing text files
     val path = "examples/src/main/resources/people.json"
@@ -135,10 +139,10 @@ object SQLDataSourceExample {
     // +------+
 
     // Alternatively, a DataFrame can be created for a JSON dataset represented by
-    // an RDD[String] storing one JSON object per string
-    val otherPeopleRDD = spark.sparkContext.makeRDD(
+    // an Dataset[String] storing one JSON object per string
+    val otherPeopleDataset = spark.createDataset(
       """{"name":"Yin","address":{"city":"Columbus","state":"Ohio"}}""" :: Nil)
-    val otherPeople = spark.read.json(otherPeopleRDD)
+    val otherPeople = spark.read.json(otherPeopleDataset)
     otherPeople.show()
     // +---------------+----+
     // |        address|name|

http://git-wip-us.apache.org/repos/asf/spark/blob/8a5a5850/sql/core/src/test/java/test/org/apache/spark/sql/JavaApplySchemaSuite.java
----------------------------------------------------------------------
diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/JavaApplySchemaSuite.java b/sql/core/src/test/java/test/org/apache/spark/sql/JavaApplySchemaSuite.java
index bf8ff61..eb4d76c 100644
--- a/sql/core/src/test/java/test/org/apache/spark/sql/JavaApplySchemaSuite.java
+++ b/sql/core/src/test/java/test/org/apache/spark/sql/JavaApplySchemaSuite.java
@@ -31,6 +31,7 @@ import org.junit.Test;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Encoders;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.RowFactory;
 import org.apache.spark.sql.SparkSession;
@@ -146,13 +147,13 @@ public class JavaApplySchemaSuite implements Serializable {
 
   @Test
   public void applySchemaToJSON() {
-    JavaRDD<String> jsonRDD = jsc.parallelize(Arrays.asList(
+    Dataset<String> jsonDS = spark.createDataset(Arrays.asList(
       "{\"string\":\"this is a simple string.\", \"integer\":10, \"long\":21474836470, " +
         "\"bigInteger\":92233720368547758070, \"double\":1.7976931348623157E308, " +
         "\"boolean\":true, \"null\":null}",
       "{\"string\":\"this is another simple string.\", \"integer\":11, \"long\":21474836469, " +
         "\"bigInteger\":92233720368547758069, \"double\":1.7976931348623157E305, " +
-        "\"boolean\":false, \"null\":null}"));
+        "\"boolean\":false, \"null\":null}"), Encoders.STRING());
     List<StructField> fields = new ArrayList<>(7);
     fields.add(DataTypes.createStructField("bigInteger", DataTypes.createDecimalType(20, 0),
       true));
@@ -183,14 +184,14 @@ public class JavaApplySchemaSuite implements Serializable {
         null,
         "this is another simple string."));
 
-    Dataset<Row> df1 = spark.read().json(jsonRDD);
+    Dataset<Row> df1 = spark.read().json(jsonDS);
     StructType actualSchema1 = df1.schema();
     Assert.assertEquals(expectedSchema, actualSchema1);
     df1.createOrReplaceTempView("jsonTable1");
     List<Row> actual1 = spark.sql("select * from jsonTable1").collectAsList();
     Assert.assertEquals(expectedResult, actual1);
 
-    Dataset<Row> df2 = spark.read().schema(expectedSchema).json(jsonRDD);
+    Dataset<Row> df2 = spark.read().schema(expectedSchema).json(jsonDS);
     StructType actualSchema2 = df2.schema();
     Assert.assertEquals(expectedSchema, actualSchema2);
     df2.createOrReplaceTempView("jsonTable2");

http://git-wip-us.apache.org/repos/asf/spark/blob/8a5a5850/sql/core/src/test/java/test/org/apache/spark/sql/JavaDataFrameSuite.java
----------------------------------------------------------------------
diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/JavaDataFrameSuite.java b/sql/core/src/test/java/test/org/apache/spark/sql/JavaDataFrameSuite.java
index a8f814b..be8d95d 100644
--- a/sql/core/src/test/java/test/org/apache/spark/sql/JavaDataFrameSuite.java
+++ b/sql/core/src/test/java/test/org/apache/spark/sql/JavaDataFrameSuite.java
@@ -414,4 +414,13 @@ public class JavaDataFrameSuite {
     Assert.assertEquals(df.schema().length(), 0);
     Assert.assertEquals(df.collectAsList().size(), 1);
   }
+
+  @Test
+  public void testJsonRDDToDataFrame() {
+    // This is a test for the deprecated API in SPARK-15615.
+    JavaRDD<String> rdd = jsc.parallelize(Arrays.asList("{\"a\": 2}"));
+    Dataset<Row> df = spark.read().json(rdd);
+    Assert.assertEquals(1L, df.count());
+    Assert.assertEquals(2L, df.collectAsList().get(0).getLong(0));
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/8a5a5850/sql/core/src/test/java/test/org/apache/spark/sql/JavaSaveLoadSuite.java
----------------------------------------------------------------------
diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/JavaSaveLoadSuite.java b/sql/core/src/test/java/test/org/apache/spark/sql/JavaSaveLoadSuite.java
index 6941c86..127d272 100644
--- a/sql/core/src/test/java/test/org/apache/spark/sql/JavaSaveLoadSuite.java
+++ b/sql/core/src/test/java/test/org/apache/spark/sql/JavaSaveLoadSuite.java
@@ -29,8 +29,6 @@ import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
-import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.sql.*;
 import org.apache.spark.sql.types.DataTypes;
 import org.apache.spark.sql.types.StructField;
@@ -40,7 +38,6 @@ import org.apache.spark.util.Utils;
 public class JavaSaveLoadSuite {
 
   private transient SparkSession spark;
-  private transient JavaSparkContext jsc;
 
   File path;
   Dataset<Row> df;
@@ -58,7 +55,6 @@ public class JavaSaveLoadSuite {
       .master("local[*]")
       .appName("testing")
       .getOrCreate();
-    jsc = new JavaSparkContext(spark.sparkContext());
 
     path =
       Utils.createTempDir(System.getProperty("java.io.tmpdir"), "datasource").getCanonicalFile();
@@ -70,8 +66,8 @@ public class JavaSaveLoadSuite {
     for (int i = 0; i < 10; i++) {
       jsonObjects.add("{\"a\":" + i + ", \"b\":\"str" + i + "\"}");
     }
-    JavaRDD<String> rdd = jsc.parallelize(jsonObjects);
-    df = spark.read().json(rdd);
+    Dataset<String> ds = spark.createDataset(jsonObjects, Encoders.STRING());
+    df = spark.read().json(ds);
     df.createOrReplaceTempView("jsonTable");
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/8a5a5850/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
index 5e65436..19c2d55 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
@@ -914,15 +914,13 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {
   }
 
   test("SPARK-7551: support backticks for DataFrame attribute resolution") {
-    val df = spark.read.json(sparkContext.makeRDD(
-      """{"a.b": {"c": {"d..e": {"f": 1}}}}""" :: Nil))
+    val df = spark.read.json(Seq("""{"a.b": {"c": {"d..e": {"f": 1}}}}""").toDS())
     checkAnswer(
       df.select(df("`a.b`.c.`d..e`.`f`")),
       Row(1)
     )
 
-    val df2 = spark.read.json(sparkContext.makeRDD(
-      """{"a  b": {"c": {"d  e": {"f": 1}}}}""" :: Nil))
+    val df2 = spark.read.json(Seq("""{"a  b": {"c": {"d  e": {"f": 1}}}}""").toDS())
     checkAnswer(
       df2.select(df2("`a  b`.c.d  e.f")),
       Row(1)
@@ -1110,8 +1108,7 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {
   }
 
   test("SPARK-9323: DataFrame.orderBy should support nested column name") {
-    val df = spark.read.json(sparkContext.makeRDD(
-      """{"a": {"b": 1}}""" :: Nil))
+    val df = spark.read.json(Seq("""{"a": {"b": 1}}""").toDS())
     checkAnswer(df.orderBy("a.b"), Row(Row(1)))
   }
 
@@ -1164,8 +1161,7 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {
   }
 
   test("SPARK-10316: respect non-deterministic expressions in PhysicalOperation") {
-    val input = spark.read.json(spark.sparkContext.makeRDD(
-      (1 to 10).map(i => s"""{"id": $i}""")))
+    val input = spark.read.json((1 to 10).map(i => s"""{"id": $i}""").toDS())
 
     val df = input.select($"id", rand(0).as('r))
     df.as("a").join(df.filter($"r" < 0.5).as("b"), $"a.id" === $"b.id").collect().foreach { row =>

http://git-wip-us.apache.org/repos/asf/spark/blob/8a5a5850/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index 03cdfcc..468ea05 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -211,8 +211,8 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
   }
 
   test("grouping on nested fields") {
-    spark.read.json(sparkContext.parallelize(
-      """{"nested": {"attribute": 1}, "value": 2}""" :: Nil))
+    spark.read
+      .json(Seq("""{"nested": {"attribute": 1}, "value": 2}""").toDS())
      .createOrReplaceTempView("rows")
 
     checkAnswer(
@@ -229,9 +229,8 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
   }
 
   test("SPARK-6201 IN type conversion") {
-    spark.read.json(
-      sparkContext.parallelize(
-        Seq("{\"a\": \"1\"}}", "{\"a\": \"2\"}}", "{\"a\": \"3\"}}")))
+    spark.read
+      .json(Seq("{\"a\": \"1\"}}", "{\"a\": \"2\"}}", "{\"a\": \"3\"}}").toDS())
       .createOrReplaceTempView("d")
 
     checkAnswer(
@@ -240,9 +239,8 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
   }
 
   test("SPARK-11226 Skip empty line in json file") {
-    spark.read.json(
-      sparkContext.parallelize(
-        Seq("{\"a\": \"1\"}}", "{\"a\": \"2\"}}", "{\"a\": \"3\"}}", "")))
+    spark.read
+      .json(Seq("{\"a\": \"1\"}}", "{\"a\": \"2\"}}", "{\"a\": \"3\"}}", "").toDS())
       .createOrReplaceTempView("d")
 
     checkAnswer(
@@ -1214,8 +1212,7 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
   }
 
   test("SPARK-3483 Special chars in column names") {
-    val data = sparkContext.parallelize(
-      Seq("""{"key?number1": "value1", "key.number2": "value2"}"""))
+    val data = Seq("""{"key?number1": "value1", "key.number2": "value2"}""").toDS()
     spark.read.json(data).createOrReplaceTempView("records")
     sql("SELECT `key?number1`, `key.number2` FROM records")
   }
@@ -1257,13 +1254,13 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
   }
 
   test("SPARK-4322 Grouping field with struct field as sub expression") {
-    spark.read.json(sparkContext.makeRDD("""{"a": {"b": [{"c": 1}]}}""" :: Nil))
+    spark.read.json(Seq("""{"a": {"b": [{"c": 1}]}}""").toDS())
       .createOrReplaceTempView("data")
     checkAnswer(sql("SELECT a.b[0].c FROM data GROUP BY a.b[0].c"), Row(1))
     spark.catalog.dropTempView("data")
 
-    spark.read.json(
-      sparkContext.makeRDD("""{"a": {"b": 1}}""" :: Nil)).createOrReplaceTempView("data")
+    spark.read.json(Seq("""{"a": {"b": 1}}""").toDS())
+      .createOrReplaceTempView("data")
     checkAnswer(sql("SELECT a.b + 1 FROM data GROUP BY a.b + 1"), Row(2))
     spark.catalog.dropTempView("data")
   }
@@ -1311,8 +1308,8 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
   }
 
   test("SPARK-6145: ORDER BY test for nested fields") {
-    spark.read.json(sparkContext.makeRDD(
-        """{"a": {"b": 1, "a": {"a": 1}}, "c": [{"d": 1}]}""" :: Nil))
+    spark.read
+      .json(Seq("""{"a": {"b": 1, "a": {"a": 1}}, "c": [{"d": 1}]}""").toDS())
       .createOrReplaceTempView("nestedOrder")
 
     checkAnswer(sql("SELECT 1 FROM nestedOrder ORDER BY a.b"), Row(1))
@@ -1325,7 +1322,7 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
 
   test("SPARK-6145: special cases") {
     spark.read
-      .json(sparkContext.makeRDD("""{"a": {"b": [1]}, "b": [{"a": 1}], "_c0": {"a": 1}}""" :: Nil))
+      .json(Seq("""{"a": {"b": [1]}, "b": [{"a": 1}], "_c0": {"a": 1}}""").toDS())
       .createOrReplaceTempView("t")
 
     checkAnswer(sql("SELECT a.b[0] FROM t ORDER BY _c0.a"), Row(1))
@@ -1333,8 +1330,8 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
   }
 
   test("SPARK-6898: complete support for special chars in column names") {
-    spark.read.json(sparkContext.makeRDD(
-      """{"a": {"c.b": 1}, "b.$q": [{"a@!.q": 1}], "q.w": {"w.i&": [1]}}""" :: Nil))
+    spark.read
+      .json(Seq("""{"a": {"c.b": 1}, "b.$q": [{"a@!.q": 1}], "q.w": {"w.i&": [1]}}""").toDS())
       .createOrReplaceTempView("t")
 
     checkAnswer(sql("SELECT a.`c.b`, `b.$q`[0].`a@!.q`, `q.w`.`w.i&`[0] FROM t"), Row(1, 1, 1))
@@ -1437,8 +1434,9 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
 
   test("SPARK-7067: order by queries for complex ExtractValue chain") {
     withTempView("t") {
-      spark.read.json(sparkContext.makeRDD(
-        """{"a": {"b": [{"c": 1}]}, "b": [{"d": 1}]}""" :: Nil)).createOrReplaceTempView("t")
+      spark.read
+        .json(Seq("""{"a": {"b": [{"c": 1}]}, "b": [{"d": 1}]}""").toDS())
+        .createOrReplaceTempView("t")
       checkAnswer(sql("SELECT a.b FROM t ORDER BY b[0].d"), Row(Seq(Row(1))))
     }
   }
@@ -2109,8 +2107,7 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
           |"a": [{"count": 3}], "b": [{"e": "test", "count": 1}]}}}'
           |
         """.stripMargin
-      val rdd = sparkContext.parallelize(Array(json))
-      spark.read.json(rdd).write.mode("overwrite").parquet(dir.toString)
+      spark.read.json(Seq(json).toDS()).write.mode("overwrite").parquet(dir.toString)
       spark.read.parquet(dir.toString).collect()
     }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/8a5a5850/sql/core/src/test/scala/org/apache/spark/sql/UserDefinedTypeSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/UserDefinedTypeSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/UserDefinedTypeSuite.scala
index c7a77da..b096a6d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/UserDefinedTypeSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/UserDefinedTypeSuite.scala
@@ -221,8 +221,7 @@ class UserDefinedTypeSuite extends QueryTest with SharedSQLContext with ParquetT
       StructField("vec", new UDT.MyDenseVectorUDT, false)
     ))
 
-    val stringRDD = sparkContext.parallelize(data)
-    val jsonRDD = spark.read.schema(schema).json(stringRDD)
+    val jsonRDD = spark.read.schema(schema).json(data.toDS())
     checkAnswer(
       jsonRDD,
       Row(1, new UDT.MyDenseVector(Array(1.1, 2.2, 3.3, 4.4))) ::
@@ -242,8 +241,7 @@ class UserDefinedTypeSuite extends QueryTest with SharedSQLContext with ParquetT
       StructField("vec", new UDT.MyDenseVectorUDT, false)
     ))
 
-    val stringRDD = sparkContext.parallelize(data)
-    val jsonDataset = spark.read.schema(schema).json(stringRDD)
+    val jsonDataset = spark.read.schema(schema).json(data.toDS())
       .as[(Int, UDT.MyDenseVector)]
     checkDataset(
       jsonDataset,

http://git-wip-us.apache.org/repos/asf/spark/blob/8a5a5850/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/WideSchemaBenchmark.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/WideSchemaBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/WideSchemaBenchmark.scala
index d2704b3..a42891e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/WideSchemaBenchmark.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/WideSchemaBenchmark.scala
@@ -140,7 +140,7 @@ class WideSchemaBenchmark extends SparkFunSuite with BeforeAndAfterEach {
       }
       datum += "}"
       datum = s"""{"a": {"b": {"c": $datum, "d": $datum}, "e": $datum}}"""
-      val df = sparkSession.read.json(sparkSession.range(numRows).map(_ => datum).rdd).cache()
+      val df = sparkSession.read.json(sparkSession.range(numRows).map(_ => datum)).cache()
       df.count()  // force caching
       addCases(benchmark, df, s"$width wide x $numRows rows", "a.b.c.value_1")
     }
@@ -157,7 +157,7 @@ class WideSchemaBenchmark extends SparkFunSuite with BeforeAndAfterEach {
         datum = "{\"value\": " + datum + "}"
         selector = selector + ".value"
       }
-      val df = sparkSession.read.json(sparkSession.range(numRows).map(_ => datum).rdd).cache()
+      val df = sparkSession.read.json(sparkSession.range(numRows).map(_ => datum)).cache()
       df.count()  // force caching
       addCases(benchmark, df, s"$depth deep x $numRows rows", selector)
     }
@@ -180,7 +180,7 @@ class WideSchemaBenchmark extends SparkFunSuite with BeforeAndAfterEach {
       }
       // TODO(ekl) seems like the json parsing is actually the majority of the time, perhaps
       // we should benchmark that too separately.
-      val df = sparkSession.read.json(sparkSession.range(numRows).map(_ => datum).rdd).cache()
+      val df = sparkSession.read.json(sparkSession.range(numRows).map(_ => datum)).cache()
       df.count()  // force caching
       addCases(benchmark, df, s"$numNodes x $depth deep x $numRows rows", selector)
     }
@@ -200,7 +200,7 @@ class WideSchemaBenchmark extends SparkFunSuite with BeforeAndAfterEach {
         }
       }
       datum += "]}"
-      val df = sparkSession.read.json(sparkSession.range(numRows).map(_ => datum).rdd).cache()
+      val df = sparkSession.read.json(sparkSession.range(numRows).map(_ => datum)).cache()
       df.count()  // force caching
       addCases(benchmark, df, s"$width wide x $numRows rows", "value[0]")
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/8a5a5850/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonParsingOptionsSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonParsingOptionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonParsingOptionsSuite.scala
index 0b72da5..6e2b4f0 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonParsingOptionsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonParsingOptionsSuite.scala
@@ -25,19 +25,18 @@ import org.apache.spark.sql.test.SharedSQLContext
  * Test cases for various [[JSONOptions]].
  */
 class JsonParsingOptionsSuite extends QueryTest with SharedSQLContext {
+  import testImplicits._
 
   test("allowComments off") {
     val str = """{'name': /* hello */ 'Reynold Xin'}"""
-    val rdd = spark.sparkContext.parallelize(Seq(str))
-    val df = spark.read.json(rdd)
+    val df = spark.read.json(Seq(str).toDS())
 
     assert(df.schema.head.name == "_corrupt_record")
   }
 
   test("allowComments on") {
     val str = """{'name': /* hello */ 'Reynold Xin'}"""
-    val rdd = spark.sparkContext.parallelize(Seq(str))
-    val df = spark.read.option("allowComments", "true").json(rdd)
+    val df = spark.read.option("allowComments", "true").json(Seq(str).toDS())
 
     assert(df.schema.head.name == "name")
     assert(df.first().getString(0) == "Reynold Xin")
@@ -45,16 +44,14 @@ class JsonParsingOptionsSuite extends QueryTest with SharedSQLContext {
 
   test("allowSingleQuotes off") {
     val str = """{'name': 'Reynold Xin'}"""
-    val rdd = spark.sparkContext.parallelize(Seq(str))
-    val df = spark.read.option("allowSingleQuotes", "false").json(rdd)
+    val df = spark.read.option("allowSingleQuotes", "false").json(Seq(str).toDS())
 
     assert(df.schema.head.name == "_corrupt_record")
   }
 
   test("allowSingleQuotes on") {
     val str = """{'name': 'Reynold Xin'}"""
-    val rdd = spark.sparkContext.parallelize(Seq(str))
-    val df = spark.read.json(rdd)
+    val df = spark.read.json(Seq(str).toDS())
 
     assert(df.schema.head.name == "name")
     assert(df.first().getString(0) == "Reynold Xin")
@@ -62,16 +59,14 @@ class JsonParsingOptionsSuite extends QueryTest with SharedSQLContext {
 
   test("allowUnquotedFieldNames off") {
     val str = """{name: 'Reynold Xin'}"""
-    val rdd = spark.sparkContext.parallelize(Seq(str))
-    val df = spark.read.json(rdd)
+    val df = spark.read.json(Seq(str).toDS())
 
     assert(df.schema.head.name == "_corrupt_record")
   }
 
   test("allowUnquotedFieldNames on") {
     val str = """{name: 'Reynold Xin'}"""
-    val rdd = spark.sparkContext.parallelize(Seq(str))
-    val df = spark.read.option("allowUnquotedFieldNames", "true").json(rdd)
+    val df = spark.read.option("allowUnquotedFieldNames", "true").json(Seq(str).toDS())
 
     assert(df.schema.head.name == "name")
     assert(df.first().getString(0) == "Reynold Xin")
@@ -79,16 +74,14 @@ class JsonParsingOptionsSuite extends QueryTest with SharedSQLContext {
 
   test("allowNumericLeadingZeros off") {
     val str = """{"age": 0018}"""
-    val rdd = spark.sparkContext.parallelize(Seq(str))
-    val df = spark.read.json(rdd)
+    val df = spark.read.json(Seq(str).toDS())
 
     assert(df.schema.head.name == "_corrupt_record")
   }
 
   test("allowNumericLeadingZeros on") {
     val str = """{"age": 0018}"""
-    val rdd = spark.sparkContext.parallelize(Seq(str))
-    val df = spark.read.option("allowNumericLeadingZeros", "true").json(rdd)
+    val df = spark.read.option("allowNumericLeadingZeros", "true").json(Seq(str).toDS())
 
     assert(df.schema.head.name == "age")
     assert(df.first().getLong(0) == 18)
@@ -98,16 +91,14 @@ class JsonParsingOptionsSuite extends QueryTest with SharedSQLContext {
   // JsonParser.Feature.ALLOW_NON_NUMERIC_NUMBERS.
   ignore("allowNonNumericNumbers off") {
     val str = """{"age": NaN}"""
-    val rdd = spark.sparkContext.parallelize(Seq(str))
-    val df = spark.read.json(rdd)
+    val df = spark.read.json(Seq(str).toDS())
 
     assert(df.schema.head.name == "_corrupt_record")
   }
 
   ignore("allowNonNumericNumbers on") {
     val str = """{"age": NaN}"""
-    val rdd = spark.sparkContext.parallelize(Seq(str))
-    val df = spark.read.option("allowNonNumericNumbers", "true").json(rdd)
+    val df = spark.read.option("allowNonNumericNumbers", "true").json(Seq(str).toDS())
 
     assert(df.schema.head.name == "age")
     assert(df.first().getDouble(0).isNaN)
@@ -115,16 +106,14 @@ class JsonParsingOptionsSuite extends QueryTest with SharedSQLContext {
 
   test("allowBackslashEscapingAnyCharacter off") {
     val str = """{"name": "Cazen Lee", "price": "\$10"}"""
-    val rdd = spark.sparkContext.parallelize(Seq(str))
-    val df = spark.read.option("allowBackslashEscapingAnyCharacter", "false").json(rdd)
+    val df = spark.read.option("allowBackslashEscapingAnyCharacter", "false").json(Seq(str).toDS())
 
     assert(df.schema.head.name == "_corrupt_record")
   }
 
   test("allowBackslashEscapingAnyCharacter on") {
     val str = """{"name": "Cazen Lee", "price": "\$10"}"""
-    val rdd = spark.sparkContext.parallelize(Seq(str))
-    val df = spark.read.option("allowBackslashEscapingAnyCharacter", "true").json(rdd)
+    val df = spark.read.option("allowBackslashEscapingAnyCharacter", "true").json(Seq(str).toDS())
 
     assert(df.schema.head.name == "name")
     assert(df.schema.last.name == "price")

http://git-wip-us.apache.org/repos/asf/spark/blob/8a5a5850/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
index 0e01be2..0aaf148 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
@@ -590,7 +590,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
     val dir = Utils.createTempDir()
     dir.delete()
     val path = dir.getCanonicalPath
-    primitiveFieldAndType.map(record => record.replaceAll("\n", " ")).saveAsTextFile(path)
+    primitiveFieldAndType.map(record => record.replaceAll("\n", " ")).write.text(path)
     val jsonDF = spark.read.json(path)
 
     val expectedSchema = StructType(
@@ -622,7 +622,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
     val dir = Utils.createTempDir()
     dir.delete()
     val path = dir.getCanonicalPath
-    primitiveFieldAndType.map(record => record.replaceAll("\n", " ")).saveAsTextFile(path)
+    primitiveFieldAndType.map(record => record.replaceAll("\n", " ")).write.text(path)
     val jsonDF = spark.read.option("primitivesAsString", "true").json(path)
 
     val expectedSchema = StructType(
@@ -777,9 +777,9 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
   }
 
   test("Find compatible types even if inferred DecimalType is not capable of other IntegralType") {
-    val mixedIntegerAndDoubleRecords = sparkContext.parallelize(
-      """{"a": 3, "b": 1.1}""" ::
-      s"""{"a": 3.1, "b": 0.${"0" * 38}1}""" :: Nil)
+    val mixedIntegerAndDoubleRecords = Seq(
+      """{"a": 3, "b": 1.1}""",
+      s"""{"a": 3.1, "b": 0.${"0" * 38}1}""").toDS()
     val jsonDF = spark.read
       .option("prefersDecimal", "true")
       .json(mixedIntegerAndDoubleRecords)
@@ -828,7 +828,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
 
     val mergedJsonDF = spark.read
       .option("prefersDecimal", "true")
-      .json(floatingValueRecords ++ bigIntegerRecords)
+      .json(floatingValueRecords.union(bigIntegerRecords))
 
     val expectedMergedSchema = StructType(
       StructField("a", DoubleType, true) ::
@@ -846,7 +846,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
     val dir = Utils.createTempDir()
     dir.delete()
     val path = dir.toURI.toString
-    primitiveFieldAndType.map(record => record.replaceAll("\n", " ")).saveAsTextFile(path)
+    primitiveFieldAndType.map(record => record.replaceAll("\n", " ")).write.text(path)
 
     sql(
       s"""
@@ -873,7 +873,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
     val dir = Utils.createTempDir()
     dir.delete()
     val path = dir.getCanonicalPath
-    primitiveFieldAndType.map(record => record.replaceAll("\n", " ")).saveAsTextFile(path)
+    primitiveFieldAndType.map(record => record.replaceAll("\n", " ")).write.text(path)
 
     val schema = StructType(
       StructField("bigInteger", DecimalType.SYSTEM_DEFAULT, true) ::
@@ -1263,7 +1263,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
     assert(result2(3) === "{\"f1\":{\"f11\":4,\"f12\":true},\"f2\":{\"D4\":2147483644}}")
 
     val jsonDF = spark.read.json(primitiveFieldAndType)
-    val primTable = spark.read.json(jsonDF.toJSON.rdd)
+    val primTable = spark.read.json(jsonDF.toJSON)
     primTable.createOrReplaceTempView("primitiveTable")
     checkAnswer(
         sql("select * from primitiveTable"),
@@ -1276,7 +1276,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
       )
 
     val complexJsonDF = spark.read.json(complexFieldAndType1)
-    val compTable = spark.read.json(complexJsonDF.toJSON.rdd)
+    val compTable = spark.read.json(complexJsonDF.toJSON)
     compTable.createOrReplaceTempView("complexTable")
     // Access elements of a primitive array.
     checkAnswer(
@@ -1364,10 +1364,10 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
     })
   }
 
-  test("SPARK-6245 JsonRDD.inferSchema on empty RDD") {
+  test("SPARK-6245 JsonInferSchema.infer on empty RDD") {
     // This is really a test that it doesn't throw an exception
     val emptySchema = JsonInferSchema.infer(
-      empty,
+      empty.rdd,
       new JSONOptions(Map.empty[String, String], "GMT"),
       CreateJacksonParser.string)
     assert(StructType(Seq()) === emptySchema)
@@ -1394,7 +1394,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
 
   test("SPARK-8093 Erase empty structs") {
     val emptySchema = JsonInferSchema.infer(
-      emptyRecords,
+      emptyRecords.rdd,
       new JSONOptions(Map.empty[String, String], "GMT"),
       CreateJacksonParser.string)
     assert(StructType(Seq()) === emptySchema)
@@ -1592,7 +1592,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
       val dir = Utils.createTempDir()
       dir.delete()
       val path = dir.getCanonicalPath
-      arrayAndStructRecords.map(record => record.replaceAll("\n", " ")).saveAsTextFile(path)
+      arrayAndStructRecords.map(record => record.replaceAll("\n", " ")).write.text(path)
 
       val schema =
         StructType(
@@ -1609,7 +1609,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
       val dir = Utils.createTempDir()
       dir.delete()
       val path = dir.getCanonicalPath
-      primitiveFieldAndType.map(record => record.replaceAll("\n", " ")).saveAsTextFile(path)
+      primitiveFieldAndType.map(record => record.replaceAll("\n", " ")).write.text(path)
 
       val jsonDF = spark.read.json(path)
       val jsonDir = new File(dir, "json").getCanonicalPath
@@ -1645,7 +1645,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
       dir.delete()
 
       val path = dir.getCanonicalPath
-      primitiveFieldAndType.map(record => record.replaceAll("\n", " ")).saveAsTextFile(path)
+      primitiveFieldAndType.map(record => record.replaceAll("\n", " ")).write.text(path)
 
       val jsonDF = spark.read.json(path)
       val jsonDir = new File(dir, "json").getCanonicalPath
@@ -1693,8 +1693,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
     val json = s"""
        |{"a": [{$nested}], "b": [{$nested}]}
      """.stripMargin
-    val rdd = spark.sparkContext.makeRDD(Seq(json))
-    val df = spark.read.json(rdd)
+    val df = spark.read.json(Seq(json).toDS())
     assert(df.schema.size === 2)
     df.collect()
   }
@@ -1794,8 +1793,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
   }
 
   test("SPARK-18433: Improve DataSource option keys to be more case-insensitive") {
-    val records = sparkContext
-      .parallelize("""{"a": 3, "b": 1.1}""" :: """{"a": 3.1, "b": 0.000001}""" :: Nil)
+    val records = Seq("""{"a": 3, "b": 1.1}""", """{"a": 3.1, "b": 0.000001}""").toDS()
 
     val schema = StructType(
       StructField("a", DecimalType(21, 1), true) ::

http://git-wip-us.apache.org/repos/asf/spark/blob/8a5a5850/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/TestJsonData.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/TestJsonData.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/TestJsonData.scala
index a400940..13084ba 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/TestJsonData.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/TestJsonData.scala
@@ -17,14 +17,13 @@
 
 package org.apache.spark.sql.execution.datasources.json
 
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.{Dataset, Encoders, SparkSession}
 
 private[json] trait TestJsonData {
   protected def spark: SparkSession
 
-  def primitiveFieldAndType: RDD[String] =
-    spark.sparkContext.parallelize(
+  def primitiveFieldAndType: Dataset[String] =
+    spark.createDataset(spark.sparkContext.parallelize(
       """{"string":"this is a simple string.",
           "integer":10,
           "long":21474836470,
@@ -32,10 +31,10 @@ private[json] trait TestJsonData {
           "double":1.7976931348623157E308,
           "boolean":true,
           "null":null
-      }"""  :: Nil)
+      }"""  :: Nil))(Encoders.STRING)
 
-  def primitiveFieldValueTypeConflict: RDD[String] =
-    spark.sparkContext.parallelize(
+  def primitiveFieldValueTypeConflict: Dataset[String] =
+    spark.createDataset(spark.sparkContext.parallelize(
       """{"num_num_1":11, "num_num_2":null, "num_num_3": 1.1,
           "num_bool":true, "num_str":13.1, "str_bool":"str1"}""" ::
       """{"num_num_1":null, "num_num_2":21474836470.9, "num_num_3": null,
@@ -44,16 +43,17 @@ private[json] trait TestJsonData {
           "num_bool":false, "num_str":"str1", "str_bool":false}""" ::
       """{"num_num_1":21474836570, "num_num_2":1.1, "num_num_3": 21474836470,
           "num_bool":null, "num_str":92233720368547758070, "str_bool":null}""" :: Nil)
+    )(Encoders.STRING)
 
-  def jsonNullStruct: RDD[String] =
-    spark.sparkContext.parallelize(
+  def jsonNullStruct: Dataset[String] =
+    spark.createDataset(spark.sparkContext.parallelize(
       """{"nullstr":"","ip":"27.31.100.29","headers":{"Host":"1.abc.com","Charset":"UTF-8"}}""" ::
         """{"nullstr":"","ip":"27.31.100.29","headers":{}}""" ::
         """{"nullstr":"","ip":"27.31.100.29","headers":""}""" ::
-        """{"nullstr":null,"ip":"27.31.100.29","headers":null}""" :: Nil)
+        """{"nullstr":null,"ip":"27.31.100.29","headers":null}""" :: Nil))(Encoders.STRING)
 
-  def complexFieldValueTypeConflict: RDD[String] =
-    spark.sparkContext.parallelize(
+  def complexFieldValueTypeConflict: Dataset[String] =
+    spark.createDataset(spark.sparkContext.parallelize(
       """{"num_struct":11, "str_array":[1, 2, 3],
           "array":[], "struct_array":[], "struct": {}}""" ::
       """{"num_struct":{"field":false}, "str_array":null,
@@ -62,24 +62,25 @@ private[json] trait TestJsonData {
           "array":[4, 5, 6], "struct_array":[7, 8, 9], "struct": {"field":null}}""" ::
       """{"num_struct":{}, "str_array":["str1", "str2", 33],
           "array":[7], "struct_array":{"field": true}, "struct": {"field": "str"}}""" :: Nil)
+    )(Encoders.STRING)
 
-  def arrayElementTypeConflict: RDD[String] =
-    spark.sparkContext.parallelize(
+  def arrayElementTypeConflict: Dataset[String] =
+    spark.createDataset(spark.sparkContext.parallelize(
       """{"array1": [1, 1.1, true, null, [], {}, [2,3,4], {"field":"str"}],
           "array2": [{"field":214748364700}, {"field":1}]}""" ::
       """{"array3": [{"field":"str"}, {"field":1}]}""" ::
-      """{"array3": [1, 2, 3]}""" :: Nil)
+      """{"array3": [1, 2, 3]}""" :: Nil))(Encoders.STRING)
 
-  def missingFields: RDD[String] =
-    spark.sparkContext.parallelize(
+  def missingFields: Dataset[String] =
+    spark.createDataset(spark.sparkContext.parallelize(
       """{"a":true}""" ::
       """{"b":21474836470}""" ::
       """{"c":[33, 44]}""" ::
       """{"d":{"field":true}}""" ::
-      """{"e":"str"}""" :: Nil)
+      """{"e":"str"}""" :: Nil))(Encoders.STRING)
 
-  def complexFieldAndType1: RDD[String] =
-    spark.sparkContext.parallelize(
+  def complexFieldAndType1: Dataset[String] =
+    spark.createDataset(spark.sparkContext.parallelize(
       """{"struct":{"field1": true, "field2": 92233720368547758070},
           "structWithArrayFields":{"field1":[4, 5, 6], "field2":["str1", "str2"]},
           "arrayOfString":["str1", "str2"],
@@ -92,10 +93,10 @@ private[json] trait TestJsonData {
           "arrayOfStruct":[{"field1": true, "field2": "str1"}, {"field1": false}, {"field3": null}],
           "arrayOfArray1":[[1, 2, 3], ["str1", "str2"]],
           "arrayOfArray2":[[1, 2, 3], [1.1, 2.1, 3.1]]
-         }"""  :: Nil)
+         }"""  :: Nil))(Encoders.STRING)
 
-  def complexFieldAndType2: RDD[String] =
-    spark.sparkContext.parallelize(
+  def complexFieldAndType2: Dataset[String] =
+    spark.createDataset(spark.sparkContext.parallelize(
       """{"arrayOfStruct":[{"field1": true, "field2": "str1"}, {"field1": false}, {"field3": null}],
           "complexArrayOfStruct": [
           {
@@ -146,89 +147,90 @@ private[json] trait TestJsonData {
               {"inner3": [[{"inner4": 2}]]}
             ]
           ]]
-      }""" :: Nil)
+      }""" :: Nil))(Encoders.STRING)
 
-  def mapType1: RDD[String] =
-    spark.sparkContext.parallelize(
+  def mapType1: Dataset[String] =
+    spark.createDataset(spark.sparkContext.parallelize(
       """{"map": {"a": 1}}""" ::
       """{"map": {"b": 2}}""" ::
       """{"map": {"c": 3}}""" ::
       """{"map": {"c": 1, "d": 4}}""" ::
-      """{"map": {"e": null}}""" :: Nil)
+      """{"map": {"e": null}}""" :: Nil))(Encoders.STRING)
 
-  def mapType2: RDD[String] =
-    spark.sparkContext.parallelize(
+  def mapType2: Dataset[String] =
+    spark.createDataset(spark.sparkContext.parallelize(
       """{"map": {"a": {"field1": [1, 2, 3, null]}}}""" ::
       """{"map": {"b": {"field2": 2}}}""" ::
       """{"map": {"c": {"field1": [], "field2": 4}}}""" ::
       """{"map": {"c": {"field2": 3}, "d": {"field1": [null]}}}""" ::
       """{"map": {"e": null}}""" ::
-      """{"map": {"f": {"field1": null}}}""" :: Nil)
+      """{"map": {"f": {"field1": null}}}""" :: Nil))(Encoders.STRING)
 
-  def nullsInArrays: RDD[String] =
-    spark.sparkContext.parallelize(
+  def nullsInArrays: Dataset[String] =
+    spark.createDataset(spark.sparkContext.parallelize(
       """{"field1":[[null], [[["Test"]]]]}""" ::
       """{"field2":[null, [{"Test":1}]]}""" ::
       """{"field3":[[null], [{"Test":"2"}]]}""" ::
-      """{"field4":[[null, [1,2,3]]]}""" :: Nil)
+      """{"field4":[[null, [1,2,3]]]}""" :: Nil))(Encoders.STRING)
 
-  def jsonArray: RDD[String] =
-    spark.sparkContext.parallelize(
+  def jsonArray: Dataset[String] =
+    spark.createDataset(spark.sparkContext.parallelize(
       """[{"a":"str_a_1"}]""" ::
       """[{"a":"str_a_2"}, {"b":"str_b_3"}]""" ::
       """{"b":"str_b_4", "a":"str_a_4", "c":"str_c_4"}""" ::
-      """[]""" :: Nil)
+      """[]""" :: Nil))(Encoders.STRING)
 
-  def corruptRecords: RDD[String] =
-    spark.sparkContext.parallelize(
+  def corruptRecords: Dataset[String] =
+    spark.createDataset(spark.sparkContext.parallelize(
       """{""" ::
       """""" ::
       """{"a":1, b:2}""" ::
       """{"a":{, b:3}""" ::
       """{"b":"str_b_4", "a":"str_a_4", "c":"str_c_4"}""" ::
-      """]""" :: Nil)
+      """]""" :: Nil))(Encoders.STRING)
 
-  def additionalCorruptRecords: RDD[String] =
-    spark.sparkContext.parallelize(
+  def additionalCorruptRecords: Dataset[String] =
+    spark.createDataset(spark.sparkContext.parallelize(
       """{"dummy":"test"}""" ::
       """[1,2,3]""" ::
       """":"test", "a":1}""" ::
       """42""" ::
-      """     ","ian":"test"}""" :: Nil)
+      """     ","ian":"test"}""" :: Nil))(Encoders.STRING)
 
-  def emptyRecords: RDD[String] =
-    spark.sparkContext.parallelize(
+  def emptyRecords: Dataset[String] =
+    spark.createDataset(spark.sparkContext.parallelize(
       """{""" ::
         """""" ::
         """{"a": {}}""" ::
         """{"a": {"b": {}}}""" ::
         """{"b": [{"c": {}}]}""" ::
-        """]""" :: Nil)
+        """]""" :: Nil))(Encoders.STRING)
 
-  def timestampAsLong: RDD[String] =
-    spark.sparkContext.parallelize(
-      """{"ts":1451732645}""" :: Nil)
+  def timestampAsLong: Dataset[String] =
+    spark.createDataset(spark.sparkContext.parallelize(
+      """{"ts":1451732645}""" :: Nil))(Encoders.STRING)
 
-  def arrayAndStructRecords: RDD[String] =
-    spark.sparkContext.parallelize(
+  def arrayAndStructRecords: Dataset[String] =
+    spark.createDataset(spark.sparkContext.parallelize(
       """{"a": {"b": 1}}""" ::
-      """{"a": []}""" :: Nil)
+      """{"a": []}""" :: Nil))(Encoders.STRING)
 
-  def floatingValueRecords: RDD[String] =
-    spark.sparkContext.parallelize(
-      s"""{"a": 0.${"0" * 38}1, "b": 0.01}""" :: Nil)
+  def floatingValueRecords: Dataset[String] =
+    spark.createDataset(spark.sparkContext.parallelize(
+      s"""{"a": 0.${"0" * 38}1, "b": 0.01}""" :: Nil))(Encoders.STRING)
 
-  def bigIntegerRecords: RDD[String] =
-    spark.sparkContext.parallelize(
-      s"""{"a": 1${"0" * 38}, "b": 92233720368547758070}""" :: Nil)
+  def bigIntegerRecords: Dataset[String] =
+    spark.createDataset(spark.sparkContext.parallelize(
+      s"""{"a": 1${"0" * 38}, "b": 92233720368547758070}""" :: Nil))(Encoders.STRING)
 
-  def datesRecords: RDD[String] =
-    spark.sparkContext.parallelize(
+  def datesRecords: Dataset[String] =
+    spark.createDataset(spark.sparkContext.parallelize(
       """{"date": "26/08/2015 18:00"}""" ::
       """{"date": "27/10/2014 18:30"}""" ::
-      """{"date": "28/01/2016 20:00"}""" :: Nil)
+      """{"date": "28/01/2016 20:00"}""" :: Nil))(Encoders.STRING)
 
-  lazy val singleRow: RDD[String] = spark.sparkContext.parallelize("""{"a":123}""" :: Nil)
+  lazy val singleRow: Dataset[String] =
+    spark.createDataset(spark.sparkContext.parallelize("""{"a":123}""" :: Nil))(Encoders.STRING)
 
-  def empty: RDD[String] = spark.sparkContext.parallelize(Seq[String]())
+  def empty: Dataset[String] = spark.emptyDataset(Encoders.STRING)
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/8a5a5850/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala
index 4a42f8e..916a01e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala
@@ -33,14 +33,15 @@ class CreateTableAsSelectSuite
   extends DataSourceTest
   with SharedSQLContext
   with BeforeAndAfterEach {
+  import testImplicits._
 
   protected override lazy val sql = spark.sql _
   private var path: File = null
 
   override def beforeAll(): Unit = {
     super.beforeAll()
-    val rdd = sparkContext.parallelize((1 to 10).map(i => s"""{"a":$i, "b":"str${i}"}"""))
-    spark.read.json(rdd).createOrReplaceTempView("jt")
+    val ds = (1 to 10).map(i => s"""{"a":$i, "b":"str${i}"}""").toDS()
+    spark.read.json(ds).createOrReplaceTempView("jt")
   }
 
   override def afterAll(): Unit = {

http://git-wip-us.apache.org/repos/asf/spark/blob/8a5a5850/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
index 4fc2f81..19835cd 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
@@ -24,14 +24,16 @@ import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.util.Utils
 
 class InsertSuite extends DataSourceTest with SharedSQLContext {
+  import testImplicits._
+
   protected override lazy val sql = spark.sql _
   private var path: File = null
 
   override def beforeAll(): Unit = {
     super.beforeAll()
     path = Utils.createTempDir()
-    val rdd = sparkContext.parallelize((1 to 10).map(i => s"""{"a":$i, "b":"str$i"}"""))
-    spark.read.json(rdd).createOrReplaceTempView("jt")
+    val ds = (1 to 10).map(i => s"""{"a":$i, "b":"str$i"}""").toDS()
+    spark.read.json(ds).createOrReplaceTempView("jt")
     sql(
       s"""
         |CREATE TEMPORARY VIEW jsonTable (a int, b string)
@@ -129,7 +131,7 @@ class InsertSuite extends DataSourceTest with SharedSQLContext {
 
     // Writing the table to less part files.
     val rdd1 = sparkContext.parallelize((1 to 10).map(i => s"""{"a":$i, "b":"str$i"}"""), 5)
-    spark.read.json(rdd1).createOrReplaceTempView("jt1")
+    spark.read.json(rdd1.toDS()).createOrReplaceTempView("jt1")
     sql(
       s"""
          |INSERT OVERWRITE TABLE jsonTable SELECT a, b FROM jt1
@@ -141,7 +143,7 @@ class InsertSuite extends DataSourceTest with SharedSQLContext {
 
     // Writing the table to more part files.
     val rdd2 = sparkContext.parallelize((1 to 10).map(i => s"""{"a":$i, "b":"str$i"}"""), 10)
-    spark.read.json(rdd2).createOrReplaceTempView("jt2")
+    spark.read.json(rdd1.toDS()).createOrReplaceTempView("jt2")
     sql(
       s"""
          |INSERT OVERWRITE TABLE jsonTable SELECT a, b FROM jt2

http://git-wip-us.apache.org/repos/asf/spark/blob/8a5a5850/sql/core/src/test/scala/org/apache/spark/sql/sources/SaveLoadSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/SaveLoadSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/SaveLoadSuite.scala
index b1756c2..773d34d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/SaveLoadSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/SaveLoadSuite.scala
@@ -28,6 +28,8 @@ import org.apache.spark.sql.types._
 import org.apache.spark.util.Utils
 
 class SaveLoadSuite extends DataSourceTest with SharedSQLContext with BeforeAndAfter {
+  import testImplicits._
+
   protected override lazy val sql = spark.sql _
   private var originalDefaultSource: String = null
   private var path: File = null
@@ -40,8 +42,8 @@ class SaveLoadSuite extends DataSourceTest with SharedSQLContext with BeforeAndA
     path = Utils.createTempDir()
     path.delete()
 
-    val rdd = sparkContext.parallelize((1 to 10).map(i => s"""{"a":$i, "b":"str${i}"}"""))
-    df = spark.read.json(rdd)
+    val ds = (1 to 10).map(i => s"""{"a":$i, "b":"str${i}"}""").toDS()
+    df = spark.read.json(ds)
     df.createOrReplaceTempView("jsonTable")
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/8a5a5850/sql/hive/src/test/java/org/apache/spark/sql/hive/JavaDataFrameSuite.java
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/java/org/apache/spark/sql/hive/JavaDataFrameSuite.java b/sql/hive/src/test/java/org/apache/spark/sql/hive/JavaDataFrameSuite.java
index f664d5a..aefc9cc 100644
--- a/sql/hive/src/test/java/org/apache/spark/sql/hive/JavaDataFrameSuite.java
+++ b/sql/hive/src/test/java/org/apache/spark/sql/hive/JavaDataFrameSuite.java
@@ -26,7 +26,6 @@ import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
-import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.sql.*;
 import org.apache.spark.sql.expressions.Window;
 import org.apache.spark.sql.expressions.UserDefinedAggregateFunction;
@@ -35,7 +34,6 @@ import org.apache.spark.sql.hive.test.TestHive$;
 import org.apache.spark.sql.hive.aggregate.MyDoubleSum;
 
 public class JavaDataFrameSuite {
-  private transient JavaSparkContext sc;
   private transient SQLContext hc;
 
   Dataset<Row> df;
@@ -50,13 +48,11 @@ public class JavaDataFrameSuite {
   @Before
   public void setUp() throws IOException {
     hc = TestHive$.MODULE$;
-    sc = new JavaSparkContext(hc.sparkContext());
-
     List<String> jsonObjects = new ArrayList<>(10);
     for (int i = 0; i < 10; i++) {
       jsonObjects.add("{\"key\":" + i + ", \"value\":\"str" + i + "\"}");
     }
-    df = hc.read().json(sc.parallelize(jsonObjects));
+    df = hc.read().json(hc.createDataset(jsonObjects, Encoders.STRING()));
     df.createOrReplaceTempView("window_table");
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/8a5a5850/sql/hive/src/test/java/org/apache/spark/sql/hive/JavaMetastoreDataSourcesSuite.java
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/java/org/apache/spark/sql/hive/JavaMetastoreDataSourcesSuite.java b/sql/hive/src/test/java/org/apache/spark/sql/hive/JavaMetastoreDataSourcesSuite.java
index 061c743..0b157a4 100644
--- a/sql/hive/src/test/java/org/apache/spark/sql/hive/JavaMetastoreDataSourcesSuite.java
+++ b/sql/hive/src/test/java/org/apache/spark/sql/hive/JavaMetastoreDataSourcesSuite.java
@@ -31,9 +31,9 @@ import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
-import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Encoders;
 import org.apache.spark.sql.QueryTest$;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SQLContext;
@@ -81,8 +81,8 @@ public class JavaMetastoreDataSourcesSuite {
     for (int i = 0; i < 10; i++) {
       jsonObjects.add("{\"a\":" + i + ", \"b\":\"str" + i + "\"}");
     }
-    JavaRDD<String> rdd = sc.parallelize(jsonObjects);
-    df = sqlContext.read().json(rdd);
+    Dataset<String> ds = sqlContext.createDataset(jsonObjects, Encoders.STRING());
+    df = sqlContext.read().json(ds);
     df.createOrReplaceTempView("jsonTable");
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/8a5a5850/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
index e951bbe..03ea0c8 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
@@ -511,9 +511,9 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
   test("create external table") {
     withTempPath { tempPath =>
       withTable("savedJsonTable", "createdJsonTable") {
-        val df = read.json(sparkContext.parallelize((1 to 10).map { i =>
+        val df = read.json((1 to 10).map { i =>
           s"""{ "a": $i, "b": "str$i" }"""
-        }))
+        }.toDS())
 
         withSQLConf(SQLConf.DEFAULT_DATA_SOURCE_NAME.key -> "not a source name") {
           df.write

http://git-wip-us.apache.org/repos/asf/spark/blob/8a5a5850/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveExplainSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveExplainSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveExplainSuite.scala
index cfca1d7..8a37bc3 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveExplainSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveExplainSuite.scala
@@ -26,6 +26,7 @@ import org.apache.spark.sql.test.SQLTestUtils
  * A set of tests that validates support for Hive Explain command.
  */
 class HiveExplainSuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
+  import testImplicits._
 
   test("show cost in explain command") {
     // Only has sizeInBytes before ANALYZE command
@@ -92,8 +93,8 @@ class HiveExplainSuite extends QueryTest with SQLTestUtils with TestHiveSingleto
 
   test("SPARK-17409: The EXPLAIN output of CTAS only shows the analyzed plan") {
     withTempView("jt") {
-      val rdd = sparkContext.parallelize((1 to 10).map(i => s"""{"a":$i, "b":"str$i"}"""))
-      spark.read.json(rdd).createOrReplaceTempView("jt")
+      val ds = (1 to 10).map(i => s"""{"a":$i, "b":"str$i"}""").toDS()
+      spark.read.json(ds).createOrReplaceTempView("jt")
       val outputs = sql(
         s"""
            |EXPLAIN EXTENDED

http://git-wip-us.apache.org/repos/asf/spark/blob/8a5a5850/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveResolutionSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveResolutionSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveResolutionSuite.scala
index b2f19d7..ce92fbf 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveResolutionSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveResolutionSuite.scala
@@ -31,15 +31,15 @@ case class Data(a: Int, B: Int, n: Nested, nestedArray: Seq[Nested])
 class HiveResolutionSuite extends HiveComparisonTest {
 
   test("SPARK-3698: case insensitive test for nested data") {
-    read.json(sparkContext.makeRDD(
-      """{"a": [{"a": {"a": 1}}]}""" :: Nil)).createOrReplaceTempView("nested")
+    read.json(Seq("""{"a": [{"a": {"a": 1}}]}""").toDS())
+      .createOrReplaceTempView("nested")
     // This should be successfully analyzed
     sql("SELECT a[0].A.A from nested").queryExecution.analyzed
   }
 
   test("SPARK-5278: check ambiguous reference to fields") {
-    read.json(sparkContext.makeRDD(
-      """{"a": [{"b": 1, "B": 2}]}""" :: Nil)).createOrReplaceTempView("nested")
+    read.json(Seq("""{"a": [{"b": 1, "B": 2}]}""").toDS())
+      .createOrReplaceTempView("nested")
 
     // there are 2 filed matching field name "b", we should report Ambiguous reference error
     val exception = intercept[AnalysisException] {

http://git-wip-us.apache.org/repos/asf/spark/blob/8a5a5850/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index faed8b5..9f61763 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -973,30 +973,30 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
   }
 
   test("SPARK-4296 Grouping field with Hive UDF as sub expression") {
-    val rdd = sparkContext.makeRDD( """{"a": "str", "b":"1", "c":"1970-01-01 00:00:00"}""" :: Nil)
-    read.json(rdd).createOrReplaceTempView("data")
+    val ds = Seq("""{"a": "str", "b":"1", "c":"1970-01-01 00:00:00"}""").toDS()
+    read.json(ds).createOrReplaceTempView("data")
     checkAnswer(
       sql("SELECT concat(a, '-', b), year(c) FROM data GROUP BY concat(a, '-', b), year(c)"),
       Row("str-1", 1970))
 
     dropTempTable("data")
 
-    read.json(rdd).createOrReplaceTempView("data")
+    read.json(ds).createOrReplaceTempView("data")
     checkAnswer(sql("SELECT year(c) + 1 FROM data GROUP BY year(c) + 1"), Row(1971))
 
     dropTempTable("data")
   }
 
   test("resolve udtf in projection #1") {
-    val rdd = sparkContext.makeRDD((1 to 5).map(i => s"""{"a":[$i, ${i + 1}]}"""))
-    read.json(rdd).createOrReplaceTempView("data")
+    val ds = (1 to 5).map(i => s"""{"a":[$i, ${i + 1}]}""").toDS()
+    read.json(ds).createOrReplaceTempView("data")
     val df = sql("SELECT explode(a) AS val FROM data")
     val col = df("val")
   }
 
   test("resolve udtf in projection #2") {
-    val rdd = sparkContext.makeRDD((1 to 2).map(i => s"""{"a":[$i, ${i + 1}]}"""))
-    read.json(rdd).createOrReplaceTempView("data")
+    val ds = (1 to 2).map(i => s"""{"a":[$i, ${i + 1}]}""").toDS()
+    read.json(ds).createOrReplaceTempView("data")
     checkAnswer(sql("SELECT explode(map(1, 1)) FROM data LIMIT 1"), Row(1, 1) :: Nil)
     checkAnswer(sql("SELECT explode(map(1, 1)) as (k1, k2) FROM data LIMIT 1"), Row(1, 1) :: Nil)
     intercept[AnalysisException] {
@@ -1010,8 +1010,8 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
 
   // TGF with non-TGF in project is allowed in Spark SQL, but not in Hive
   test("TGF with non-TGF in projection") {
-    val rdd = sparkContext.makeRDD( """{"a": "1", "b":"1"}""" :: Nil)
-    read.json(rdd).createOrReplaceTempView("data")
+    val ds = Seq("""{"a": "1", "b":"1"}""").toDS()
+    read.json(ds).createOrReplaceTempView("data")
     checkAnswer(
       sql("SELECT explode(map(a, b)) as (k1, k2), a, b FROM data"),
       Row("1", "1", "1", "1") :: Nil)
@@ -1024,8 +1024,8 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
     // is not in a valid state (cannot be executed). Because of this bug, the analysis rule of
     // PreInsertionCasts will actually start to work before ImplicitGenerate and then
     // generates an invalid query plan.
-    val rdd = sparkContext.makeRDD((1 to 5).map(i => s"""{"a":[$i, ${i + 1}]}"""))
-    read.json(rdd).createOrReplaceTempView("data")
+    val ds = (1 to 5).map(i => s"""{"a":[$i, ${i + 1}]}""").toDS()
+    read.json(ds).createOrReplaceTempView("data")
 
     withSQLConf(SQLConf.CONVERT_CTAS.key -> "false") {
       sql("CREATE TABLE explodeTest (key bigInt)")
@@ -1262,9 +1262,8 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
   }
 
   test("SPARK-9371: fix the support for special chars in column names for hive context") {
-    read.json(sparkContext.makeRDD(
-      """{"a": {"c.b": 1}, "b.$q": [{"a@!.q": 1}], "q.w": {"w.i&": [1]}}""" :: Nil))
-      .createOrReplaceTempView("t")
+    val ds = Seq("""{"a": {"c.b": 1}, "b.$q": [{"a@!.q": 1}], "q.w": {"w.i&": [1]}}""").toDS()
+    read.json(ds).createOrReplaceTempView("t")
 
     checkAnswer(sql("SELECT a.`c.b`, `b.$q`[0].`a@!.q`, `q.w`.`w.i&`[0] FROM t"), Row(1, 1, 1))
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org