You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by me...@apache.org on 2019/04/29 23:25:11 UTC

[spark] branch master updated: [SPARK-27588] Binary file data source fails fast and doesn't attempt to read very large files

This is an automated email from the ASF dual-hosted git repository.

meng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 618d6bf  [SPARK-27588] Binary file data source fails fast and doesn't attempt to read very large files
618d6bf is described below

commit 618d6bff71073c8c93501ab7392c3cc579730f0b
Author: Xiangrui Meng <me...@databricks.com>
AuthorDate: Mon Apr 29 16:24:49 2019 -0700

    [SPARK-27588] Binary file data source fails fast and doesn't attempt to read very large files
    
    ## What changes were proposed in this pull request?
    
    If a file is too big (>2GB), we should fail fast and do not try to read the file.
    
    ## How was this patch tested?
    
    (Please explain how this patch was tested. E.g. unit tests, integration tests, manual tests)
    (If this patch involves UI changes, please attach a screenshot; otherwise, remove this)
    
    Please review http://spark.apache.org/contributing.html before opening a pull request.
    
    Closes #24483 from mengxr/SPARK-27588.
    
    Authored-by: Xiangrui Meng <me...@databricks.com>
    Signed-off-by: Xiangrui Meng <me...@databricks.com>
---
 .../org/apache/spark/sql/internal/SQLConf.scala    |  8 ++++++
 .../datasources/binaryfile/BinaryFileFormat.scala  |  8 ++++++
 .../binaryfile/BinaryFileFormatSuite.scala         | 31 +++++++++++++++++++++-
 3 files changed, 46 insertions(+), 1 deletion(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 96d3f5c..87bce1f 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -1744,6 +1744,14 @@ object SQLConf {
          "and from_utc_timestamp() functions.")
     .booleanConf
     .createWithDefault(false)
+
+  val SOURCES_BINARY_FILE_MAX_LENGTH = buildConf("spark.sql.sources.binaryFile.maxLength")
+    .doc("The max length of a file that can be read by the binary file data source. " +
+      "Spark will fail fast and not attempt to read the file if its length exceeds this value. " +
+      "The theoretical max is Int.MaxValue, though VMs might implement a smaller max.")
+    .internal()
+    .intConf
+    .createWithDefault(Int.MaxValue)
 }
 
 /**
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/binaryfile/BinaryFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/binaryfile/BinaryFileFormat.scala
index db93268..2637784 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/binaryfile/BinaryFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/binaryfile/BinaryFileFormat.scala
@@ -24,11 +24,13 @@ import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileStatus, GlobFilter, Path}
 import org.apache.hadoop.mapreduce.Job
 
+import org.apache.spark.SparkException
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter
 import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils}
 import org.apache.spark.sql.execution.datasources.{FileFormat, OutputWriterFactory, PartitionedFile}
+import org.apache.spark.sql.internal.SQLConf.SOURCES_BINARY_FILE_MAX_LENGTH
 import org.apache.spark.sql.sources.{And, DataSourceRegister, EqualTo, Filter, GreaterThan, GreaterThanOrEqual, LessThan, LessThanOrEqual, Not, Or}
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.UTF8String
@@ -99,6 +101,7 @@ class BinaryFileFormat extends FileFormat with DataSourceRegister {
     val binaryFileSourceOptions = new BinaryFileSourceOptions(options)
     val pathGlobPattern = binaryFileSourceOptions.pathGlobFilter
     val filterFuncs = filters.map(filter => createFilterFunction(filter))
+    val maxLength = sparkSession.conf.get(SOURCES_BINARY_FILE_MAX_LENGTH)
 
     file: PartitionedFile => {
       val path = new Path(file.filePath)
@@ -115,6 +118,11 @@ class BinaryFileFormat extends FileFormat with DataSourceRegister {
             case (MODIFICATION_TIME, i) =>
               writer.write(i, DateTimeUtils.fromMillis(status.getModificationTime))
             case (CONTENT, i) =>
+              if (status.getLen > maxLength) {
+                throw new SparkException(
+                  s"The length of ${status.getPath} is ${status.getLen}, " +
+                    s"which exceeds the max length allowed: ${maxLength}.")
+              }
               val stream = fs.open(status.getPath)
               try {
                 writer.write(i, ByteStreams.toByteArray(stream))
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/binaryfile/BinaryFileFormatSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/binaryfile/BinaryFileFormatSuite.scala
index fb83c3c..01dc96c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/binaryfile/BinaryFileFormatSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/binaryfile/BinaryFileFormatSuite.scala
@@ -27,10 +27,12 @@ import com.google.common.io.{ByteStreams, Closeables}
 import org.apache.hadoop.fs.{FileStatus, FileSystem, GlobFilter, Path}
 import org.mockito.Mockito.{mock, when}
 
-import org.apache.spark.sql.{QueryTest, Row}
+import org.apache.spark.SparkException
+import org.apache.spark.sql.{DataFrame, QueryTest, Row}
 import org.apache.spark.sql.catalyst.encoders.RowEncoder
 import org.apache.spark.sql.execution.datasources.PartitionedFile
 import org.apache.spark.sql.functions.col
+import org.apache.spark.sql.internal.SQLConf.SOURCES_BINARY_FILE_MAX_LENGTH
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.test.{SharedSQLContext, SQLTestUtils}
 import org.apache.spark.sql.types._
@@ -339,4 +341,31 @@ class BinaryFileFormatSuite extends QueryTest with SharedSQLContext with SQLTest
     assert(df.select("LENGTH").first().getLong(0) === content.length,
       "column pruning should be case insensitive")
   }
+
+  test("fail fast and do not attempt to read if a file is too big") {
+    assert(spark.conf.get(SOURCES_BINARY_FILE_MAX_LENGTH) === Int.MaxValue)
+    withTempPath { file =>
+      val path = file.getPath
+      val content = "123".getBytes
+      Files.write(file.toPath, content, StandardOpenOption.CREATE, StandardOpenOption.WRITE)
+      def readContent(): DataFrame = {
+        spark.read.format(BINARY_FILE)
+          .load(path)
+          .select(CONTENT)
+      }
+      val expected = Seq(Row(content))
+      QueryTest.checkAnswer(readContent(), expected)
+      withSQLConf(SOURCES_BINARY_FILE_MAX_LENGTH.key -> content.length.toString) {
+        QueryTest.checkAnswer(readContent(), expected)
+      }
+      // Disable read. If the implementation attempts to read, the exception would be different.
+      file.setReadable(false)
+      val caught = intercept[SparkException] {
+        withSQLConf(SOURCES_BINARY_FILE_MAX_LENGTH.key -> (content.length - 1).toString) {
+          QueryTest.checkAnswer(readContent(), expected)
+        }
+      }
+      assert(caught.getMessage.contains("exceeds the max length allowed"))
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org