You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain-text copy.
Posted to commits@spark.apache.org by pw...@apache.org on 2014/01/11 01:25:58 UTC
[04/20] git commit: Merge branch 'filestream-fix' into driver-test
Merge branch 'filestream-fix' into driver-test
Conflicts:
streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/23947945
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/23947945
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/23947945
Branch: refs/heads/master
Commit: 23947945913cafb4f6549167c53a3cdd4a09fef0
Parents: 8e88db3 fcd17a1
Author: Tathagata Das <ta...@gmail.com>
Authored: Mon Jan 6 02:23:53 2014 +0000
Committer: Tathagata Das <ta...@gmail.com>
Committed: Mon Jan 6 02:23:53 2014 +0000
----------------------------------------------------------------------
.../scala/org/apache/spark/SparkContext.scala | 23 +--
.../spark/api/java/JavaSparkContext.scala | 15 +-
.../org/apache/spark/rdd/CheckpointRDD.scala | 32 ++--
.../apache/spark/rdd/RDDCheckpointData.scala | 15 +-
.../scala/org/apache/spark/JavaAPISuite.java | 4 +-
python/pyspark/context.py | 9 +-
python/pyspark/tests.py | 4 +-
.../org/apache/spark/streaming/Checkpoint.scala | 50 +++---
.../spark/streaming/StreamingContext.scala | 32 ++--
.../api/java/JavaStreamingContext.scala | 13 +-
.../streaming/dstream/FileInputDStream.scala | 153 +++++++++++--------
.../streaming/scheduler/JobGenerator.scala | 67 +++++---
.../spark/streaming/CheckpointSuite.scala | 44 +++---
.../spark/streaming/InputStreamsSuite.scala | 2 +-
14 files changed, 267 insertions(+), 196 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/23947945/core/src/main/scala/org/apache/spark/SparkContext.scala
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/23947945/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
----------------------------------------------------------------------
diff --cc streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
index 139e2c0,4960a85..09b184b
--- a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
@@@ -21,12 -21,13 +21,13 @@@ import java.io.
import java.util.concurrent.Executors
import java.util.concurrent.RejectedExecutionException
- import org.apache.hadoop.fs.Path
+ import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.conf.Configuration
-import org.apache.spark.Logging
+import org.apache.spark.{SparkException, Logging}
import org.apache.spark.io.CompressionCodec
import org.apache.spark.util.MetadataCleaner
+ import org.apache.spark.deploy.SparkHadoopUtil
private[streaming]
@@@ -141,16 -151,11 +151,15 @@@ class CheckpointWriter(checkpointDir: S
private[streaming]
object CheckpointReader extends Logging {
- def read(path: String): Checkpoint = {
+ def doesCheckpointExist(path: String): Boolean = {
+ val attempts = Seq(new Path(path, "graph"), new Path(path, "graph.bk"))
val fs = new Path(path).getFileSystem(new Configuration())
- val attempts = Seq(new Path(path, "graph"), new Path(path, "graph.bk"),
- new Path(path), new Path(path + ".bk"))
+ (attempts.count(p => fs.exists(p)) > 1)
+ }
+ def read(path: String): Checkpoint = {
+ val fs = new Path(path).getFileSystem(new Configuration())
+ val attempts = Seq(new Path(path, "graph"), new Path(path, "graph.bk"))
-
val compressionCodec = CompressionCodec.createCodec()
attempts.foreach(file => {
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/23947945/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/23947945/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/23947945/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
----------------------------------------------------------------------