Posted to commits@spark.apache.org by sr...@apache.org on 2022/01/15 15:03:01 UTC
[spark] branch master updated: [SPARK-37876][CORE][SQL] Move `SpecificParquetRecordReaderBase.listDirectory` to `TestUtils`
This is an automated email from the ASF dual-hosted git repository.
srowen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 7614472 [SPARK-37876][CORE][SQL] Move `SpecificParquetRecordReaderBase.listDirectory` to `TestUtils`
7614472 is described below
commit 7614472950cb57ffefa0a51dd1163103c5d42df6
Author: yangjie01 <ya...@baidu.com>
AuthorDate: Sat Jan 15 09:01:55 2022 -0600
[SPARK-37876][CORE][SQL] Move `SpecificParquetRecordReaderBase.listDirectory` to `TestUtils`
### What changes were proposed in this pull request?
`SpecificParquetRecordReaderBase.listDirectory` returns the list of files under `path` recursively, skipping files that MapReduce normally ignores (names starting with '.' or '_').
This method is now only used by tests in Spark, and those tests also cover non-Parquet scenarios, such as `OrcColumnarBatchReaderSuite`.
So this PR moves the method from `SpecificParquetRecordReaderBase` to `TestUtils`, making it a test-only utility. A usage sketch follows.
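For illustration, a minimal sketch of how a test could call the relocated helper after this change (the output directory and the surrounding test code are hypothetical, not part of this PR):

```scala
import java.io.File

import org.apache.spark.TestUtils

// Hypothetical directory written by a test, e.g. via df.write.parquet(...).
val dir = new File("/tmp/spark-test-output")

// Lists data files recursively; entries whose names start with '.' or '_'
// (e.g. _SUCCESS, .part-....crc) are skipped, matching MapReduce conventions.
val files: Array[String] = TestUtils.listDirectory(dir)
files.foreach(println)
```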
### Why are the changes needed?
Refactoring: move test method to `TestUtils`.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Passes existing GitHub Actions (GA) checks.
Closes #35177 from LuciferYang/list-directory.
Authored-by: yangjie01 <ya...@baidu.com>
Signed-off-by: Sean Owen <sr...@gmail.com>
---
.../src/main/scala/org/apache/spark/TestUtils.scala | 15 +++++++++++++++
.../parquet/SpecificParquetRecordReaderBase.java | 21 ---------------------
.../benchmark/DataSourceReadBenchmark.scala | 11 ++++++-----
.../orc/OrcColumnarBatchReaderSuite.scala | 4 ++--
.../datasources/parquet/ParquetEncodingSuite.scala | 11 ++++++-----
.../datasources/parquet/ParquetIOSuite.scala | 6 +++---
.../execution/datasources/parquet/ParquetTest.scala | 3 ++-
.../spark/sql/test/DataFrameReaderWriterSuite.scala | 5 ++---
8 files changed, 36 insertions(+), 40 deletions(-)
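A note on the caller-side changes in the diff below: the old Java helper returned `java.util.List<String>`, which is why callers needed `.get(0)`, `.asScala`, or `.toArray` plus `asInstanceOf[String]`; the new Scala helper returns `Array[String]`, so plain Scala collection ops apply. A hedged before/after sketch (the path and variable names are illustrative):

```scala
import java.io.File

import org.apache.spark.TestUtils

val dir = new File("/tmp/spark-test-output")  // illustrative path

// Before this PR (Java helper, java.util.List[String]) -- removed by the diff below:
//   val first = SpecificParquetRecordReaderBase.listDirectory(dir).get(0)

// After this PR (Scala helper, Array[String]):
val first: String = TestUtils.listDirectory(dir).head
```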
diff --git a/core/src/main/scala/org/apache/spark/TestUtils.scala b/core/src/main/scala/org/apache/spark/TestUtils.scala
index d2af955..505b3ab 100644
--- a/core/src/main/scala/org/apache/spark/TestUtils.scala
+++ b/core/src/main/scala/org/apache/spark/TestUtils.scala
@@ -446,6 +446,21 @@ private[spark] object TestUtils {
current ++ current.filter(_.isDirectory).flatMap(recursiveList)
}
+ /**
+ * Returns the list of files at 'path' recursively. This skips files that are ignored normally
+ * by MapReduce.
+ */
+ def listDirectory(path: File): Array[String] = {
+ val result = ArrayBuffer.empty[String]
+ if (path.isDirectory) {
+ path.listFiles.foreach(f => result.appendAll(listDirectory(f)))
+ } else {
+ val c = path.getName.charAt(0)
+ if (c != '.' && c != '_') result.append(path.getAbsolutePath)
+ }
+ result.toArray
+ }
+
/** Creates a temp JSON file that contains the input JSON record. */
def createTempJsonFile(dir: File, prefix: String, jsonValue: JValue): String = {
val file = File.createTempFile(prefix, ".json", dir)
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java
index e1a0607..07e35c1 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java
@@ -19,10 +19,8 @@
package org.apache.spark.sql.execution.datasources.parquet;
import java.io.Closeable;
-import java.io.File;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
-import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@@ -122,25 +120,6 @@ public abstract class SpecificParquetRecordReaderBase<T> extends RecordReader<Vo
}
/**
- * Returns the list of files at 'path' recursively. This skips files that are ignored normally
- * by MapReduce.
- */
- public static List<String> listDirectory(File path) {
- List<String> result = new ArrayList<>();
- if (path.isDirectory()) {
- for (File f: path.listFiles()) {
- result.addAll(listDirectory(f));
- }
- } else {
- char c = path.getName().charAt(0);
- if (c != '.' && c != '_') {
- result.add(path.getAbsolutePath());
- }
- }
- return result;
- }
-
- /**
* Initializes the reader to read the file at `path` with `columns` projected. If columns is
* null, all the columns are projected.
*
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/DataSourceReadBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/DataSourceReadBenchmark.scala
index 31cee48..5094cdf 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/DataSourceReadBenchmark.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/DataSourceReadBenchmark.scala
@@ -25,10 +25,11 @@ import org.apache.parquet.column.ParquetProperties
import org.apache.parquet.hadoop.ParquetOutputFormat
import org.apache.spark.SparkConf
+import org.apache.spark.TestUtils
import org.apache.spark.benchmark.Benchmark
import org.apache.spark.sql.{DataFrame, DataFrameWriter, Row, SparkSession}
import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.execution.datasources.parquet.{SpecificParquetRecordReaderBase, VectorizedParquetRecordReader}
+import org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
import org.apache.spark.sql.vectorized.ColumnVector
@@ -167,7 +168,7 @@ object DataSourceReadBenchmark extends SqlBasedBenchmark {
sqlBenchmark.run()
// Driving the parquet reader in batch mode directly.
- val files = SpecificParquetRecordReaderBase.listDirectory(new File(dir, "parquet")).toArray
+ val files = TestUtils.listDirectory(new File(dir, "parquet"))
val enableOffHeapColumnVector = spark.sessionState.conf.offHeapColumnVectorEnabled
val vectorizedReaderBatchSize = spark.sessionState.conf.parquetVectorizedReaderBatchSize
parquetReaderBenchmark.addCase("ParquetReader Vectorized") { _ =>
@@ -183,7 +184,7 @@ object DataSourceReadBenchmark extends SqlBasedBenchmark {
case DoubleType => (col: ColumnVector, i: Int) => doubleSum += col.getDouble(i)
}
- files.map(_.asInstanceOf[String]).foreach { p =>
+ files.foreach { p =>
val reader = new VectorizedParquetRecordReader(
enableOffHeapColumnVector, vectorizedReaderBatchSize)
try {
@@ -468,12 +469,12 @@ object DataSourceReadBenchmark extends SqlBasedBenchmark {
}
}
- val files = SpecificParquetRecordReaderBase.listDirectory(new File(dir, "parquet")).toArray
+ val files = TestUtils.listDirectory(new File(dir, "parquet"))
val enableOffHeapColumnVector = spark.sessionState.conf.offHeapColumnVectorEnabled
val vectorizedReaderBatchSize = spark.sessionState.conf.parquetVectorizedReaderBatchSize
benchmark.addCase("ParquetReader Vectorized") { num =>
var sum = 0
- files.map(_.asInstanceOf[String]).foreach { p =>
+ files.foreach { p =>
val reader = new VectorizedParquetRecordReader(
enableOffHeapColumnVector, vectorizedReaderBatchSize)
try {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcColumnarBatchReaderSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcColumnarBatchReaderSuite.scala
index bfcef46..4ff9612 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcColumnarBatchReaderSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcColumnarBatchReaderSuite.scala
@@ -25,11 +25,11 @@ import org.apache.hadoop.mapreduce.lib.input.FileSplit
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
import org.apache.orc.TypeDescription
+import org.apache.spark.TestUtils
import org.apache.spark.sql.QueryTest
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
import org.apache.spark.sql.catalyst.util.DateTimeUtils
-import org.apache.spark.sql.execution.datasources.parquet.SpecificParquetRecordReaderBase
import org.apache.spark.sql.execution.vectorized.{OnHeapColumnVector, WritableColumnVector}
import org.apache.spark.sql.test.SharedSparkSession
import org.apache.spark.sql.types._
@@ -117,7 +117,7 @@ class OrcColumnarBatchReaderSuite extends QueryTest with SharedSparkSession {
dataTypes.zip(constantValues).foreach { case (dt, v) =>
val schema = StructType(StructField("col1", IntegerType) :: StructField("pcol", dt) :: Nil)
val partitionValues = new GenericInternalRow(Array(v))
- val file = new File(SpecificParquetRecordReaderBase.listDirectory(dir).get(0))
+ val file = new File(TestUtils.listDirectory(dir).head)
val fileSplit = new FileSplit(new Path(file.getCanonicalPath), 0L, file.length, Array.empty)
val taskConf = sqlContext.sessionState.newHadoopConf()
val orcFileSchema = TypeDescription.fromString(schema.simpleString)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetEncodingSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetEncodingSuite.scala
index 746d9c6..f7100a5 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetEncodingSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetEncodingSuite.scala
@@ -26,6 +26,7 @@ import org.apache.hadoop.fs.Path
import org.apache.parquet.column.{Encoding, ParquetProperties}
import org.apache.parquet.hadoop.ParquetOutputFormat
+import org.apache.spark.TestUtils
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.internal.SQLConf
@@ -50,12 +51,12 @@ class ParquetEncodingSuite extends ParquetCompatibilityTest with SharedSparkSess
(1 :: 1000 :: Nil).foreach { n => {
withTempPath { dir =>
List.fill(n)(ROW).toDF.repartition(1).write.parquet(dir.getCanonicalPath)
- val file = SpecificParquetRecordReaderBase.listDirectory(dir).toArray.head
+ val file = TestUtils.listDirectory(dir).head
val conf = sqlContext.conf
val reader = new VectorizedParquetRecordReader(
conf.offHeapColumnVectorEnabled, conf.parquetVectorizedReaderBatchSize)
- reader.initialize(file.asInstanceOf[String], null)
+ reader.initialize(file, null)
val batch = reader.resultBatch()
assert(reader.nextBatch())
assert(batch.numRows() == n)
@@ -80,12 +81,12 @@ class ParquetEncodingSuite extends ParquetCompatibilityTest with SharedSparkSess
withTempPath { dir =>
val data = List.fill(n)(NULL_ROW).toDF
data.repartition(1).write.parquet(dir.getCanonicalPath)
- val file = SpecificParquetRecordReaderBase.listDirectory(dir).toArray.head
+ val file = TestUtils.listDirectory(dir).head
val conf = sqlContext.conf
val reader = new VectorizedParquetRecordReader(
conf.offHeapColumnVectorEnabled, conf.parquetVectorizedReaderBatchSize)
- reader.initialize(file.asInstanceOf[String], null)
+ reader.initialize(file, null)
val batch = reader.resultBatch()
assert(reader.nextBatch())
assert(batch.numRows() == n)
@@ -114,7 +115,7 @@ class ParquetEncodingSuite extends ParquetCompatibilityTest with SharedSparkSess
// first page is dictionary encoded and the remaining two are plain encoded.
val data = (0 until 512).flatMap(i => Seq.fill(3)(i.toString))
data.toDF("f").coalesce(1).write.parquet(dir.getCanonicalPath)
- val file = SpecificParquetRecordReaderBase.listDirectory(dir).asScala.head
+ val file = TestUtils.listDirectory(dir).head
val conf = sqlContext.conf
val reader = new VectorizedParquetRecordReader(
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
index 0966319..1e2bb91 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
@@ -38,7 +38,7 @@ import org.apache.parquet.hadoop.metadata.CompressionCodecName
import org.apache.parquet.hadoop.metadata.CompressionCodecName.GZIP
import org.apache.parquet.schema.{MessageType, MessageTypeParser}
-import org.apache.spark.{SPARK_VERSION_SHORT, SparkException}
+import org.apache.spark.{SPARK_VERSION_SHORT, SparkException, TestUtils}
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.{InternalRow, ScalaReflection}
import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, UnsafeRow}
@@ -928,7 +928,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession
val data = (0 to 10).map(i => (i, (i + 'a').toChar.toString))
withTempPath { dir =>
spark.createDataFrame(data).repartition(1).write.parquet(dir.getCanonicalPath)
- val file = SpecificParquetRecordReaderBase.listDirectory(dir).get(0);
+ val file = TestUtils.listDirectory(dir).head;
{
val conf = sqlContext.conf
val reader = new VectorizedParquetRecordReader(
@@ -1032,7 +1032,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession
val vectorizedReader = new VectorizedParquetRecordReader(
conf.offHeapColumnVectorEnabled, conf.parquetVectorizedReaderBatchSize)
val partitionValues = new GenericInternalRow(Array(v))
- val file = SpecificParquetRecordReaderBase.listDirectory(dir).get(0)
+ val file = TestUtils.listDirectory(dir).head
try {
vectorizedReader.initialize(file, null)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTest.scala
index 4772316..7a7957c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTest.scala
@@ -33,6 +33,7 @@ import org.apache.parquet.hadoop.metadata.{BlockMetaData, FileMetaData, ParquetM
import org.apache.parquet.hadoop.util.HadoopInputFile
import org.apache.parquet.schema.MessageType
+import org.apache.spark.TestUtils
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.execution.datasources.FileBasedDataSourceTest
import org.apache.spark.sql.internal.SQLConf
@@ -179,7 +180,7 @@ private[sql] trait ParquetTest extends FileBasedDataSourceTest {
}
def getMetaData(dir: java.io.File): Map[String, String] = {
- val file = SpecificParquetRecordReaderBase.listDirectory(dir).get(0)
+ val file = TestUtils.listDirectory(dir).head
val conf = new Configuration()
val hadoopInputFile = HadoopInputFile.fromPath(new Path(file), conf)
val parquetReadOptions = HadoopReadOptions.builder(conf).build()
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
index ea007c1..cb3bd29 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
@@ -32,7 +32,7 @@ import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName
import org.apache.parquet.schema.Type.Repetition
import org.scalatest.BeforeAndAfter
-import org.apache.spark.SparkContext
+import org.apache.spark.{SparkContext, TestUtils}
import org.apache.spark.internal.io.FileCommitProtocol.TaskCommitMessage
import org.apache.spark.internal.io.HadoopMapReduceCommitProtocol
import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart}
@@ -42,7 +42,6 @@ import org.apache.spark.sql.catalyst.plans.logical.{AppendData, LogicalPlan, Ove
import org.apache.spark.sql.execution.QueryExecution
import org.apache.spark.sql.execution.datasources.{DataSourceUtils, HadoopFsRelation, LogicalRelation}
import org.apache.spark.sql.execution.datasources.noop.NoopDataSource
-import org.apache.spark.sql.execution.datasources.parquet.SpecificParquetRecordReaderBase
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types._
@@ -764,7 +763,7 @@ class DataFrameReaderWriterSuite extends QueryTest with SharedSparkSession with
withTempPath { dir =>
val path = dir.getAbsolutePath
df.write.mode("overwrite").parquet(path)
- val file = SpecificParquetRecordReaderBase.listDirectory(dir).get(0)
+ val file = TestUtils.listDirectory(dir).head
val hadoopInputFile = HadoopInputFile.fromPath(new Path(file), new Configuration())
val f = ParquetFileReader.open(hadoopInputFile)