Posted to commits@spark.apache.org by sr...@apache.org on 2022/01/15 15:03:01 UTC

[spark] branch master updated: [SPARK-37876][CORE][SQL] Move `SpecificParquetRecordReaderBase.listDirectory` to `TestUtils`

This is an automated email from the ASF dual-hosted git repository.

srowen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 7614472  [SPARK-37876][CORE][SQL] Move `SpecificParquetRecordReaderBase.listDirectory` to `TestUtils`
7614472 is described below

commit 7614472950cb57ffefa0a51dd1163103c5d42df6
Author: yangjie01 <ya...@baidu.com>
AuthorDate: Sat Jan 15 09:01:55 2022 -0600

    [SPARK-37876][CORE][SQL] Move `SpecificParquetRecordReaderBase.listDirectory` to `TestUtils`
    
    ### What changes were proposed in this pull request?
    `SpecificParquetRecordReaderBase.listDirectory` returns the list of files at `path` recursively, skipping files that MapReduce normally ignores.
    
    This method is now used only by tests in Spark, and those tests also include non-Parquet scenarios such as `OrcColumnarBatchReaderSuite`.
    
    So this PR moves the method from `SpecificParquetRecordReaderBase` to `TestUtils`, making it a test utility.
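    
    A minimal sketch of how a test could call the relocated helper after this change. The package name and directory path below are illustrative only, and the example must live under `org.apache.spark` because `TestUtils` is `private[spark]`:
    
    ```scala
    package org.apache.spark.example // hypothetical package; TestUtils is private[spark]
    
    import java.io.File
    
    import org.apache.spark.TestUtils
    
    object ListDirectoryExample {
      def main(args: Array[String]): Unit = {
        // Hypothetical directory; in the real tests this is a temp dir written by df.write.parquet(...)
        val dir = new File(args.headOption.getOrElse("/tmp/parquet-output"))
    
        // Recursively list data files, skipping names that start with '.' or '_'
        // (e.g. `_SUCCESS`, `.part-00000.crc`), as MapReduce does.
        val files: Array[String] = TestUtils.listDirectory(dir)
        files.foreach(println)
    
        // The removed Java helper returned java.util.List[String], so call sites used .get(0);
        // the Scala replacement returns Array[String], so call sites now use .head.
      }
    }
    ```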
    
    ### Why are the changes needed?
    Refactoring: move a test-only method into `TestUtils`.
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Passed existing GitHub Actions (GA) checks.
    
    Closes #35177 from LuciferYang/list-directory.
    
    Authored-by: yangjie01 <ya...@baidu.com>
    Signed-off-by: Sean Owen <sr...@gmail.com>
---
 .../src/main/scala/org/apache/spark/TestUtils.scala | 15 +++++++++++++++
 .../parquet/SpecificParquetRecordReaderBase.java    | 21 ---------------------
 .../benchmark/DataSourceReadBenchmark.scala         | 11 ++++++-----
 .../orc/OrcColumnarBatchReaderSuite.scala           |  4 ++--
 .../datasources/parquet/ParquetEncodingSuite.scala  | 11 ++++++-----
 .../datasources/parquet/ParquetIOSuite.scala        |  6 +++---
 .../execution/datasources/parquet/ParquetTest.scala |  3 ++-
 .../spark/sql/test/DataFrameReaderWriterSuite.scala |  5 ++---
 8 files changed, 36 insertions(+), 40 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/TestUtils.scala b/core/src/main/scala/org/apache/spark/TestUtils.scala
index d2af955..505b3ab 100644
--- a/core/src/main/scala/org/apache/spark/TestUtils.scala
+++ b/core/src/main/scala/org/apache/spark/TestUtils.scala
@@ -446,6 +446,21 @@ private[spark] object TestUtils {
     current ++ current.filter(_.isDirectory).flatMap(recursiveList)
   }
 
+  /**
+   * Returns the list of files at 'path' recursively. This skips files that are ignored normally
+   * by MapReduce.
+   */
+  def listDirectory(path: File): Array[String] = {
+    val result = ArrayBuffer.empty[String]
+    if (path.isDirectory) {
+      path.listFiles.foreach(f => result.appendAll(listDirectory(f)))
+    } else {
+      val c = path.getName.charAt(0)
+      if (c != '.' && c != '_') result.append(path.getAbsolutePath)
+    }
+    result.toArray
+  }
+
   /** Creates a temp JSON file that contains the input JSON record. */
   def createTempJsonFile(dir: File, prefix: String, jsonValue: JValue): String = {
     val file = File.createTempFile(prefix, ".json", dir)
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java
index e1a0607..07e35c1 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java
@@ -19,10 +19,8 @@
 package org.apache.spark.sql.execution.datasources.parquet;
 
 import java.io.Closeable;
-import java.io.File;
 import java.io.IOException;
 import java.lang.reflect.InvocationTargetException;
-import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -122,25 +120,6 @@ public abstract class SpecificParquetRecordReaderBase<T> extends RecordReader<Vo
   }
 
   /**
-   * Returns the list of files at 'path' recursively. This skips files that are ignored normally
-   * by MapReduce.
-   */
-  public static List<String> listDirectory(File path) {
-    List<String> result = new ArrayList<>();
-    if (path.isDirectory()) {
-      for (File f: path.listFiles()) {
-        result.addAll(listDirectory(f));
-      }
-    } else {
-      char c = path.getName().charAt(0);
-      if (c != '.' && c != '_') {
-        result.add(path.getAbsolutePath());
-      }
-    }
-    return result;
-  }
-
-  /**
    * Initializes the reader to read the file at `path` with `columns` projected. If columns is
    * null, all the columns are projected.
    *
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/DataSourceReadBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/DataSourceReadBenchmark.scala
index 31cee48..5094cdf 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/DataSourceReadBenchmark.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/DataSourceReadBenchmark.scala
@@ -25,10 +25,11 @@ import org.apache.parquet.column.ParquetProperties
 import org.apache.parquet.hadoop.ParquetOutputFormat
 
 import org.apache.spark.SparkConf
+import org.apache.spark.TestUtils
 import org.apache.spark.benchmark.Benchmark
 import org.apache.spark.sql.{DataFrame, DataFrameWriter, Row, SparkSession}
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.execution.datasources.parquet.{SpecificParquetRecordReaderBase, VectorizedParquetRecordReader}
+import org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.vectorized.ColumnVector
@@ -167,7 +168,7 @@ object DataSourceReadBenchmark extends SqlBasedBenchmark {
         sqlBenchmark.run()
 
         // Driving the parquet reader in batch mode directly.
-        val files = SpecificParquetRecordReaderBase.listDirectory(new File(dir, "parquet")).toArray
+        val files = TestUtils.listDirectory(new File(dir, "parquet"))
         val enableOffHeapColumnVector = spark.sessionState.conf.offHeapColumnVectorEnabled
         val vectorizedReaderBatchSize = spark.sessionState.conf.parquetVectorizedReaderBatchSize
         parquetReaderBenchmark.addCase("ParquetReader Vectorized") { _ =>
@@ -183,7 +184,7 @@ object DataSourceReadBenchmark extends SqlBasedBenchmark {
             case DoubleType => (col: ColumnVector, i: Int) => doubleSum += col.getDouble(i)
           }
 
-          files.map(_.asInstanceOf[String]).foreach { p =>
+          files.foreach { p =>
             val reader = new VectorizedParquetRecordReader(
               enableOffHeapColumnVector, vectorizedReaderBatchSize)
             try {
@@ -468,12 +469,12 @@ object DataSourceReadBenchmark extends SqlBasedBenchmark {
           }
         }
 
-        val files = SpecificParquetRecordReaderBase.listDirectory(new File(dir, "parquet")).toArray
+        val files = TestUtils.listDirectory(new File(dir, "parquet"))
         val enableOffHeapColumnVector = spark.sessionState.conf.offHeapColumnVectorEnabled
         val vectorizedReaderBatchSize = spark.sessionState.conf.parquetVectorizedReaderBatchSize
         benchmark.addCase("ParquetReader Vectorized") { num =>
           var sum = 0
-          files.map(_.asInstanceOf[String]).foreach { p =>
+          files.foreach { p =>
             val reader = new VectorizedParquetRecordReader(
               enableOffHeapColumnVector, vectorizedReaderBatchSize)
             try {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcColumnarBatchReaderSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcColumnarBatchReaderSuite.scala
index bfcef46..4ff9612 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcColumnarBatchReaderSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcColumnarBatchReaderSuite.scala
@@ -25,11 +25,11 @@ import org.apache.hadoop.mapreduce.lib.input.FileSplit
 import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
 import org.apache.orc.TypeDescription
 
+import org.apache.spark.TestUtils
 import org.apache.spark.sql.QueryTest
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
-import org.apache.spark.sql.execution.datasources.parquet.SpecificParquetRecordReaderBase
 import org.apache.spark.sql.execution.vectorized.{OnHeapColumnVector, WritableColumnVector}
 import org.apache.spark.sql.test.SharedSparkSession
 import org.apache.spark.sql.types._
@@ -117,7 +117,7 @@ class OrcColumnarBatchReaderSuite extends QueryTest with SharedSparkSession {
       dataTypes.zip(constantValues).foreach { case (dt, v) =>
         val schema = StructType(StructField("col1", IntegerType) :: StructField("pcol", dt) :: Nil)
         val partitionValues = new GenericInternalRow(Array(v))
-        val file = new File(SpecificParquetRecordReaderBase.listDirectory(dir).get(0))
+        val file = new File(TestUtils.listDirectory(dir).head)
         val fileSplit = new FileSplit(new Path(file.getCanonicalPath), 0L, file.length, Array.empty)
         val taskConf = sqlContext.sessionState.newHadoopConf()
         val orcFileSchema = TypeDescription.fromString(schema.simpleString)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetEncodingSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetEncodingSuite.scala
index 746d9c6..f7100a5 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetEncodingSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetEncodingSuite.scala
@@ -26,6 +26,7 @@ import org.apache.hadoop.fs.Path
 import org.apache.parquet.column.{Encoding, ParquetProperties}
 import org.apache.parquet.hadoop.ParquetOutputFormat
 
+import org.apache.spark.TestUtils
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.internal.SQLConf
@@ -50,12 +51,12 @@ class ParquetEncodingSuite extends ParquetCompatibilityTest with SharedSparkSess
     (1 :: 1000 :: Nil).foreach { n => {
       withTempPath { dir =>
         List.fill(n)(ROW).toDF.repartition(1).write.parquet(dir.getCanonicalPath)
-        val file = SpecificParquetRecordReaderBase.listDirectory(dir).toArray.head
+        val file = TestUtils.listDirectory(dir).head
 
         val conf = sqlContext.conf
         val reader = new VectorizedParquetRecordReader(
           conf.offHeapColumnVectorEnabled, conf.parquetVectorizedReaderBatchSize)
-        reader.initialize(file.asInstanceOf[String], null)
+        reader.initialize(file, null)
         val batch = reader.resultBatch()
         assert(reader.nextBatch())
         assert(batch.numRows() == n)
@@ -80,12 +81,12 @@ class ParquetEncodingSuite extends ParquetCompatibilityTest with SharedSparkSess
       withTempPath { dir =>
         val data = List.fill(n)(NULL_ROW).toDF
         data.repartition(1).write.parquet(dir.getCanonicalPath)
-        val file = SpecificParquetRecordReaderBase.listDirectory(dir).toArray.head
+        val file = TestUtils.listDirectory(dir).head
 
         val conf = sqlContext.conf
         val reader = new VectorizedParquetRecordReader(
           conf.offHeapColumnVectorEnabled, conf.parquetVectorizedReaderBatchSize)
-        reader.initialize(file.asInstanceOf[String], null)
+        reader.initialize(file, null)
         val batch = reader.resultBatch()
         assert(reader.nextBatch())
         assert(batch.numRows() == n)
@@ -114,7 +115,7 @@ class ParquetEncodingSuite extends ParquetCompatibilityTest with SharedSparkSess
         // first page is dictionary encoded and the remaining two are plain encoded.
         val data = (0 until 512).flatMap(i => Seq.fill(3)(i.toString))
         data.toDF("f").coalesce(1).write.parquet(dir.getCanonicalPath)
-        val file = SpecificParquetRecordReaderBase.listDirectory(dir).asScala.head
+        val file = TestUtils.listDirectory(dir).head
 
         val conf = sqlContext.conf
         val reader = new VectorizedParquetRecordReader(
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
index 0966319..1e2bb91 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
@@ -38,7 +38,7 @@ import org.apache.parquet.hadoop.metadata.CompressionCodecName
 import org.apache.parquet.hadoop.metadata.CompressionCodecName.GZIP
 import org.apache.parquet.schema.{MessageType, MessageTypeParser}
 
-import org.apache.spark.{SPARK_VERSION_SHORT, SparkException}
+import org.apache.spark.{SPARK_VERSION_SHORT, SparkException, TestUtils}
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.{InternalRow, ScalaReflection}
 import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, UnsafeRow}
@@ -928,7 +928,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession
     val data = (0 to 10).map(i => (i, (i + 'a').toChar.toString))
     withTempPath { dir =>
       spark.createDataFrame(data).repartition(1).write.parquet(dir.getCanonicalPath)
-      val file = SpecificParquetRecordReaderBase.listDirectory(dir).get(0);
+      val file = TestUtils.listDirectory(dir).head;
       {
         val conf = sqlContext.conf
         val reader = new VectorizedParquetRecordReader(
@@ -1032,7 +1032,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession
         val vectorizedReader = new VectorizedParquetRecordReader(
           conf.offHeapColumnVectorEnabled, conf.parquetVectorizedReaderBatchSize)
         val partitionValues = new GenericInternalRow(Array(v))
-        val file = SpecificParquetRecordReaderBase.listDirectory(dir).get(0)
+        val file = TestUtils.listDirectory(dir).head
 
         try {
           vectorizedReader.initialize(file, null)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTest.scala
index 4772316..7a7957c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTest.scala
@@ -33,6 +33,7 @@ import org.apache.parquet.hadoop.metadata.{BlockMetaData, FileMetaData, ParquetM
 import org.apache.parquet.hadoop.util.HadoopInputFile
 import org.apache.parquet.schema.MessageType
 
+import org.apache.spark.TestUtils
 import org.apache.spark.sql.DataFrame
 import org.apache.spark.sql.execution.datasources.FileBasedDataSourceTest
 import org.apache.spark.sql.internal.SQLConf
@@ -179,7 +180,7 @@ private[sql] trait ParquetTest extends FileBasedDataSourceTest {
   }
 
   def getMetaData(dir: java.io.File): Map[String, String] = {
-    val file = SpecificParquetRecordReaderBase.listDirectory(dir).get(0)
+    val file = TestUtils.listDirectory(dir).head
     val conf = new Configuration()
     val hadoopInputFile = HadoopInputFile.fromPath(new Path(file), conf)
     val parquetReadOptions = HadoopReadOptions.builder(conf).build()
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
index ea007c1..cb3bd29 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
@@ -32,7 +32,7 @@ import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName
 import org.apache.parquet.schema.Type.Repetition
 import org.scalatest.BeforeAndAfter
 
-import org.apache.spark.SparkContext
+import org.apache.spark.{SparkContext, TestUtils}
 import org.apache.spark.internal.io.FileCommitProtocol.TaskCommitMessage
 import org.apache.spark.internal.io.HadoopMapReduceCommitProtocol
 import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart}
@@ -42,7 +42,6 @@ import org.apache.spark.sql.catalyst.plans.logical.{AppendData, LogicalPlan, Ove
 import org.apache.spark.sql.execution.QueryExecution
 import org.apache.spark.sql.execution.datasources.{DataSourceUtils, HadoopFsRelation, LogicalRelation}
 import org.apache.spark.sql.execution.datasources.noop.NoopDataSource
-import org.apache.spark.sql.execution.datasources.parquet.SpecificParquetRecordReaderBase
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types._
@@ -764,7 +763,7 @@ class DataFrameReaderWriterSuite extends QueryTest with SharedSparkSession with
     withTempPath { dir =>
       val path = dir.getAbsolutePath
       df.write.mode("overwrite").parquet(path)
-      val file = SpecificParquetRecordReaderBase.listDirectory(dir).get(0)
+      val file = TestUtils.listDirectory(dir).head
 
       val hadoopInputFile = HadoopInputFile.fromPath(new Path(file), new Configuration())
       val f = ParquetFileReader.open(hadoopInputFile)
