Posted to commits@spark.apache.org by pw...@apache.org on 2014/01/11 01:26:14 UTC

[20/20] git commit: Merge pull request #383 from tdas/driver-test

Merge pull request #383 from tdas/driver-test

API for automatic driver recovery for streaming programs and other bug fixes

1. Added Scala and Java APIs for automatically loading a checkpoint, if one exists, from the provided checkpoint directory.

  Scala API: `StreamingContext.getOrCreate(<checkpoint dir>, <function to create new StreamingContext>)` returns a StreamingContext.
  Java API: `JavaStreamingContext.getOrCreate(<checkpoint dir>, <factory obj of type JavaStreamingContextFactory>)` returns a JavaStreamingContext.

  See RecoverableNetworkWordCount, added in this commit, for an example of how to use it.
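The get-or-create idea can be illustrated without Spark. The sketch below is a minimal, self-contained approximation of the pattern, not Spark's implementation: `SimpleContext`, `GetOrCreateSketch` and the `context.checkpoint` file name are hypothetical, and the "checkpoint" is just the batch interval written as text.

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Path}

// Hypothetical stand-in for a streaming context: only the state we want to recover.
final case class SimpleContext(batchIntervalMs: Long, recovered: Boolean)

object GetOrCreateSketch {
  // Hypothetical file name for the serialized state inside the checkpoint directory.
  private val CheckpointFile = "context.checkpoint"

  // Persist the context's state so a restarted driver can rebuild it.
  def checkpoint(dir: Path, ctx: SimpleContext): Unit = {
    Files.createDirectories(dir)
    Files.write(dir.resolve(CheckpointFile),
      ctx.batchIntervalMs.toString.getBytes(StandardCharsets.UTF_8))
  }

  // If a checkpoint exists in `dir`, rebuild the context from it;
  // otherwise call the user-supplied factory to create a fresh one.
  def getOrCreate(dir: Path, create: () => SimpleContext): SimpleContext = {
    val file = dir.resolve(CheckpointFile)
    if (Files.exists(file)) {
      val text = new String(Files.readAllBytes(file), StandardCharsets.UTF_8).trim
      SimpleContext(text.toLong, recovered = true)
    } else {
      create()
    }
  }
}
```

The factory runs only when no checkpoint is found, which is why, in Spark's own usage, the DStream setup is expected to live inside the creating function rather than after the `getOrCreate` call.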

2. Refactored the streaming.Checkpoint* code to fix bugs and make writing and reading of DStream metadata checkpoints more robust. Specifically, it fixes and improves the logic for backing up and writing metadata checkpoint files. It also ensures that spark.driver.* and spark.hostPort are cleared from the SparkConf before it is written to the checkpoint.
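As a rough illustration of "back up, then write" together with clearing driver-specific settings, here is a self-contained sketch. It is an assumption-laden approximation, not the code in Checkpoint.scala: the file names `metadata`, `metadata.bk` and `metadata.tmp`, the `key=value` text format, and the object `CheckpointWriteSketch` are all hypothetical. Only the two cleared key patterns come from the description above.

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Path, StandardCopyOption}

object CheckpointWriteSketch {
  // Hypothetical file names; the real on-disk layout may differ.
  private val Main = "metadata"
  private val Backup = "metadata.bk"
  private val Temp = "metadata.tmp"

  // Drop driver-specific settings so a recovered driver does not reuse stale values.
  def scrub(conf: Map[String, String]): Map[String, String] =
    conf.filter { case (k, _) => !k.startsWith("spark.driver.") && k != "spark.hostPort" }

  private def encode(conf: Map[String, String]): Array[Byte] =
    conf.toSeq.sortBy(_._1).map { case (k, v) => s"$k=$v" }.mkString("\n")
      .getBytes(StandardCharsets.UTF_8)

  private def decode(path: Path): Map[String, String] =
    new String(Files.readAllBytes(path), StandardCharsets.UTF_8)
      .split("\n").filter(_.nonEmpty)
      .map { line =>
        val i = line.indexOf('=')
        line.take(i) -> line.drop(i + 1)
      }
      .toMap

  // Write to a temp file first, keep the previous checkpoint as a backup,
  // then move the new file into place, so a crash mid-write still leaves
  // a complete checkpoint (the main file or the backup) on disk.
  def write(dir: Path, conf: Map[String, String]): Unit = {
    Files.createDirectories(dir)
    val tmp = dir.resolve(Temp)
    Files.write(tmp, encode(scrub(conf)))
    val main = dir.resolve(Main)
    if (Files.exists(main))
      Files.move(main, dir.resolve(Backup), StandardCopyOption.REPLACE_EXISTING)
    Files.move(tmp, main, StandardCopyOption.REPLACE_EXISTING)
  }

  // Prefer the main file; fall back to the backup if the main file is missing or unreadable.
  def read(dir: Path): Option[Map[String, String]] =
    Seq(Main, Backup).map(name => dir.resolve(name))
      .filter(p => Files.exists(p))
      .flatMap(p => scala.util.Try(decode(p)).toOption)
      .headOption
}
```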

3. Fixed a bug in the cleanup of checkpointed RDDs created by DStreams. Specifically, this fix ensures that the files of checkpointed RDDs are not cleaned up prematurely, so that recovery is reliable.
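One way to state the "not prematurely" invariant is: an RDD checkpoint file may be deleted only once no metadata checkpoint that a restarted driver might load still refers to it. The hypothetical tracker below sketches that rule under the assumption that the caller knows the oldest batch time still referenced by the last written metadata checkpoint. It is an illustration of the invariant, not the logic in DStreamCheckpointData.scala.

```scala
import scala.collection.mutable

// Hypothetical tracker of RDD checkpoint files produced by one DStream,
// keyed by batch time in milliseconds (sorted, oldest first).
final class RddCheckpointFiles {
  private val timeToFile = mutable.TreeMap.empty[Long, String]

  def record(batchTime: Long, file: String): Unit = timeToFile(batchTime) = file

  def files: Seq[String] = timeToFile.values.toSeq

  // Delete only files strictly older than the oldest file still referenced by
  // the most recently written metadata checkpoint. Returns the deleted files.
  def cleanup(oldestReferencedTime: Long, delete: String => Unit): Seq[String] = {
    val stale = timeToFile.filter { case (t, _) => t < oldestReferencedTime }
    stale.foreach { case (t, f) => delete(f); timeToFile.remove(t) }
    stale.values.toSeq
  }
}
```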

4. TimeStampedHashMap is upgraded to optionally update an entry's timestamp on map.get(key). This allows data to be cleared based on access time (i.e., clearing records that were last accessed before a threshold timestamp).
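The behaviour described in item 4 can be sketched with a small map that stores a timestamp next to each value. This is a simplified, hypothetical class (`AccessTimedMap`), not Spark's TimeStampedHashMap; the injectable `clock` parameter is an assumption added so the example is deterministic.

```scala
import scala.collection.mutable

// Simplified sketch: each entry carries a timestamp. When
// updateTimeStampOnGet is true, a successful get refreshes that timestamp,
// so clearOldValues removes entries by last access rather than last write.
final class AccessTimedMap[K, V](updateTimeStampOnGet: Boolean, clock: () => Long) {
  private val underlying = mutable.HashMap.empty[K, (V, Long)]

  def put(key: K, value: V): Unit = underlying(key) = (value, clock())

  def get(key: K): Option[V] = underlying.get(key).map { case (value, _) =>
    if (updateTimeStampOnGet) underlying(key) = (value, clock())
    value
  }

  // Remove every entry whose timestamp is older than threshTime.
  def clearOldValues(threshTime: Long): Unit = {
    val old = underlying.collect { case (k, (_, ts)) if ts < threshTime => k }
    old.foreach(k => underlying.remove(k))
  }

  def keys: Set[K] = underlying.keySet.toSet
}
```

With the flag off, an entry written at time 0 and read at time 10 is still cleared by a threshold of 5; with the flag on, the read keeps it alive.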

5. Added caching of file modification times in FileInputDStream, using the updated TimeStampedHashMap. Without the cache, enumerating modification times to find new files can take seconds when there are thousands of files. The cache is cleared automatically.
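A minimal sketch of the caching idea, assuming (as Spark Streaming expects) that files are not modified after they appear in the monitored directory, so a cached modification time stays valid. `ModTimeCache`, `fetchModTime` and `clock` are hypothetical names; `fetchModTime` stands in for the slow file-system lookup.

```scala
import scala.collection.mutable

// Hypothetical cache of file modification times keyed by path.
// Each entry also records when it was last looked up, so entries that
// are no longer being asked about can be evicted.
final class ModTimeCache(fetchModTime: String => Long, clock: () => Long) {
  // path -> (cached modification time, last access time)
  private val cache = mutable.HashMap.empty[String, (Long, Long)]

  def modTime(path: String): Long = {
    val now = clock()
    val mt = cache.get(path) match {
      case Some((cached, _)) => cached
      case None              => fetchModTime(path) // slow path, taken once per file
    }
    cache(path) = (mt, now) // refresh the access time on every lookup
    mt
  }

  // Forget paths that have not been looked up since threshTime.
  def clearOlderThan(threshTime: Long): Unit = {
    val stale = cache.collect { case (p, (_, accessed)) if accessed < threshTime => p }
    stale.foreach(p => cache.remove(p))
  }

  def size: Int = cache.size
}
```

Repeated directory scans then pay the file-system cost only for files they have not seen before, while paths that stop appearing in listings age out of the cache.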

This PR is not entirely final, as I may make some minor additions: a Java example, and a unit test for StreamingContext.getOrCreate.

Edit: Java example to be added later, unit test added.


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/f2655310
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/f2655310
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/f2655310

Branch: refs/heads/master
Commit: f26553102c1995acf2a2ba6b502de4f2dbbd73b3
Parents: d37408f 4f39e79
Author: Patrick Wendell <pw...@gmail.com>
Authored: Fri Jan 10 16:25:44 2014 -0800
Committer: Patrick Wendell <pw...@gmail.com>
Committed: Fri Jan 10 16:25:44 2014 -0800

----------------------------------------------------------------------
 .../scala/org/apache/spark/SparkContext.scala   |   2 +-
 .../apache/spark/util/TimeStampedHashMap.scala  |  17 +-
 .../examples/JavaNetworkWordCount.java          |   7 +-
 .../streaming/examples/NetworkWordCount.scala   |   5 +-
 .../examples/RecoverableNetworkWordCount.scala  | 118 ++++++++++++
 .../org/apache/spark/streaming/Checkpoint.scala | 188 +++++++++++++------
 .../org/apache/spark/streaming/DStream.scala    |  15 +-
 .../spark/streaming/DStreamCheckpointData.scala | 106 ++++++-----
 .../apache/spark/streaming/DStreamGraph.scala   |  38 ++--
 .../spark/streaming/StreamingContext.scala      |  75 ++++++--
 .../api/java/JavaStreamingContext.scala         |  96 +++++++++-
 .../streaming/dstream/FileInputDStream.scala    |  42 +++--
 .../streaming/scheduler/JobGenerator.scala      |  31 ++-
 .../streaming/util/MasterFailureTest.scala      |  55 +++---
 .../spark/streaming/CheckpointSuite.scala       |  10 +-
 15 files changed, 600 insertions(+), 205 deletions(-)
----------------------------------------------------------------------