You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by me...@apache.org on 2019/04/29 23:25:11 UTC
[spark] branch master updated: [SPARK-27588] Binary file data
source fails fast and doesn't attempt to read very large files
This is an automated email from the ASF dual-hosted git repository.
meng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 618d6bf [SPARK-27588] Binary file data source fails fast and doesn't attempt to read very large files
618d6bf is described below
commit 618d6bff71073c8c93501ab7392c3cc579730f0b
Author: Xiangrui Meng <me...@databricks.com>
AuthorDate: Mon Apr 29 16:24:49 2019 -0700
[SPARK-27588] Binary file data source fails fast and doesn't attempt to read very large files
## What changes were proposed in this pull request?
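If a file is too big (by default, longer than Int.MaxValue bytes, i.e. about 2GB), the binary file data source should fail fast and not try to read the file. This patch adds an internal conf, `spark.sql.sources.binaryFile.maxLength` (default `Int.MaxValue`), and throws a `SparkException` before opening any file whose length exceeds it.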
## How was this patch tested?
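Added a unit test to `BinaryFileFormatSuite` ("fail fast and do not attempt to read if a file is too big"). It reads a 3-byte file successfully with the default max length and with the max length set to exactly the file length. It then makes the file unreadable, sets the max length to one byte less than the file length, and checks that a `SparkException` containing "exceeds the max length allowed" is thrown, which shows that the file is never opened.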
Please review http://spark.apache.org/contributing.html before opening a pull request.
Closes #24483 from mengxr/SPARK-27588.
Authored-by: Xiangrui Meng <me...@databricks.com>
Signed-off-by: Xiangrui Meng <me...@databricks.com>
---
.../org/apache/spark/sql/internal/SQLConf.scala | 8 ++++++
.../datasources/binaryfile/BinaryFileFormat.scala | 8 ++++++
.../binaryfile/BinaryFileFormatSuite.scala | 31 +++++++++++++++++++++-
3 files changed, 46 insertions(+), 1 deletion(-)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 96d3f5c..87bce1f 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -1744,6 +1744,14 @@ object SQLConf {
"and from_utc_timestamp() functions.")
.booleanConf
.createWithDefault(false)
+
+ val SOURCES_BINARY_FILE_MAX_LENGTH = buildConf("spark.sql.sources.binaryFile.maxLength")
+ .doc("The max length of a file that can be read by the binary file data source. " +
+ "Spark will fail fast and not attempt to read the file if its length exceeds this value. " +
+ "The theoretical max is Int.MaxValue, though VMs might implement a smaller max.")
+ .internal()
+ .intConf
+ .createWithDefault(Int.MaxValue)
}
/**
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/binaryfile/BinaryFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/binaryfile/BinaryFileFormat.scala
index db93268..2637784 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/binaryfile/BinaryFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/binaryfile/BinaryFileFormat.scala
@@ -24,11 +24,13 @@ import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, GlobFilter, Path}
import org.apache.hadoop.mapreduce.Job
+import org.apache.spark.SparkException
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter
import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils}
import org.apache.spark.sql.execution.datasources.{FileFormat, OutputWriterFactory, PartitionedFile}
+import org.apache.spark.sql.internal.SQLConf.SOURCES_BINARY_FILE_MAX_LENGTH
import org.apache.spark.sql.sources.{And, DataSourceRegister, EqualTo, Filter, GreaterThan, GreaterThanOrEqual, LessThan, LessThanOrEqual, Not, Or}
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String
@@ -99,6 +101,7 @@ class BinaryFileFormat extends FileFormat with DataSourceRegister {
val binaryFileSourceOptions = new BinaryFileSourceOptions(options)
val pathGlobPattern = binaryFileSourceOptions.pathGlobFilter
val filterFuncs = filters.map(filter => createFilterFunction(filter))
+ val maxLength = sparkSession.conf.get(SOURCES_BINARY_FILE_MAX_LENGTH)
file: PartitionedFile => {
val path = new Path(file.filePath)
@@ -115,6 +118,11 @@ class BinaryFileFormat extends FileFormat with DataSourceRegister {
case (MODIFICATION_TIME, i) =>
writer.write(i, DateTimeUtils.fromMillis(status.getModificationTime))
case (CONTENT, i) =>
+ if (status.getLen > maxLength) {
+ throw new SparkException(
+ s"The length of ${status.getPath} is ${status.getLen}, " +
+ s"which exceeds the max length allowed: ${maxLength}.")
+ }
val stream = fs.open(status.getPath)
try {
writer.write(i, ByteStreams.toByteArray(stream))
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/binaryfile/BinaryFileFormatSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/binaryfile/BinaryFileFormatSuite.scala
index fb83c3c..01dc96c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/binaryfile/BinaryFileFormatSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/binaryfile/BinaryFileFormatSuite.scala
@@ -27,10 +27,12 @@ import com.google.common.io.{ByteStreams, Closeables}
import org.apache.hadoop.fs.{FileStatus, FileSystem, GlobFilter, Path}
import org.mockito.Mockito.{mock, when}
-import org.apache.spark.sql.{QueryTest, Row}
+import org.apache.spark.SparkException
+import org.apache.spark.sql.{DataFrame, QueryTest, Row}
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.execution.datasources.PartitionedFile
import org.apache.spark.sql.functions.col
+import org.apache.spark.sql.internal.SQLConf.SOURCES_BINARY_FILE_MAX_LENGTH
import org.apache.spark.sql.sources._
import org.apache.spark.sql.test.{SharedSQLContext, SQLTestUtils}
import org.apache.spark.sql.types._
@@ -339,4 +341,31 @@ class BinaryFileFormatSuite extends QueryTest with SharedSQLContext with SQLTest
assert(df.select("LENGTH").first().getLong(0) === content.length,
"column pruning should be case insensitive")
}
+
+ test("fail fast and do not attempt to read if a file is too big") {
+ assert(spark.conf.get(SOURCES_BINARY_FILE_MAX_LENGTH) === Int.MaxValue)
+ withTempPath { file =>
+ val path = file.getPath
+ val content = "123".getBytes
+ Files.write(file.toPath, content, StandardOpenOption.CREATE, StandardOpenOption.WRITE)
+ def readContent(): DataFrame = {
+ spark.read.format(BINARY_FILE)
+ .load(path)
+ .select(CONTENT)
+ }
+ val expected = Seq(Row(content))
+ QueryTest.checkAnswer(readContent(), expected)
+ withSQLConf(SOURCES_BINARY_FILE_MAX_LENGTH.key -> content.length.toString) {
+ QueryTest.checkAnswer(readContent(), expected)
+ }
+ // Disable read. If the implementation attempts to read, the exception would be different.
+ file.setReadable(false)
+ val caught = intercept[SparkException] {
+ withSQLConf(SOURCES_BINARY_FILE_MAX_LENGTH.key -> (content.length - 1).toString) {
+ QueryTest.checkAnswer(readContent(), expected)
+ }
+ }
+ assert(caught.getMessage.contains("exceeds the max length allowed"))
+ }
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org