You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain text copy.
Posted to commits@spark.apache.org by pw...@apache.org on 2014/01/11 01:25:58 UTC

[04/20] git commit: Merge branch 'filestream-fix' into driver-test

Merge branch 'filestream-fix' into driver-test

Conflicts:
	streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/23947945
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/23947945
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/23947945

Branch: refs/heads/master
Commit: 23947945913cafb4f6549167c53a3cdd4a09fef0
Parents: 8e88db3 fcd17a1
Author: Tathagata Das <ta...@gmail.com>
Authored: Mon Jan 6 02:23:53 2014 +0000
Committer: Tathagata Das <ta...@gmail.com>
Committed: Mon Jan 6 02:23:53 2014 +0000

----------------------------------------------------------------------
 .../scala/org/apache/spark/SparkContext.scala   |  23 +--
 .../spark/api/java/JavaSparkContext.scala       |  15 +-
 .../org/apache/spark/rdd/CheckpointRDD.scala    |  32 ++--
 .../apache/spark/rdd/RDDCheckpointData.scala    |  15 +-
 .../scala/org/apache/spark/JavaAPISuite.java    |   4 +-
 python/pyspark/context.py                       |   9 +-
 python/pyspark/tests.py                         |   4 +-
 .../org/apache/spark/streaming/Checkpoint.scala |  50 +++---
 .../spark/streaming/StreamingContext.scala      |  32 ++--
 .../api/java/JavaStreamingContext.scala         |  13 +-
 .../streaming/dstream/FileInputDStream.scala    | 153 +++++++++++--------
 .../streaming/scheduler/JobGenerator.scala      |  67 +++++---
 .../spark/streaming/CheckpointSuite.scala       |  44 +++---
 .../spark/streaming/InputStreamsSuite.scala     |   2 +-
 14 files changed, 267 insertions(+), 196 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/23947945/core/src/main/scala/org/apache/spark/SparkContext.scala
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/23947945/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
----------------------------------------------------------------------
diff --cc streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
index 139e2c0,4960a85..09b184b
--- a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
@@@ -21,12 -21,13 +21,13 @@@ import java.io.
  import java.util.concurrent.Executors
  import java.util.concurrent.RejectedExecutionException
  
- import org.apache.hadoop.fs.Path
+ import org.apache.hadoop.fs.{FileSystem, Path}
  import org.apache.hadoop.conf.Configuration
  
 -import org.apache.spark.Logging
 +import org.apache.spark.{SparkException, Logging}
  import org.apache.spark.io.CompressionCodec
  import org.apache.spark.util.MetadataCleaner
+ import org.apache.spark.deploy.SparkHadoopUtil
  
  
  private[streaming]
@@@ -141,16 -151,11 +151,15 @@@ class CheckpointWriter(checkpointDir: S
  private[streaming]
  object CheckpointReader extends Logging {
  
 -  def read(path: String): Checkpoint = {
 +  def doesCheckpointExist(path: String): Boolean = {
 +    val attempts = Seq(new Path(path, "graph"), new Path(path, "graph.bk"))
      val fs = new Path(path).getFileSystem(new Configuration())
 -    val attempts = Seq(new Path(path, "graph"), new Path(path, "graph.bk"),
 -      new Path(path), new Path(path + ".bk"))
 +    (attempts.count(p => fs.exists(p)) > 1)
 +  }
  
 +  def read(path: String): Checkpoint = {
 +    val fs = new Path(path).getFileSystem(new Configuration())
 +    val attempts = Seq(new Path(path, "graph"), new Path(path, "graph.bk"))
- 
      val compressionCodec = CompressionCodec.createCodec()
  
      attempts.foreach(file => {

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/23947945/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/23947945/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/23947945/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
----------------------------------------------------------------------