Posted to commits@spark.apache.org by tg...@apache.org on 2021/02/18 14:43:44 UTC

[spark] branch master updated: [SPARK-33739][SQL] Jobs committed through the S3A Magic committer don't track bytes

This is an automated email from the ASF dual-hosted git repository.

tgraves pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new ff5115c  [SPARK-33739][SQL] Jobs committed through the S3A Magic committer don't track bytes
ff5115c is described below

commit ff5115c3ac15cf718fbfe98e07c56f3dde79a602
Author: Steve Loughran <st...@cloudera.com>
AuthorDate: Thu Feb 18 08:43:18 2021 -0600

    [SPARK-33739][SQL] Jobs committed through the S3A Magic committer don't track bytes
    
    BasicWriteStatsTracker now probes for a custom XAttr if the size of
    the generated file is 0 bytes; if found and parseable, that value is used as
    the declared length of the output.
    
    The matching Hadoop patch in HADOOP-17414:
    
    * Returns all S3 object headers as XAttr attributes prefixed "header."
    * Sets the custom header x-hadoop-s3a-magic-data-length to the length of
      the data in the marker file.
    
    As a result, spark job tracking will correctly report the amount of data uploaded
    and yet to materialize.
    
    ### Why are the changes needed?
    
    Now that S3 is consistent, it's a lot easier to use the S3A "magic" committer
    which redirects a file written to `dest/__magic/job_0011/task_1245/__base/year=2020/output.avro`
    to its final destination `dest/year=2020/output.avro`, adding a zero byte marker file at
    the end and a JSON file `dest/__magic/job_0011/task_1245/__base/year=2020/output.avro.pending`
    containing all the information for the job committer to complete the upload.
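    
    As an illustration of the redirection described above, here is a minimal Scala sketch
    of the path mapping. It only covers the `__base` form shown in the example, works on
    plain strings, and is not the S3A connector's own code; `MagicPathSketch` and
    `finalDestination` are hypothetical names introduced for this sketch.
    
    ```scala
    // Hypothetical helper: illustrates the path layout only, not the S3A implementation.
    object MagicPathSketch {
      val Magic = "__magic"
      val Base = "__base"
    
      /**
       * Map a path written under __magic to its final destination.
       * Returns None if the path has no __magic element followed by a __base element.
       */
      def finalDestination(path: String): Option[String] = {
        val parts = path.split('/').toList
        val magicIdx = parts.indexOf(Magic)
        val baseIdx = parts.indexOf(Base, magicIdx + 1)
        if (magicIdx < 0 || baseIdx < 0) {
          None
        } else {
          // keep everything before __magic, then everything after __base
          Some((parts.take(magicIdx) ++ parts.drop(baseIdx + 1)).mkString("/"))
        }
      }
    }
    ```
    
    Calling `MagicPathSketch.finalDestination` on the magic path from the example yields
    the `dest/year=2020/output.avro` destination; a path without `__magic` yields `None`.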
    
    But: the write tracker statistics don't show progress as they measure the length of the
    created file, find the marker file and report 0 bytes.
    By probing for a specific HTTP header in the marker file and parsing that if
    retrieved, the real progress can be reported.
    
    There's a matching change in Hadoop [https://github.com/apache/hadoop/pull/2530](https://github.com/apache/hadoop/pull/2530)
    which adds getXAttr API support to the S3A connector and returns the headers; the magic
    committer adds the relevant attributes.
    
    If the FS being probed doesn't support the XAttr API, the header is missing,
    or the value is not a positive long, then a size of 0 is returned.
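    
    The fallback rule above can be sketched without any Hadoop dependency. This is a
    simplified illustration of the parsing step only, assuming the attribute bytes have
    already been fetched (with `null` standing for "no attribute"); `DeclaredLengthSketch`
    and `declaredLength` are hypothetical names, and the patch itself implements this
    inside `getFileSize` with explicit exception handling and logging.
    
    ```scala
    import java.nio.charset.StandardCharsets
    import scala.util.Try
    
    // Hypothetical helper: a simplified model of the XAttr parsing rule.
    object DeclaredLengthSketch {
    
      /**
       * Length to report for a zero-byte file, given the raw attribute value.
       * A missing, empty, unparseable or non-positive value yields 0.
       */
      def declaredLength(attr: Array[Byte]): Long =
        Option(attr)                                   // null means "no attribute"
          .filter(_.nonEmpty)
          .map(bytes => new String(bytes, StandardCharsets.UTF_8))
          .flatMap(s => Try(s.toLong).toOption)        // NumberFormatException becomes None
          .filter(_ > 0)
          .getOrElse(0L)
    }
    ```
    
    Using `Option` and `Try` keeps every failure mode on the same path to the default
    of 0, which matches the cases exercised by the new tests.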
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    New tests in BasicWriteTaskStatsTrackerSuite which use a filter FS to
    implement getXAttr on top of LocalFS; this is used to explore the set of
    options:
    * no XAttr API implementation (existing tests; what callers would see with
      most filesystems)
    * no attribute found (HDFS, ABFS without the attribute)
    * invalid data of different forms
    
    All of these return Some(0) as file length.
    
    The Hadoop PR verifies XAttr implementation in S3A and that
    the commit protocol attaches the header to the files.
    
    External downstream testing has done the full hadoop+spark end
    to end operation, with manual review of logs to verify that the
    data was successfully collected from the attribute.
    
    Closes #30714 from steveloughran/cdpd/SPARK-33739-magic-commit-tracking-master.
    
    Authored-by: Steve Loughran <st...@cloudera.com>
    Signed-off-by: Thomas Graves <tg...@apache.org>
---
 docs/cloud-integration.md                          |  43 +++++--
 .../datasources/BasicWriteStatsTracker.scala       |  63 +++++++++-
 .../BasicWriteTaskStatsTrackerSuite.scala          | 128 ++++++++++++++++++++-
 3 files changed, 221 insertions(+), 13 deletions(-)

diff --git a/docs/cloud-integration.md b/docs/cloud-integration.md
index 1eaa8ab..d801df7 100644
--- a/docs/cloud-integration.md
+++ b/docs/cloud-integration.md
@@ -49,7 +49,6 @@ They cannot be used as a direct replacement for a cluster filesystem such as HDF
 
 Key differences are:
 
-* Changes to stored objects may not be immediately visible, both in directory listings and actual data access.
 * The means by which directories are emulated may make working with them slow.
 * Rename operations may be very slow and, on failure, leave the store in an unknown state.
 * Seeking within a file may require new HTTP calls, hurting performance. 
@@ -58,7 +57,6 @@ How does this affect Spark?
 
 1. Reading and writing data can be significantly slower than working with a normal filesystem.
 1. Some directory structures may be very inefficient to scan during query split calculation.
-1. The output of work may not be immediately visible to a follow-on query.
 1. The rename-based algorithm by which Spark normally commits work when saving an RDD, DataFrame or Dataset
  is potentially both slow and unreliable.
 
@@ -66,8 +64,28 @@ For these reasons, it is not always safe to use an object store as a direct dest
 an intermediate store in a chain of queries. Consult the documentation of the object store and its
 connector to determine which uses are considered safe.
 
-In particular: *without some form of consistency layer, Amazon S3 cannot
-be safely used as the direct destination of work with the normal rename-based committer.*
+### Consistency
+
+As of 2021, the object stores of Amazon (S3), Google Cloud (GCS) and Microsoft (Azure Storage, ADLS Gen1, ADLS Gen2) are all *consistent*.
+
+This means that as soon as a file is written/updated it can be listed, viewed and opened by other processes
+-and the latest version will be retrieved. Inconsistency was previously a known issue with AWS S3, especially with 404 caching
+of HEAD requests made before an object was created.
+
+Even so: none of the store connectors provide any guarantees as to how their clients cope with objects
+which are overwritten while a stream is reading them. Do not assume that the old file can be safely
+read, nor that there is any bounded time period for changes to become visible -or indeed, that
+the clients will not simply fail if a file being read is overwritten.
+
+For this reason: avoid overwriting files where it is known/likely that other clients
+will be actively reading them.
+
+Other object stores are *inconsistent*
+
+This includes [OpenStack Swift](https://docs.openstack.org/swift/latest/).
+
+Such stores are not always safe to use as a destination of work -consult
+each store's specific documentation. 
 
 ### Installation
 
@@ -163,10 +181,15 @@ different stores and connectors when renaming directories:
 | Amazon S3     | s3a       | Unsafe                  | O(data) |
 | Azure Storage | wasb      | Safe                    | O(files) |
 | Azure Datalake Gen 2 | abfs | Safe                  | O(1) |
-| Google Cloud Storage | gs        | Safe                    | O(1) |
+| Google Cloud Storage | gs        | Mixed                    | O(files) |
 
-As storing temporary files can run up charges; delete
+1. As storing temporary files can run up charges, delete
 directories called `"_temporary"` on a regular basis.
+1. For AWS S3, set a limit on how long multipart uploads can remain outstanding.
+This avoids incurring bills from incomplete uploads.
+1. For Google Cloud, directory rename is file-by-file. Consider using the v2 committer
+and only write code which generates idempotent output -including filenames,
+as it is *no more unsafe* than the v1 committer, and faster.
 
 ### Parquet I/O Settings
 
@@ -245,6 +268,9 @@ mydataframe.write.format("parquet").save("s3a://bucket/destination")
 
 More details on these committers can be found in the latest Hadoop documentation.
 
+Note: depending upon the committer used, in-progress statistics may be
+under-reported with Hadoop versions before 3.3.1.
+
 ## Further Reading
 
 Here is the documentation on the standard connectors both from Apache and the cloud providers.
@@ -252,10 +278,13 @@ Here is the documentation on the standard connectors both from Apache and the cl
 * [OpenStack Swift](https://hadoop.apache.org/docs/current/hadoop-openstack/index.html).
 * [Azure Blob Storage and Azure Datalake Gen 2](https://hadoop.apache.org/docs/current/hadoop-aws/tools/hadoop-aws/index.html).
 * [Azure Data Lake Gen 1](https://hadoop.apache.org/docs/current/hadoop-azure-datalake/index.html).
+* [Amazon S3 Strong Consistency](https://aws.amazon.com/s3/consistency/)
 * [Hadoop-AWS module (Hadoop 3.x)](https://hadoop.apache.org/docs/current3/hadoop-aws/tools/hadoop-aws/index.html).
 * [Amazon S3 via S3A and S3N (Hadoop 2.x)](https://hadoop.apache.org/docs/current2/hadoop-aws/tools/hadoop-aws/index.html).
 * [Amazon EMR File System (EMRFS)](https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-fs.html). From Amazon.
+* [Using the EMRFS S3-optimized Committer](https://docs.amazonaws.cn/en_us/emr/latest/ReleaseGuide/emr-spark-s3-optimized-committer.html)
 * [Google Cloud Storage Connector for Spark and Hadoop](https://cloud.google.com/dataproc/docs/concepts/connectors/cloud-storage). From Google.
 * [The Azure Blob Filesystem driver (ABFS)](https://docs.microsoft.com/en-us/azure/storage/blobs/data-lake-storage-abfs-driver)
-* IBM Cloud Object Storage connector for Apache Spark: [Stocator](https://github.com/CODAIT/stocator), [IBM Object Storage](https://www.ibm.com/cloud/object-storage). From IBM.
+* IBM Cloud Object Storage connector for Apache Spark: [Stocator](https://github.com/CODAIT/stocator),
+  [IBM Object Storage](https://www.ibm.com/cloud/object-storage). From IBM.
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/BasicWriteStatsTracker.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/BasicWriteStatsTracker.scala
index 6babbb4..b6b07de 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/BasicWriteStatsTracker.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/BasicWriteStatsTracker.scala
@@ -18,11 +18,12 @@
 package org.apache.spark.sql.execution.datasources
 
 import java.io.FileNotFoundException
+import java.nio.charset.StandardCharsets
 
 import scala.collection.mutable
 
 import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.Path
+import org.apache.hadoop.fs.{FileSystem, Path}
 
 import org.apache.spark.{SparkContext, TaskContext}
 import org.apache.spark.internal.Logging
@@ -66,14 +67,66 @@ class BasicWriteTaskStatsTracker(hadoopConf: Configuration)
   private def getFileSize(filePath: String): Option[Long] = {
     val path = new Path(filePath)
     val fs = path.getFileSystem(hadoopConf)
+    getFileSize(fs, path)
+  }
+
+  /**
+   * Get the size of the file expected to have been written by a worker.
+   * This supports the XAttr in HADOOP-17414 when the "magic committer" adds
+   * a custom HTTP header to a zero-byte marker.
+   * If the length of the output file as returned by getFileStatus is > 0 then that
+   * length is returned. For zero-byte files, the (optional) Hadoop FS API getXAttr() is
+   * invoked. If a parseable, positive length can be retrieved, this
+   * is returned instead of the length.
+   * @return the file size or None if the file was not found.
+   */
+  private [datasources] def getFileSize(fs: FileSystem, path: Path): Option[Long] = {
+    // the normal file status probe.
     try {
-      Some(fs.getFileStatus(path).getLen())
+      val len = fs.getFileStatus(path).getLen
+      if (len > 0) {
+        return Some(len)
+      }
     } catch {
       case e: FileNotFoundException =>
-        // may arise against eventually consistent object stores
+        // may arise against eventually consistent object stores.
         logDebug(s"File $path is not yet visible", e)
-        None
+        return None
+    }
+
+    // Output File Size is 0. Look to see if it has an attribute
+    // declaring a future-file-length.
+    // Failure of API call, parsing, invalid value all return the
+    // 0 byte length.
+
+    var len = 0L
+    try {
+      val attr = fs.getXAttr(path, BasicWriteJobStatsTracker.FILE_LENGTH_XATTR)
+      if (attr != null && attr.nonEmpty) {
+        val str = new String(attr, StandardCharsets.UTF_8)
+        logDebug(s"File Length statistics for $path retrieved from XAttr: $str")
+        // a non-empty header was found. parse to a long via the java class
+        val l = java.lang.Long.parseLong(str)
+        if (l > 0) {
+          len = l
+        } else {
+          logDebug("Ignoring non-positive value in XAttr file length")
+        }
+      }
+    } catch {
+      case e: NumberFormatException =>
+        // warn but don't dump the whole stack
+        logInfo(s"Failed to parse" +
+          s" ${BasicWriteJobStatsTracker.FILE_LENGTH_XATTR}:$e;" +
+          s" bytes written may be under-reported");
+      case e: UnsupportedOperationException =>
+        // this is not unusual; ignore
+        logDebug(s"XAttr not supported on path $path", e);
+      case e: Exception =>
+        // Something else. Log at debug and continue.
+        logDebug(s"XAttr processing failure on $path", e);
     }
+    Some(len)
   }
 
 
@@ -170,6 +223,8 @@ object BasicWriteJobStatsTracker {
   private val NUM_OUTPUT_BYTES_KEY = "numOutputBytes"
   private val NUM_OUTPUT_ROWS_KEY = "numOutputRows"
   private val NUM_PARTS_KEY = "numParts"
+  /** XAttr key of the data length header added in HADOOP-17414. */
+  val FILE_LENGTH_XATTR = "header.x-hadoop-s3a-magic-data-length"
 
   def metrics: Map[String, SQLMetric] = {
     val sparkContext = SparkContext.getActive.get
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/BasicWriteTaskStatsTrackerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/BasicWriteTaskStatsTrackerSuite.scala
index 32941d8..d93dc5f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/BasicWriteTaskStatsTrackerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/BasicWriteTaskStatsTrackerSuite.scala
@@ -17,12 +17,13 @@
 
 package org.apache.spark.sql.execution.datasources
 
-import java.nio.charset.Charset
+import java.nio.charset.{Charset, StandardCharsets}
 
 import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.Path
+import org.apache.hadoop.fs.{FileSystem, FilterFileSystem, Path}
 
 import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.execution.datasources.BasicWriteJobStatsTracker.FILE_LENGTH_XATTR
 import org.apache.spark.util.Utils
 
 /**
@@ -221,4 +222,127 @@ class BasicWriteTaskStatsTrackerSuite extends SparkFunSuite {
     write(file, data2)
   }
 
+  /**
+   * Does a length specified in the XAttr header get picked up?
+   */
+  test("XAttr sourced length") {
+    val file = new Path(tempDirPath, "file")
+    touch(file)
+    val xattrFS = new FsWithFakeXAttrs(localfs)
+    val bigLong = 34359738368L
+    xattrFS.set(FILE_LENGTH_XATTR, s"$bigLong")
+    val tracker = new BasicWriteTaskStatsTracker(conf)
+    assert(Some(bigLong) === tracker.getFileSize(xattrFS, file),
+      "Size not collected from XAttr entry")
+  }
+
+  /**
+   * If a file is non-empty then the XAttr size declaration
+   * is not used.
+   */
+  test("XAttr sourced length only used for 0-byte-files") {
+    val file = new Path(tempDirPath, "file")
+    write2(file)
+    val xattrFS = new FsWithFakeXAttrs(localfs)
+    val bigLong = 34359738368L
+    xattrFS.set(FILE_LENGTH_XATTR, s"$bigLong")
+    val tracker = new BasicWriteTaskStatsTracker(conf)
+    assert(Some(len2) === tracker.getFileSize(xattrFS, file),
+      "Size should come from the file length, not the XAttr entry")
+  }
+
+  /**
+   * Any FS which supports XAttr must raise an FNFE if the
+   * file is missing. This verifies resilience on a path
+   * which the local FS would not normally take.
+   */
+  test("Missing File with XAttr") {
+    val missing = new Path(tempDirPath, "missing")
+    val xattrFS = new FsWithFakeXAttrs(localfs)
+    val tracker = new BasicWriteTaskStatsTracker(conf)
+    tracker.newFile(missing.toString)
+    assert(None === tracker.getFileSize(xattrFS, missing))
+  }
+
+  /**
+   * If there are any problems parsing/validating the
+   * header attribute, fall back to the file length.
+   */
+  test("XAttr error recovery") {
+    val file = new Path(tempDirPath, "file")
+    touch(file)
+    val xattrFS = new FsWithFakeXAttrs(localfs)
+
+    val tracker = new BasicWriteTaskStatsTracker(conf)
+
+    // without a header
+    assert(Some(0) === tracker.getFileSize(xattrFS, file))
+
+    // will fail to parse as a long
+    xattrFS.set(FILE_LENGTH_XATTR, "Not-a-long")
+    assert(Some(0) === tracker.getFileSize(xattrFS, file))
+
+    // a negative value
+    xattrFS.set(FILE_LENGTH_XATTR, "-1")
+    assert(Some(0) === tracker.getFileSize(xattrFS, file))
+
+    // empty string
+    xattrFS.set(FILE_LENGTH_XATTR, "")
+    assert(Some(0) === tracker.getFileSize(xattrFS, file))
+
+    // then a zero byte array
+    xattrFS.setXAttr(file, FILE_LENGTH_XATTR,
+      new Array[Byte](0))
+    assert(Some(0) === tracker.getFileSize(xattrFS, file))
+  }
+
+  /**
+   * Extend any FS with a mock get/setXAttr.
+   * A map of attributes is used; these are returned on a getXAttr(path, key)
+   * call to any path; the other XAttr list/get calls are not implemented.
+   */
+  class FsWithFakeXAttrs(fs: FileSystem) extends FilterFileSystem(fs) {
+
+    private val xattrs = scala.collection.mutable.Map[String, Array[Byte]]()
+
+    /**
+     * Mock implementation of setXAttr.
+     *
+     * @param path path (ignored)
+     * @param name attribute name.
+     * @param value byte array value
+     */
+    override def setXAttr(
+      path: Path,
+      name: String,
+      value: Array[Byte]): Unit = {
+
+      xattrs.put(name, value)
+    }
+
+    /**
+     * Set an attribute to the UTF-8 byte value of a string.
+     *
+     * @param name  attribute name.
+     * @param value string value
+     */
+    def set(name: String, value: String): Unit = {
+      setXAttr(null, name, value.getBytes(StandardCharsets.UTF_8))
+    }
+
+    /**
+     * Get any attribute if it is found in the map, else null.
+     * @param path path (ignored)
+     * @param name attribute name.
+     * @return the byte[] value or null.
+     */
+    override def getXAttr(
+      path: Path,
+      name: String): Array[Byte] = {
+      // force a check for the file and raise an FNFE if not found
+      getFileStatus(path)
+
+      xattrs.getOrElse(name, null)
+    }
+  }
 }

