You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by zs...@apache.org on 2016/11/15 23:44:18 UTC

spark git commit: [SPARK-18423][STREAMING] ReceiverTracker should close checkpoint dir when stopped even if it was not started

Repository: spark
Updated Branches:
  refs/heads/master 1ae4652b7 -> 503378f10


[SPARK-18423][STREAMING] ReceiverTracker should close checkpoint dir when stopped even if it was not started

## What changes were proposed in this pull request?

Several tests are being failed on Windows due to the failure of removing the checkpoint dir between each tests.

This is caused by not closed file in `ReceiverTracker`. When it is not started, it does not close it even if `stop()` is called.

```
Test org.apache.spark.streaming.JavaAPISuite.testCheckpointMasterRecovery started
Test org.apache.spark.streaming.JavaAPISuite.testCheckpointMasterRecovery failed: java.io.IOException: Failed to delete: C:\projects\spark\target\tmp\1478983663710-0, took 3.828 sec
    at org.apache.spark.util.Utils$.deleteRecursively(Utils.scala:1010)
    at org.apache.spark.util.Utils.deleteRecursively(Utils.scala)
    at org.apache.spark.streaming.JavaAPISuite.testCheckpointMasterRecovery(JavaAPISuite.java:1809)
    ...
```

```
- mapWithState - basic operations with simple API (7 seconds, 640 milliseconds)
Exception encountered when attempting to run a suite with class name: org.apache.spark.streaming.MapWithStateSuite *** ABORTED *** (12 seconds, 688 milliseconds)
  java.io.IOException: Failed to delete: C:\projects\spark\streaming\checkpoint\spark-b8486e2b-6468-4e6f-bb24-88277d2c033c
  ...
```

## How was this patch tested?

Tests in `JavaAPISuite` and `MapWithStateSuite`.

Manually tested via AppVeyor:

**Before**

- `org.apache.spark.streaming.JavaAPISuite`
  Build: https://ci.appveyor.com/project/spark-test/spark/build/71-MapWithStateSuite-1
  Diff: https://github.com/apache/spark/compare/master...spark-test:188c828e682ec45b75d15c3dfc782bcdc8ce024c

- `org.apache.spark.streaming.MapWithStateSuite`
  Build: https://ci.appveyor.com/project/spark-test/spark/build/72-MapWithStateSuite-1
  Diff: https://github.com/apache/spark/compare/master...spark-test:8f6945d0ccde022a23d3848f6b7fe6da1e7c902e

**After**

- `org.apache.spark.streaming.JavaAPISuite`
  Build started: [Streaming] `org.apache.spark.streaming.JavaAPISuite` [![PR-15867](https://ci.appveyor.com/api/projects/status/github/spark-test/spark?branch=3D74F2D5-B0D5-4E1D-874C-685AE694FD37&svg=true)](https://ci.appveyor.com/project/spark-test/spark/branch/3D74F2D5-B0D5-4E1D-874C-685AE694FD37)
  Diff: https://github.com/apache/spark/compare/master...spark-test:3D74F2D5-B0D5-4E1D-874C-685AE694FD37

- `org.apache.spark.streaming.MapWithStateSuite`
  Build started: [Streaming] `org.apache.spark.streaming.MapWithStateSuite` [![PR-15867](https://ci.appveyor.com/api/projects/status/github/spark-test/spark?branch=C8E88B64-49F0-4157-9AFA-FC3ACC442351&svg=true)](https://ci.appveyor.com/project/spark-test/spark/branch/C8E88B64-49F0-4157-9AFA-FC3ACC442351)
  Diff: https://github.com/apache/spark/compare/master...spark-test:C8E88B64-49F0-4157-9AFA-FC3ACC442351

Author: hyukjinkwon <gu...@gmail.com>

Closes #15867 from HyukjinKwon/SPARK-18423.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/503378f1
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/503378f1
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/503378f1

Branch: refs/heads/master
Commit: 503378f10ca92064034aa88e0feebe4718af8bbe
Parents: 1ae4652
Author: hyukjinkwon <gu...@gmail.com>
Authored: Tue Nov 15 15:44:15 2016 -0800
Committer: Shixiong Zhu <sh...@databricks.com>
Committed: Tue Nov 15 15:44:15 2016 -0800

----------------------------------------------------------------------
 .../spark/streaming/scheduler/ReceiverTracker.scala       | 10 ++++++++++
 1 file changed, 10 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/503378f1/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
index b9d898a..8f55d98 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
@@ -197,6 +197,13 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
       receivedBlockTracker.stop()
       logInfo("ReceiverTracker stopped")
       trackerState = Stopped
+    } else if (isTrackerInitialized) {
+      trackerState = Stopping
+      // `ReceivedBlockTracker` is open when this instance is created. We should
+      // close this even if this `ReceiverTracker` is not started.
+      receivedBlockTracker.stop()
+      logInfo("ReceiverTracker stopped")
+      trackerState = Stopped
     }
   }
 
@@ -446,6 +453,9 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
     endpoint.send(StartAllReceivers(receivers))
   }
 
+  /** Check if tracker has been marked for initiated */
+  private def isTrackerInitialized: Boolean = trackerState == Initialized
+
   /** Check if tracker has been marked for starting */
   private def isTrackerStarted: Boolean = trackerState == Started
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org