Posted to commits@spark.apache.org by td...@apache.org on 2016/04/27 20:23:32 UTC

spark git commit: [SPARK-14930][SPARK-13693] Fix race condition in CheckpointWriter.stop()

Repository: spark
Updated Branches:
  refs/heads/master e4d439c83 -> 450136ec0


[SPARK-14930][SPARK-13693] Fix race condition in CheckpointWriter.stop()

CheckpointWriter.stop() is prone to a race condition: if one thread calls `stop()` right as a checkpoint write task begins to execute, that write task may become blocked when trying to access `fs`, the shared Hadoop FileSystem, because both the `fs` getter and the `stop()` method synchronize on the same lock and `stop()` holds that lock while it waits for the write executor to terminate. Here's a thread-dump excerpt that illustrates the problem:

```java
"pool-31-thread-1" #156 prio=5 os_prio=31 tid=0x00007fea02cd2000 nid=0x5c0b waiting for monitor entry [0x000000013bc4c000]
   java.lang.Thread.State: BLOCKED (on object monitor)
    at org.apache.spark.streaming.CheckpointWriter.org$apache$spark$streaming$CheckpointWriter$$fs(Checkpoint.scala:302)
    - waiting to lock <0x00000007bf53ee78> (a org.apache.spark.streaming.CheckpointWriter)
    at org.apache.spark.streaming.CheckpointWriter$CheckpointWriteHandler.run(Checkpoint.scala:224)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

"pool-1-thread-1-ScalaTest-running-MapWithStateSuite" #11 prio=5 os_prio=31 tid=0x00007fe9ff879800 nid=0x5703 waiting on condition [0x000000012e54c000]
   java.lang.Thread.State: TIMED_WAITING (parking)
    at sun.misc.Unsafe.park(Native Method)
    - parking to wait for  <0x00000007bf564568> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
    at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078)
    at java.util.concurrent.ThreadPoolExecutor.awaitTermination(ThreadPoolExecutor.java:1465)
    at org.apache.spark.streaming.CheckpointWriter.stop(Checkpoint.scala:291)
    - locked <0x00000007bf53ee78> (a org.apache.spark.streaming.CheckpointWriter)
    at org.apache.spark.streaming.scheduler.JobGenerator.stop(JobGenerator.scala:159)
    - locked <0x00000007bf53ea90> (a org.apache.spark.streaming.scheduler.JobGenerator)
    at org.apache.spark.streaming.scheduler.JobScheduler.stop(JobScheduler.scala:115)
    - locked <0x00000007bf53d3f0> (a org.apache.spark.streaming.scheduler.JobScheduler)
    at org.apache.spark.streaming.StreamingContext$$anonfun$stop$1.apply$mcV$sp(StreamingContext.scala:680)
    at org.apache.spark.util.Utils$.tryLogNonFatalError(Utils.scala:1219)
    at org.apache.spark.streaming.StreamingContext.stop(StreamingContext.scala:679)
    - locked <0x00000007bf516a70> (a org.apache.spark.streaming.StreamingContext)
    at org.apache.spark.streaming.StreamingContext.stop(StreamingContext.scala:644)
    - locked <0x00000007bf516a70> (a org.apache.spark.streaming.StreamingContext)
[...]
```
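To make the blocking pattern concrete, here is a minimal Scala sketch of the pre-patch locking structure. The class and method names and the timeout are illustrative, not the actual `Checkpoint.scala` source (the real code appears in the diff at the end of this message):

```scala
import java.util.concurrent.{Executors, TimeUnit}

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

// Hypothetical, condensed sketch of the pre-patch shape of CheckpointWriter.
class LockSharingWriter(checkpointDir: String, hadoopConf: Configuration) {
  private val executor = Executors.newFixedThreadPool(1)
  private var _fs: FileSystem = _

  // The checkpoint write task goes through this getter, which takes the
  // writer's monitor...
  private def fs: FileSystem = synchronized {
    if (_fs == null) _fs = new Path(checkpointDir).getFileSystem(hadoopConf)
    _fs
  }

  def write(checkpointTime: Long): Unit = executor.execute(new Runnable {
    override def run(): Unit = {
      val targetFs = fs  // BLOCKED (on object monitor) while stop() holds the lock
      targetFs.exists(new Path(checkpointDir))  // stand-in for the real checkpoint write
    }
  })

  // ...while stop() holds the same monitor while waiting for the executor to
  // drain, so the wait simply times out while the queued write task sits
  // blocked on the `fs` getter above.
  def stop(): Unit = synchronized {
    executor.shutdown()
    executor.awaitTermination(10, TimeUnit.SECONDS)  // timeout value illustrative
  }
}
```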

We can fix this problem by not having `stop` and `fs` share a lock: the synchronization in `stop` only needs to guard against multiple threads calling `stop` at the same time, whereas the synchronization on `fs` is only necessary for cross-thread visibility. Since there is only ever a single active checkpoint writer thread at a time, we don't need to guard against concurrent access to `fs` at all, so `fs` can simply become a `@volatile` var, similar to `latestCheckpointTime`.
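A minimal sketch of the post-patch shape, again with illustrative names and timeout; the actual change to `Checkpoint.scala` is in the diff at the end of this message:

```scala
import java.util.concurrent.{Executors, TimeUnit}

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

// Hypothetical, condensed sketch of the fixed structure.
class LockFreeFsWriter(checkpointDir: String, hadoopConf: Configuration) {
  private val executor = Executors.newFixedThreadPool(1)

  // Volatile gives cross-thread visibility; no lock is needed because only
  // the single writer thread ever touches `fs`.
  @volatile private var fs: FileSystem = null

  def write(checkpointTime: Long): Unit = executor.execute(new Runnable {
    override def run(): Unit = {
      if (fs == null) {
        fs = new Path(checkpointDir).getFileSystem(hadoopConf)  // lazy init, no lock
      }
      // ... write the checkpoint; on IOException, set fs = null so the next
      // attempt re-resolves the FileSystem ...
    }
  })

  // stop() keeps its lock, but now it only guards against concurrent calls to
  // stop() itself; the write path never competes for it.
  def stop(): Unit = synchronized {
    executor.shutdown()
    executor.awaitTermination(10, TimeUnit.SECONDS)  // timeout value illustrative
  }
}
```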

This change should fix [SPARK-13693](https://issues.apache.org/jira/browse/SPARK-13693), the flaky `MapWithStateSuite` test suite, which has recently been failing several times per day. It also yields a huge test speedup: prior to this patch, `MapWithStateSuite` took about 80 seconds to run, whereas it now finishes in less than 10 seconds. The `streaming` project's tests as a whole now run in ~220 seconds, down from ~354 seconds.

/cc zsxwing and tdas for review.

Author: Josh Rosen <jo...@databricks.com>

Closes #12712 from JoshRosen/fix-checkpoint-writer-race.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/450136ec
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/450136ec
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/450136ec

Branch: refs/heads/master
Commit: 450136ec0dab16a12e514c842f9062a6979ee9aa
Parents: e4d439c
Author: Josh Rosen <jo...@databricks.com>
Authored: Wed Apr 27 11:23:26 2016 -0700
Committer: Tathagata Das <ta...@gmail.com>
Committed: Wed Apr 27 11:23:26 2016 -0700

----------------------------------------------------------------------
 .../org/apache/spark/streaming/Checkpoint.scala    | 17 +++++------------
 1 file changed, 5 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/450136ec/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
index 0395600..7d8b867 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
@@ -184,8 +184,7 @@ class CheckpointWriter(
   val executor = Executors.newFixedThreadPool(1)
   val compressionCodec = CompressionCodec.createCodec(conf)
   private var stopped = false
-  private var _fs: FileSystem = _
-
+  @volatile private[this] var fs: FileSystem = null
   @volatile private var latestCheckpointTime: Time = null
 
   class CheckpointWriteHandler(
@@ -196,6 +195,9 @@ class CheckpointWriter(
       if (latestCheckpointTime == null || latestCheckpointTime < checkpointTime) {
         latestCheckpointTime = checkpointTime
       }
+      if (fs == null) {
+        fs = new Path(checkpointDir).getFileSystem(hadoopConf)
+      }
       var attempts = 0
       val startTime = System.currentTimeMillis()
       val tempFile = new Path(checkpointDir, "temp")
@@ -263,7 +265,7 @@ class CheckpointWriter(
           case ioe: IOException =>
             logWarning("Error in attempt " + attempts + " of writing checkpoint to "
               + checkpointFile, ioe)
-            reset()
+            fs = null
         }
       }
       logWarning("Could not write checkpoint for time " + checkpointTime + " to file "
@@ -297,15 +299,6 @@ class CheckpointWriter(
       ", waited for " + (endTime - startTime) + " ms.")
     stopped = true
   }
-
-  private def fs = synchronized {
-    if (_fs == null) _fs = new Path(checkpointDir).getFileSystem(hadoopConf)
-    _fs
-  }
-
-  private def reset() = synchronized {
-    _fs = null
-  }
 }
 
 

