Posted to commits@spark.apache.org by gu...@apache.org on 2018/07/23 07:27:39 UTC

spark git commit: [SPARK-24883][SQL] Avro: remove implicit class AvroDataFrameWriter/AvroDataFrameReader

Repository: spark
Updated Branches:
  refs/heads/master 8817c68f5 -> f59de52a2


[SPARK-24883][SQL] Avro: remove implicit class AvroDataFrameWriter/AvroDataFrameReader

## What changes were proposed in this pull request?

As per Reynold's comment: https://github.com/apache/spark/pull/21742#discussion_r203496489

It makes sense to remove the implicit classes AvroDataFrameWriter and AvroDataFrameReader, since the Avro package is an external module.
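
For reference, the syntax change for downstream users looks like this (the paths are placeholders, not from the patch):

```scala
// Before this patch: the `.avro` shorthand supplied by the implicit classes
// in the org.apache.spark.sql.avro package object.
import org.apache.spark.sql.avro._
val df = spark.read.avro("/path/to/input")
df.write.avro("/path/to/output")

// After this patch: the builtin DataFrameReader/DataFrameWriter API,
// with no extra imports required.
val df2 = spark.read.format("avro").load("/path/to/input")
df2.write.format("avro").save("/path/to/output")
```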

## How was this patch tested?

Unit tests in AvroSuite, updated to use the builtin `format("avro")` API instead of the removed implicits.

Author: Gengliang Wang <ge...@databricks.com>

Closes #21841 from gengliangwang/removeImplicit.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f59de52a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f59de52a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f59de52a

Branch: refs/heads/master
Commit: f59de52a2a2fc8b8c596230b76f5fd2aa9fedd58
Parents: 8817c68
Author: Gengliang Wang <ge...@databricks.com>
Authored: Mon Jul 23 15:27:33 2018 +0800
Committer: hyukjinkwon <gu...@apache.org>
Committed: Mon Jul 23 15:27:33 2018 +0800

----------------------------------------------------------------------
 .../org/apache/spark/sql/avro/package.scala     |  21 ---
 .../org/apache/spark/sql/avro/AvroSuite.scala   | 185 ++++++++++---------
 2 files changed, 96 insertions(+), 110 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/f59de52a/external/avro/src/main/scala/org/apache/spark/sql/avro/package.scala
----------------------------------------------------------------------
diff --git a/external/avro/src/main/scala/org/apache/spark/sql/avro/package.scala b/external/avro/src/main/scala/org/apache/spark/sql/avro/package.scala
index e82651d..97f9427 100755
--- a/external/avro/src/main/scala/org/apache/spark/sql/avro/package.scala
+++ b/external/avro/src/main/scala/org/apache/spark/sql/avro/package.scala
@@ -17,31 +17,10 @@
 
 package org.apache.spark.sql
 
-import org.apache.avro.Schema
-
 import org.apache.spark.annotation.Experimental
 
 package object avro {
   /**
-   * Adds a method, `avro`, to DataFrameWriter that allows you to write avro files using
-   * the DataFileWriter
-   */
-  implicit class AvroDataFrameWriter[T](writer: DataFrameWriter[T]) {
-    def avro: String => Unit = writer.format("avro").save
-  }
-
-  /**
-   * Adds a method, `avro`, to DataFrameReader that allows you to read avro files using
-   * the DataFileReader
-   */
-  implicit class AvroDataFrameReader(reader: DataFrameReader) {
-    def avro: String => DataFrame = reader.format("avro").load
-
-    @scala.annotation.varargs
-    def avro(sources: String*): DataFrame = reader.format("avro").load(sources: _*)
-  }
-
-  /**
    * Converts a binary column of avro format into its corresponding catalyst value. The specified
    * schema must match the read data, otherwise the behavior is undefined: it may fail or return
    * arbitrary result.

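Downstream code that wants to keep the `.avro` shorthand after upgrading can define the implicits locally; a minimal sketch based on the deleted lines above (the enclosing object name is hypothetical):

```scala
import org.apache.spark.sql.{DataFrame, DataFrameReader, DataFrameWriter}

object AvroImplicits {  // hypothetical holder object in user code
  // Restores writer.avro(path) as sugar for writer.format("avro").save(path).
  implicit class AvroDataFrameWriter[T](writer: DataFrameWriter[T]) {
    def avro(path: String): Unit = writer.format("avro").save(path)
  }

  // Restores reader.avro(paths...) as sugar for reader.format("avro").load(paths...).
  implicit class AvroDataFrameReader(reader: DataFrameReader) {
    @scala.annotation.varargs
    def avro(sources: String*): DataFrame = reader.format("avro").load(sources: _*)
  }
}
```
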
http://git-wip-us.apache.org/repos/asf/spark/blob/f59de52a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
----------------------------------------------------------------------
diff --git a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
index dad56aa..ec1627a 100644
--- a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
+++ b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
@@ -46,24 +46,24 @@ class AvroSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
   }
 
   def checkReloadMatchesSaved(originalFile: String, newFile: String): Unit = {
-    val originalEntries = spark.read.avro(testAvro).collect()
-    val newEntries = spark.read.avro(newFile)
+    val originalEntries = spark.read.format("avro").load(testAvro).collect()
+    val newEntries = spark.read.format("avro").load(newFile)
     checkAnswer(newEntries, originalEntries)
   }
 
   test("reading from multiple paths") {
-    val df = spark.read.avro(episodesAvro, episodesAvro)
+    val df = spark.read.format("avro").load(episodesAvro, episodesAvro)
     assert(df.count == 16)
   }
 
   test("reading and writing partitioned data") {
-    val df = spark.read.avro(episodesAvro)
+    val df = spark.read.format("avro").load(episodesAvro)
     val fields = List("title", "air_date", "doctor")
     for (field <- fields) {
       withTempPath { dir =>
         val outputDir = s"$dir/${UUID.randomUUID}"
-        df.write.partitionBy(field).avro(outputDir)
-        val input = spark.read.avro(outputDir)
+        df.write.partitionBy(field).format("avro").save(outputDir)
+        val input = spark.read.format("avro").load(outputDir)
         // makes sure that no fields got dropped.
         // We convert Rows to Seqs in order to work around SPARK-10325
         assert(input.select(field).collect().map(_.toSeq).toSet ===
@@ -73,14 +73,14 @@ class AvroSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
   }
 
   test("request no fields") {
-    val df = spark.read.avro(episodesAvro)
+    val df = spark.read.format("avro").load(episodesAvro)
     df.createOrReplaceTempView("avro_table")
     assert(spark.sql("select count(*) from avro_table").collect().head === Row(8))
   }
 
   test("convert formats") {
     withTempPath { dir =>
-      val df = spark.read.avro(episodesAvro)
+      val df = spark.read.format("avro").load(episodesAvro)
       df.write.parquet(dir.getCanonicalPath)
       assert(spark.read.parquet(dir.getCanonicalPath).count() === df.count)
     }
@@ -88,8 +88,8 @@ class AvroSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
 
   test("rearrange internal schema") {
     withTempPath { dir =>
-      val df = spark.read.avro(episodesAvro)
-      df.select("doctor", "title").write.avro(dir.getCanonicalPath)
+      val df = spark.read.format("avro").load(episodesAvro)
+      df.select("doctor", "title").write.format("avro").save(dir.getCanonicalPath)
     }
   }
 
@@ -109,7 +109,7 @@ class AvroSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
       dataFileWriter.close()
 
       intercept[IncompatibleSchemaException] {
-        spark.read.avro(s"$dir.avro")
+        spark.read.format("avro").load(s"$dir.avro")
       }
     }
   }
@@ -136,7 +136,7 @@ class AvroSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
       dataFileWriter.append(rec2)
       dataFileWriter.flush()
       dataFileWriter.close()
-      val df = spark.read.avro(s"$dir.avro")
+      val df = spark.read.format("avro").load(s"$dir.avro")
       assert(df.schema.fields === Seq(StructField("field1", LongType, nullable = true)))
       assert(df.collect().toSet == Set(Row(1L), Row(2L)))
     }
@@ -164,7 +164,7 @@ class AvroSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
       dataFileWriter.append(rec2)
       dataFileWriter.flush()
       dataFileWriter.close()
-      val df = spark.read.avro(s"$dir.avro")
+      val df = spark.read.format("avro").load(s"$dir.avro")
       assert(df.schema.fields === Seq(StructField("field1", DoubleType, nullable = true)))
       assert(df.collect().toSet == Set(Row(1.toDouble), Row(2.toDouble)))
     }
@@ -196,7 +196,7 @@ class AvroSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
       dataFileWriter.append(rec2)
       dataFileWriter.flush()
       dataFileWriter.close()
-      val df = spark.read.avro(s"$dir.avro")
+      val df = spark.read.format("avro").load(s"$dir.avro")
       assert(df.schema.fields === Seq(StructField("field1", DoubleType, nullable = true)))
       assert(df.collect().toSet == Set(Row(1.toDouble), Row(null)))
     }
@@ -220,7 +220,7 @@ class AvroSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
       dataFileWriter.flush()
       dataFileWriter.close()
 
-      val df = spark.read.avro(s"$dir.avro")
+      val df = spark.read.format("avro").load(s"$dir.avro")
       assert(df.first() == Row(8))
     }
   }
@@ -255,7 +255,7 @@ class AvroSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
       dataFileWriter.flush()
       dataFileWriter.close()
 
-      val df = spark.sqlContext.read.avro(s"$dir.avro")
+      val df = spark.sqlContext.read.format("avro").load(s"$dir.avro")
       assertResult(field1)(df.selectExpr("field1.member0").first().get(0))
       assertResult(field2)(df.selectExpr("field2.member1").first().get(0))
       assertResult(field3)(df.selectExpr("field3.member2").first().get(0))
@@ -277,8 +277,8 @@ class AvroSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
         Row(null, null, null, null, null),
         Row(null, null, null, null, null)))
       val df = spark.createDataFrame(rdd, schema)
-      df.write.avro(dir.toString)
-      assert(spark.read.avro(dir.toString).count == rdd.count)
+      df.write.format("avro").save(dir.toString)
+      assert(spark.read.format("avro").load(dir.toString).count == rdd.count)
     }
   }
 
@@ -296,8 +296,8 @@ class AvroSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
         Row(3f, 3.toShort, 3.toByte, true)
       ))
       val df = spark.createDataFrame(rdd, schema)
-      df.write.avro(dir.toString)
-      assert(spark.read.avro(dir.toString).count == rdd.count)
+      df.write.format("avro").save(dir.toString)
+      assert(spark.read.format("avro").load(dir.toString).count == rdd.count)
     }
   }
 
@@ -314,9 +314,10 @@ class AvroSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
         Row(3f, new Date(1460066400500L))
       ))
       val df = spark.createDataFrame(rdd, schema)
-      df.write.avro(dir.toString)
-      assert(spark.read.avro(dir.toString).count == rdd.count)
-      assert(spark.read.avro(dir.toString).select("date").collect().map(_(0)).toSet ==
+      df.write.format("avro").save(dir.toString)
+      assert(spark.read.format("avro").load(dir.toString).count == rdd.count)
+      assert(
+        spark.read.format("avro").load(dir.toString).select("date").collect().map(_(0)).toSet ==
         Array(null, 1451865600000L, 1459987200000L).toSet)
     }
   }
@@ -350,8 +351,8 @@ class AvroSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
           Array[Array[String]](Array[String]("CSH, tearing down the walls that divide us", "-jd")),
           Array[Row](Row("Bobby G. can't swim")))))
       val df = spark.createDataFrame(rdd, testSchema)
-      df.write.avro(dir.toString)
-      assert(spark.read.avro(dir.toString).count == rdd.count)
+      df.write.format("avro").save(dir.toString)
+      assert(spark.read.format("avro").load(dir.toString).count == rdd.count)
     }
   }
 
@@ -363,14 +364,14 @@ class AvroSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
       val deflateDir = s"$dir/deflate"
       val snappyDir = s"$dir/snappy"
 
-      val df = spark.read.avro(testAvro)
+      val df = spark.read.format("avro").load(testAvro)
       spark.conf.set(AVRO_COMPRESSION_CODEC, "uncompressed")
-      df.write.avro(uncompressDir)
+      df.write.format("avro").save(uncompressDir)
       spark.conf.set(AVRO_COMPRESSION_CODEC, "deflate")
       spark.conf.set(AVRO_DEFLATE_LEVEL, "9")
-      df.write.avro(deflateDir)
+      df.write.format("avro").save(deflateDir)
       spark.conf.set(AVRO_COMPRESSION_CODEC, "snappy")
-      df.write.avro(snappyDir)
+      df.write.format("avro").save(snappyDir)
 
       val uncompressSize = FileUtils.sizeOfDirectory(new File(uncompressDir))
       val deflateSize = FileUtils.sizeOfDirectory(new File(deflateDir))
@@ -382,49 +383,50 @@ class AvroSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
   }
 
   test("dsl test") {
-    val results = spark.read.avro(episodesAvro).select("title").collect()
+    val results = spark.read.format("avro").load(episodesAvro).select("title").collect()
     assert(results.length === 8)
   }
 
   test("support of various data types") {
     // This test uses data from test.avro. You can see the data and the schema of this file in
     // test.json and test.avsc
-    val all = spark.read.avro(testAvro).collect()
+    val all = spark.read.format("avro").load(testAvro).collect()
     assert(all.length == 3)
 
-    val str = spark.read.avro(testAvro).select("string").collect()
+    val str = spark.read.format("avro").load(testAvro).select("string").collect()
     assert(str.map(_(0)).toSet.contains("Terran is IMBA!"))
 
-    val simple_map = spark.read.avro(testAvro).select("simple_map").collect()
+    val simple_map = spark.read.format("avro").load(testAvro).select("simple_map").collect()
     assert(simple_map(0)(0).getClass.toString.contains("Map"))
     assert(simple_map.map(_(0).asInstanceOf[Map[String, Some[Int]]].size).toSet == Set(2, 0))
 
-    val union0 = spark.read.avro(testAvro).select("union_string_null").collect()
+    val union0 = spark.read.format("avro").load(testAvro).select("union_string_null").collect()
     assert(union0.map(_(0)).toSet == Set("abc", "123", null))
 
-    val union1 = spark.read.avro(testAvro).select("union_int_long_null").collect()
+    val union1 = spark.read.format("avro").load(testAvro).select("union_int_long_null").collect()
     assert(union1.map(_(0)).toSet == Set(66, 1, null))
 
-    val union2 = spark.read.avro(testAvro).select("union_float_double").collect()
+    val union2 = spark.read.format("avro").load(testAvro).select("union_float_double").collect()
     assert(
       union2
         .map(x => new java.lang.Double(x(0).toString))
         .exists(p => Math.abs(p - Math.PI) < 0.001))
 
-    val fixed = spark.read.avro(testAvro).select("fixed3").collect()
+    val fixed = spark.read.format("avro").load(testAvro).select("fixed3").collect()
     assert(fixed.map(_(0).asInstanceOf[Array[Byte]]).exists(p => p(1) == 3))
 
-    val enum = spark.read.avro(testAvro).select("enum").collect()
+    val enum = spark.read.format("avro").load(testAvro).select("enum").collect()
     assert(enum.map(_(0)).toSet == Set("SPADES", "CLUBS", "DIAMONDS"))
 
-    val record = spark.read.avro(testAvro).select("record").collect()
+    val record = spark.read.format("avro").load(testAvro).select("record").collect()
     assert(record(0)(0).getClass.toString.contains("Row"))
     assert(record.map(_(0).asInstanceOf[Row](0)).contains("TEST_STR123"))
 
-    val array_of_boolean = spark.read.avro(testAvro).select("array_of_boolean").collect()
+    val array_of_boolean =
+      spark.read.format("avro").load(testAvro).select("array_of_boolean").collect()
     assert(array_of_boolean.map(_(0).asInstanceOf[Seq[Boolean]].size).toSet == Set(3, 1, 0))
 
-    val bytes = spark.read.avro(testAvro).select("bytes").collect()
+    val bytes = spark.read.format("avro").load(testAvro).select("bytes").collect()
     assert(bytes.map(_(0).asInstanceOf[Array[Byte]].length).toSet == Set(3, 1, 0))
   }
 
@@ -444,7 +446,7 @@ class AvroSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
     // get the same values back.
     withTempPath { dir =>
       val avroDir = s"$dir/avro"
-      spark.read.avro(testAvro).write.avro(avroDir)
+      spark.read.format("avro").load(testAvro).write.format("avro").save(avroDir)
       checkReloadMatchesSaved(testAvro, avroDir)
     }
   }
@@ -458,7 +460,8 @@ class AvroSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
       val parameters = Map("recordName" -> name, "recordNamespace" -> namespace)
 
       val avroDir = tempDir + "/namedAvro"
-      spark.read.avro(testAvro).write.options(parameters).avro(avroDir)
+      spark.read.format("avro").load(testAvro)
+        .write.options(parameters).format("avro").save(avroDir)
       checkReloadMatchesSaved(testAvro, avroDir)
 
       // Look at raw file and make sure has namespace info
@@ -489,22 +492,22 @@ class AvroSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
       val cityDataFrame = spark.createDataFrame(cityRDD, testSchema)
 
       val avroDir = tempDir + "/avro"
-      cityDataFrame.write.avro(avroDir)
-      assert(spark.read.avro(avroDir).collect().length == 3)
+      cityDataFrame.write.format("avro").save(avroDir)
+      assert(spark.read.format("avro").load(avroDir).collect().length == 3)
 
       // TimesStamps are converted to longs
-      val times = spark.read.avro(avroDir).select("Time").collect()
+      val times = spark.read.format("avro").load(avroDir).select("Time").collect()
       assert(times.map(_(0)).toSet == Set(666, 777, 42))
 
       // DecimalType should be converted to string
-      val decimals = spark.read.avro(avroDir).select("Decimal").collect()
+      val decimals = spark.read.format("avro").load(avroDir).select("Decimal").collect()
       assert(decimals.map(_(0)).contains("3.14"))
 
       // There should be a null entry
-      val length = spark.read.avro(avroDir).select("Length").collect()
+      val length = spark.read.format("avro").load(avroDir).select("Length").collect()
       assert(length.map(_(0)).contains(null))
 
-      val binary = spark.read.avro(avroDir).select("Binary").collect()
+      val binary = spark.read.format("avro").load(avroDir).select("Binary").collect()
       for (i <- arrayOfByte.indices) {
         assert(binary(1)(0).asInstanceOf[Array[Byte]](i) == arrayOfByte(i))
       }
@@ -523,10 +526,10 @@ class AvroSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
       val writeDs = Seq((currentDate, currentTime)).toDS
 
       val avroDir = tempDir + "/avro"
-      writeDs.write.avro(avroDir)
-      assert(spark.read.avro(avroDir).collect().length == 1)
+      writeDs.write.format("avro").save(avroDir)
+      assert(spark.read.format("avro").load(avroDir).collect().length == 1)
 
-      val readDs = spark.read.schema(schema).avro(avroDir).as[(Date, Timestamp)]
+      val readDs = spark.read.schema(schema).format("avro").load(avroDir).as[(Date, Timestamp)]
 
       assert(readDs.collect().sameElements(writeDs.collect()))
     }
@@ -534,10 +537,10 @@ class AvroSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
 
   test("support of globbed paths") {
     val resourceDir = testFile(".")
-    val e1 = spark.read.avro(resourceDir + "../*/episodes.avro").collect()
+    val e1 = spark.read.format("avro").load(resourceDir + "../*/episodes.avro").collect()
     assert(e1.length == 8)
 
-    val e2 = spark.read.avro(resourceDir + "../../*/*/episodes.avro").collect()
+    val e2 = spark.read.format("avro").load(resourceDir + "../../*/*/episodes.avro").collect()
     assert(e2.length == 8)
   }
 
@@ -555,8 +558,9 @@ class AvroSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
       val writeDs = Seq((nullDate, nullTime)).toDS
 
       val avroDir = tempDir + "/avro"
-      writeDs.write.avro(avroDir)
-      val readValues = spark.read.schema(schema).avro(avroDir).as[(Date, Timestamp)].collect
+      writeDs.write.format("avro").save(avroDir)
+      val readValues =
+        spark.read.schema(schema).format("avro").load(avroDir).as[(Date, Timestamp)].collect
 
       assert(readValues.size == 1)
       assert(readValues.head == ((nullDate, nullTime)))
@@ -579,9 +583,10 @@ class AvroSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
     val result = spark
       .read
       .option("avroSchema", avroSchema)
-      .avro(testAvro)
+      .format("avro")
+      .load(testAvro)
       .collect()
-    val expected = spark.read.avro(testAvro).select("string").collect()
+    val expected = spark.read.format("avro").load(testAvro).select("string").collect()
     assert(result.sameElements(expected))
   }
 
@@ -601,7 +606,7 @@ class AvroSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
     val result = spark
       .read
       .option("avroSchema", avroSchema)
-      .avro(testAvro).select("missingField").first
+      .format("avro").load(testAvro).select("missingField").first
     assert(result === Row("foo"))
   }
 
@@ -609,17 +614,17 @@ class AvroSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
 
     // Directory given has no avro files
     intercept[AnalysisException] {
-      withTempPath(dir => spark.read.avro(dir.getCanonicalPath))
+      withTempPath(dir => spark.read.format("avro").load(dir.getCanonicalPath))
     }
 
     intercept[AnalysisException] {
-      spark.read.avro("very/invalid/path/123.avro")
+      spark.read.format("avro").load("very/invalid/path/123.avro")
     }
 
     // In case of globbed path that can't be matched to anything, another exception is thrown (and
     // exception message is helpful)
     intercept[AnalysisException] {
-      spark.read.avro("*/*/*/*/*/*/*/something.avro")
+      spark.read.format("avro").load("*/*/*/*/*/*/*/something.avro")
     }
 
     intercept[FileNotFoundException] {
@@ -628,7 +633,7 @@ class AvroSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
         val hadoopConf = spark.sqlContext.sparkContext.hadoopConfiguration
         try {
           hadoopConf.set(AvroFileFormat.IgnoreFilesWithoutExtensionProperty, "true")
-          spark.read.avro(dir.toString)
+          spark.read.format("avro").load(dir.toString)
         } finally {
           hadoopConf.unset(AvroFileFormat.IgnoreFilesWithoutExtensionProperty)
         }
@@ -642,7 +647,8 @@ class AvroSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
         spark
           .read
           .option("ignoreExtension", false)
-          .avro(dir.toString)
+          .format("avro")
+          .load(dir.toString)
       }
     }
   }
@@ -681,13 +687,13 @@ class AvroSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
   test("test save and load") {
     // Test if load works as expected
     withTempPath { tempDir =>
-      val df = spark.read.avro(episodesAvro)
+      val df = spark.read.format("avro").load(episodesAvro)
       assert(df.count == 8)
 
       val tempSaveDir = s"$tempDir/save/"
 
-      df.write.avro(tempSaveDir)
-      val newDf = spark.read.avro(tempSaveDir)
+      df.write.format("avro").save(tempSaveDir)
+      val newDf = spark.read.format("avro").load(tempSaveDir)
       assert(newDf.count == 8)
     }
   }
@@ -695,20 +701,18 @@ class AvroSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
   test("test load with non-Avro file") {
     // Test if load works as expected
     withTempPath { tempDir =>
-      val df = spark.read.avro(episodesAvro)
+      val df = spark.read.format("avro").load(episodesAvro)
       assert(df.count == 8)
 
       val tempSaveDir = s"$tempDir/save/"
-      df.write.avro(tempSaveDir)
+      df.write.format("avro").save(tempSaveDir)
 
       Files.createFile(new File(tempSaveDir, "non-avro").toPath)
 
       val hadoopConf = spark.sqlContext.sparkContext.hadoopConfiguration
       val count = try {
         hadoopConf.set(AvroFileFormat.IgnoreFilesWithoutExtensionProperty, "true")
-        val newDf = spark
-          .read
-          .avro(tempSaveDir)
+        val newDf = spark.read.format("avro").load(tempSaveDir)
         newDf.count()
       } finally {
         hadoopConf.unset(AvroFileFormat.IgnoreFilesWithoutExtensionProperty)
@@ -730,10 +734,11 @@ class AvroSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
       StructField("record", StructType(Seq(StructField("value_field", StringType, false))), false),
       StructField("array_of_boolean", ArrayType(BooleanType), false),
       StructField("bytes", BinaryType, true)))
-    val withSchema = spark.read.schema(partialColumns).avro(testAvro).collect()
+    val withSchema = spark.read.schema(partialColumns).format("avro").load(testAvro).collect()
     val withOutSchema = spark
       .read
-      .avro(testAvro)
+      .format("avro")
+      .load(testAvro)
       .select("string", "simple_map", "complex_map", "union_string_null", "union_int_long_null",
         "fixed3", "fixed2", "enum", "record", "array_of_boolean", "bytes")
       .collect()
@@ -751,7 +756,7 @@ class AvroSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
               StructField("non_exist_field", StringType, false),
               StructField("non_exist_field2", StringType, false))),
             false)))
-    val withEmptyColumn = spark.read.schema(schema).avro(testAvro).collect()
+    val withEmptyColumn = spark.read.schema(schema).format("avro").load(testAvro).collect()
 
     assert(withEmptyColumn.forall(_ == Row(null: String, Row(null: String, null: String))))
   }
@@ -762,8 +767,8 @@ class AvroSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
       import sparkSession.implicits._
       val df = (0 to 1024 * 3).toDS.map(i => s"record${i}").toDF("records")
       val outputDir = s"$dir/${UUID.randomUUID}"
-      df.write.avro(outputDir)
-      val input = spark.read.avro(outputDir)
+      df.write.format("avro").save(outputDir)
+      val input = spark.read.format("avro").load(outputDir)
       assert(input.collect.toSet.size === 1024 * 3 + 1)
       assert(input.rdd.partitions.size > 2)
     }
@@ -780,9 +785,9 @@ class AvroSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
       // Save avro file on output folder path
       val writeDf = spark.createDataFrame(List(NestedTop(1, NestedMiddle(2, NestedBottom(3, "1")))))
       val outputFolder = s"$tempDir/duplicate_names/"
-      writeDf.write.avro(outputFolder)
+      writeDf.write.format("avro").save(outputFolder)
       // Read avro file saved on the last step
-      val readDf = spark.read.avro(outputFolder)
+      val readDf = spark.read.format("avro").load(outputFolder)
       // Check if the written DataFrame is equals than read DataFrame
       assert(readDf.collect().sameElements(writeDf.collect()))
     }
@@ -801,9 +806,9 @@ class AvroSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
         ))))
       )
       val outputFolder = s"$tempDir/duplicate_names_array/"
-      writeDf.write.avro(outputFolder)
+      writeDf.write.format("avro").save(outputFolder)
       // Read avro file saved on the last step
-      val readDf = spark.read.avro(outputFolder)
+      val readDf = spark.read.format("avro").load(outputFolder)
       // Check if the written DataFrame is equals than read DataFrame
       assert(readDf.collect().sameElements(writeDf.collect()))
     }
@@ -822,9 +827,9 @@ class AvroSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
         ))))
       )
       val outputFolder = s"$tempDir/duplicate_names_map/"
-      writeDf.write.avro(outputFolder)
+      writeDf.write.format("avro").save(outputFolder)
       // Read avro file saved on the last step
-      val readDf = spark.read.avro(outputFolder)
+      val readDf = spark.read.format("avro").load(outputFolder)
       // Check if the written DataFrame is equals than read DataFrame
       assert(readDf.collect().sameElements(writeDf.collect()))
     }
@@ -837,32 +842,33 @@ class AvroSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
         Paths.get(dir.getCanonicalPath, "episodes"))
 
       val fileWithoutExtension = s"${dir.getCanonicalPath}/episodes"
-      val df1 = spark.read.avro(fileWithoutExtension)
+      val df1 = spark.read.format("avro").load(fileWithoutExtension)
       assert(df1.count == 8)
 
       val schema = new StructType()
         .add("title", StringType)
         .add("air_date", StringType)
         .add("doctor", IntegerType)
-      val df2 = spark.read.schema(schema).avro(fileWithoutExtension)
+      val df2 = spark.read.schema(schema).format("avro").load(fileWithoutExtension)
       assert(df2.count == 8)
     }
   }
 
   test("SPARK-24836: checking the ignoreExtension option") {
     withTempPath { tempDir =>
-      val df = spark.read.avro(episodesAvro)
+      val df = spark.read.format("avro").load(episodesAvro)
       assert(df.count == 8)
 
       val tempSaveDir = s"$tempDir/save/"
-      df.write.avro(tempSaveDir)
+      df.write.format("avro").save(tempSaveDir)
 
       Files.createFile(new File(tempSaveDir, "non-avro").toPath)
 
       val newDf = spark
         .read
         .option("ignoreExtension", false)
-        .avro(tempSaveDir)
+        .format("avro")
+        .load(tempSaveDir)
 
       assert(newDf.count == 8)
     }
@@ -880,7 +886,8 @@ class AvroSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
         val newDf = spark
           .read
           .option("ignoreExtension", "true")
-          .avro(s"${dir.getCanonicalPath}/episodes")
+          .format("avro")
+          .load(s"${dir.getCanonicalPath}/episodes")
         newDf.count()
       } finally {
         hadoopConf.unset(AvroFileFormat.IgnoreFilesWithoutExtensionProperty)

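A quick end-to-end check of the new call pattern, modeled on the updated tests (the SparkSession setup, file paths, and option values here are assumptions, not part of the patch):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("avro-roundtrip").getOrCreate()

// Read an existing Avro file, as AvroSuite does with episodes.avro.
val episodes = spark.read.format("avro").load("/path/to/episodes.avro")

// Write it back with the recordName/recordNamespace options the suite exercises,
// then reload and compare counts.
episodes.write
  .options(Map("recordName" -> "Episode", "recordNamespace" -> "com.example"))
  .format("avro")
  .save("/tmp/avro-roundtrip")

val reloaded = spark.read.format("avro").load("/tmp/avro-roundtrip")
assert(reloaded.count() == episodes.count())
```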
