Posted to commits@spark.apache.org by do...@apache.org on 2020/11/11 22:49:17 UTC

[spark] branch branch-3.0 updated: [SPARK-33402][CORE] Jobs launched in same second have duplicate MapReduce JobIDs

This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 3edec10  [SPARK-33402][CORE] Jobs launched in same second have duplicate MapReduce JobIDs
3edec10 is described below

commit 3edec10891f6d4db0f3fe792b453cf384d7c3f40
Author: Steve Loughran <st...@cloudera.com>
AuthorDate: Wed Nov 11 14:27:48 2020 -0800

    [SPARK-33402][CORE] Jobs launched in same second have duplicate MapReduce JobIDs
    
    ### What changes were proposed in this pull request?
    
    1. Applies the SQL changes in SPARK-33230 to SparkHadoopWriter, so that `rdd.saveAsNewAPIHadoopDataset` passes a unique job UUID down in `spark.sql.sources.writeJobUUID` (a usage sketch follows this list).
    2. `SparkHadoopWriterUtils.createJobTrackerID` now generates the job tracker ID by appending a random long to the supplied timestamp, so the probability of a JobID collision is near zero.
    3. Adds tests of uniqueness, round trips and rejection of negative job numbers.
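
    For context, here is a minimal sketch of the code path this touches; the dataset, output path and job setup are illustrative, not part of this patch:

    ```scala
    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.Path
    import org.apache.hadoop.io.{IntWritable, Text}
    import org.apache.hadoop.mapreduce.Job
    import org.apache.hadoop.mapreduce.lib.output.{FileOutputFormat, TextOutputFormat}
    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder().appName("writer-demo").getOrCreate()
    val sc = spark.sparkContext

    // Any pair RDD written through the new Hadoop API goes via SparkHadoopWriter,
    // which after this change sets spark.sql.sources.writeJobUUID before committer setup.
    val rdd = sc.parallelize(Seq(("a", 1), ("b", 2)))
      .map { case (k, v) => (new Text(k), new IntWritable(v)) }

    val job = Job.getInstance(new Configuration())
    job.setOutputKeyClass(classOf[Text])
    job.setOutputValueClass(classOf[IntWritable])
    job.setOutputFormatClass(classOf[TextOutputFormat[Text, IntWritable]])
    FileOutputFormat.setOutputPath(job, new Path("/tmp/writer-demo"))

    // Each invocation of this call now gets a unique job UUID and a randomized job tracker ID.
    rdd.saveAsNewAPIHadoopDataset(job.getConfiguration)
    ```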
    
    ### Why are the changes needed?
    
    Without this, if more than one job is started in the same second and the committer expects application attempt IDs to be unique, those jobs are at risk of clashing with each other.
    
    With the fix,
    
    * Committers which use the ID set in `spark.sql.sources.writeJobUUID` as their primary ID will pick that up and so be unique (an illustrative sketch follows this list).
    * Committers which use the Hadoop JobID for unique paths and filenames will get the randomly generated jobID. Assuming all clocks in a cluster are in sync, the probability that two jobs launched in the same second clash has dropped from 1 to 1/(2^63).
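
    As an illustration (this is not the actual committer code), a committer that prefers the write job UUID might resolve its unique ID roughly like this:

    ```scala
    import org.apache.hadoop.mapreduce.JobContext

    // Illustrative sketch: prefer the UUID Spark now sets, fall back to the
    // Hadoop JobID (which, before this change, could clash across jobs
    // launched in the same second).
    def uniqueJobId(context: JobContext): String = {
      val fromSpark = context.getConfiguration.get("spark.sql.sources.writeJobUUID")
      if (fromSpark != null) fromSpark else context.getJobID.toString
    }
    ```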
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Unit tests
    
    There's a new test suite, SparkHadoopWriterUtilsSuite, which creates job IDs, verifies that they are unique even for the same timestamp, and checks that they can be marshalled to a string and parsed back by the Hadoop code, which contains some (brittle) assumptions about the format of job IDs.
    
    Functional Integration Tests
    
    1. Hadoop trunk was built with [HADOOP-17318], publishing to the local maven repository.
    2. Spark was built with hadoop.version=3.4.0-SNAPSHOT to pick up these JARs.
    3. The Spark + object store integration tests at [https://github.com/hortonworks-spark/cloud-integration](https://github.com/hortonworks-spark/cloud-integration) were built against that local Spark version.
    4. These tests were then executed against AWS London.
    
    The tests were run with `fs.s3a.committer.require.uuid=true`, so the s3a committers fail fast if they don't get a job UUID passed down. This showed that `rdd.saveAsNewAPIHadoopDataset` wasn't setting the UUID option; it still relies on the current Date value for an app attempt, which is not guaranteed to be unique.
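
    For reference, one way to pass that option through from the Spark side is the `spark.hadoop.` prefix, which copies options into the Hadoop `Configuration`; the surrounding setup here is illustrative:

    ```scala
    import org.apache.spark.SparkConf

    // Illustrative: the S3A committers then see fs.s3a.committer.require.uuid=true
    // and fail fast when no job UUID has been passed down to them.
    val conf = new SparkConf()
      .set("spark.hadoop.fs.s3a.committer.require.uuid", "true")
    ```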
    
    With the change applied to Spark, the relevant tests pass, which shows that the committers are getting unique job IDs.
    
    Closes #30319 from steveloughran/BUG/SPARK-33402-jobuuid.
    
    Authored-by: Steve Loughran <st...@cloudera.com>
    Signed-off-by: Dongjoon Hyun <do...@apache.org>
    (cherry picked from commit 318a173fcee11902820593fe4ac992a90e6bb00e)
    Signed-off-by: Dongjoon Hyun <do...@apache.org>
---
 .../spark/internal/io/SparkHadoopWriter.scala      |   7 +-
 .../spark/internal/io/SparkHadoopWriterUtils.scala |  25 ++++-
 .../internal/io/SparkHadoopWriterUtilsSuite.scala  | 102 +++++++++++++++++++++
 3 files changed, 131 insertions(+), 3 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopWriter.scala b/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopWriter.scala
index a619f10..a5d2c5c 100644
--- a/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopWriter.scala
+++ b/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopWriter.scala
@@ -18,7 +18,7 @@
 package org.apache.spark.internal.io
 
 import java.text.NumberFormat
-import java.util.{Date, Locale}
+import java.util.{Date, Locale, UUID}
 
 import scala.reflect.ClassTag
 
@@ -70,6 +70,11 @@ object SparkHadoopWriter extends Logging {
     // Assert the output format/key/value class is set in JobConf.
     config.assertConf(jobContext, rdd.conf)
 
+    // propagate the description UUID into the jobs, so that committers
+    // get an ID guaranteed to be unique.
+    jobContext.getConfiguration.set("spark.sql.sources.writeJobUUID",
+      UUID.randomUUID.toString)
+
     val committer = config.createCommitter(commitJobId)
     committer.setupJob(jobContext)
 
diff --git a/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopWriterUtils.scala b/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopWriterUtils.scala
index de828a6..657842c 100644
--- a/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopWriterUtils.scala
+++ b/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopWriterUtils.scala
@@ -20,7 +20,7 @@ package org.apache.spark.internal.io
 import java.text.SimpleDateFormat
 import java.util.{Date, Locale}
 
-import scala.util.DynamicVariable
+import scala.util.{DynamicVariable, Random}
 
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.mapred.{JobConf, JobID}
@@ -37,14 +37,35 @@ private[spark]
 object SparkHadoopWriterUtils {
 
   private val RECORDS_BETWEEN_BYTES_WRITTEN_METRIC_UPDATES = 256
+  private val RAND = new Random()
 
+  /**
+   * Create a job ID.
+   *
+   * @param time (current) time
+   * @param id job number
+   * @return a job ID
+   */
   def createJobID(time: Date, id: Int): JobID = {
+    if (id < 0) {
+      throw new IllegalArgumentException("Job number is negative")
+    }
     val jobtrackerID = createJobTrackerID(time)
     new JobID(jobtrackerID, id)
   }
 
+  /**
+   * Generate an ID for a job tracker.
+   * @param time (current) time
+   * @return a string for a job ID
+   */
   def createJobTrackerID(time: Date): String = {
-    new SimpleDateFormat("yyyyMMddHHmmss", Locale.US).format(time)
+    val base = new SimpleDateFormat("yyyyMMddHHmmss", Locale.US).format(time)
+    var l1 = RAND.nextLong()
+    if (l1 < 0) {
+      l1 = -l1
+    }
+    base + l1
   }
 
   def createPathFromString(path: String, conf: JobConf): Path = {
diff --git a/core/src/test/scala/org/apache/spark/internal/io/SparkHadoopWriterUtilsSuite.scala b/core/src/test/scala/org/apache/spark/internal/io/SparkHadoopWriterUtilsSuite.scala
new file mode 100644
index 0000000..33b58ec
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/internal/io/SparkHadoopWriterUtilsSuite.scala
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.internal.io
+
+import java.util.Date
+
+import org.apache.hadoop.mapreduce.JobID
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.internal.io.SparkHadoopWriterUtils.createJobID
+
+/**
+ * Unit tests for functions in SparkHadoopWriterUtils.
+ */
+class SparkHadoopWriterUtilsSuite extends SparkFunSuite {
+
+  /**
+   * Core test of JobID generation:
+   * They are created.
+   * The job number is converted to the job ID.
+   * They round trip to string and back
+   * (which implies that the full string matches the regexp
+   * in the JobID class).
+   */
+  test("JobID Generation") {
+    val jobNumber = 1010
+    val j1 = createJobID(new Date(), jobNumber)
+    assert(jobNumber == j1.getId,
+      s"Job number mismatch in $j1")
+
+    val jobStr = j1.toString
+    // the string value begins with job_
+    assert(jobStr.startsWith("job_"),
+      s"wrong prefix of $jobStr")
+    // and the hadoop code can parse it
+    val j2 = roundTrip(j1)
+    assert(j1.getId == j2.getId, "Job ID mismatch")
+    assert(j1.getJtIdentifier == j2.getJtIdentifier, "Job identifier mismatch")
+  }
+
+  /**
+   * This is the problem surfacing in situations where committers expect
+   * Job IDs to be unique: if the timestamp is (exclusively) used
+   * then there will conflict in directories created.
+   */
+  test("JobIDs generated at same time are different") {
+    val now = new Date()
+    val j1 = createJobID(now, 1)
+    val j2 = createJobID(now, 1)
+    assert(j1.toString != j2.toString)
+  }
+
+  /**
+   * There's nothing explicitly in the Hadoop classes to stop
+   * job numbers being negative.
+   * There's some big assumptions in the FileOutputCommitter about attempt IDs
+   * being positive during any recovery operations; for safety the ID
+   * job number is validated.
+   */
+  test("JobIDs with negative job number") {
+    intercept[IllegalArgumentException] {
+      createJobID(new Date(), -1)
+    }
+  }
+
+  /**
+   * If someone ever does reinstate use of timestamps,
+   * make sure that the case of timestamp == 0 is handled.
+   */
+  test("JobIDs on Epoch are different") {
+    val j1 = createJobID(new Date(0), 0)
+    val j2 = createJobID(new Date(0), 0)
+    assert (j1.toString != j2.toString)
+  }
+
+  /**
+   * Do a round trip as a string and back again.
+   * This uses the JobID parser.
+   * @param jobID job ID
+   * @return the returned jobID
+   */
+  private def roundTrip(jobID: JobID): JobID = {
+    val parsedJobId = JobID.forName(jobID.toString)
+    assert(jobID == parsedJobId, "Round trip was inconsistent")
+    parsedJobId
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org