You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by pw...@apache.org on 2014/01/11 01:25:57 UTC

[03/20] git commit: Bug fixes to the DriverRunner and minor changes here and there.

Bug fixes to the DriverRunner and minor changes here and there.


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/8e88db3c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/8e88db3c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/8e88db3c

Branch: refs/heads/master
Commit: 8e88db3ca56e6a56668b029e39c8e96b86d4dd5e
Parents: 3d44743
Author: Tathagata Das <ta...@gmail.com>
Authored: Mon Jan 6 02:21:56 2014 +0000
Committer: Tathagata Das <ta...@gmail.com>
Committed: Mon Jan 6 02:21:56 2014 +0000

----------------------------------------------------------------------
 conf/slaves                                                  | 7 +++++--
 .../scala/org/apache/spark/deploy/worker/DriverRunner.scala  | 8 ++++----
 .../apache/spark/streaming/dstream/FileInputDStream.scala    | 2 +-
 .../org/apache/spark/streaming/scheduler/JobGenerator.scala  | 8 ++++----
 4 files changed, 14 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/8e88db3c/conf/slaves
----------------------------------------------------------------------
diff --git a/conf/slaves b/conf/slaves
index da0a013..30ea300 100644
--- a/conf/slaves
+++ b/conf/slaves
@@ -1,2 +1,5 @@
-# A Spark Worker will be started on each of the machines listed below.
-localhost
\ No newline at end of file
+ec2-54-221-59-252.compute-1.amazonaws.com
+ec2-67-202-26-243.compute-1.amazonaws.com
+ec2-23-22-220-97.compute-1.amazonaws.com
+ec2-50-16-98-100.compute-1.amazonaws.com
+ec2-54-234-164-206.compute-1.amazonaws.com

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/8e88db3c/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala b/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala
index 7485b89..2d567b7 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala
@@ -119,15 +119,15 @@ private[spark] class DriverRunner(
     val emptyConf = new Configuration() // TODO: In docs explain it needs to be full HDFS path
     val jarFileSystem = jarPath.getFileSystem(emptyConf)
 
-    val destPath = new Path(driverDir.getAbsolutePath())
-    val destFileSystem = destPath.getFileSystem(emptyConf)
+    val destPath = new File(driverDir.getAbsolutePath(), jarPath.getName())
+    // val destFileSystem = destPath.getFileSystem(emptyConf)
     val jarFileName = jarPath.getName
     val localJarFile = new File(driverDir, jarFileName)
     val localJarFilename = localJarFile.getAbsolutePath
 
     if (!localJarFile.exists()) { // May already exist if running multiple workers on one node
       logInfo(s"Copying user jar $jarPath to $destPath")
-      FileUtil.copy(jarFileSystem, jarPath, destFileSystem, destPath, false, false, emptyConf)
+      FileUtil.copy(jarFileSystem, jarPath, destPath, false, emptyConf)
     }
 
     if (!localJarFile.exists()) { // Verify copy succeeded
@@ -161,7 +161,7 @@ private[spark] class DriverRunner(
         val stderr = new File(baseDir, "stderr")
         val header = "Launch Command: %s\n%s\n\n".format(
           command.mkString("\"", "\" \"", "\""), "=" * 40)
-        Files.write(header, stderr, Charsets.UTF_8)
+        Files.append(header, stderr, Charsets.UTF_8)
         CommandUtils.redirectStream(process.get.getErrorStream, stderr)
       }
 

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/8e88db3c/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
index 39e2523..a5a5f2e 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
@@ -175,7 +175,7 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas
     override def cleanup() { }
 
     override def restore() {
-      hadoopFiles.foreach {
+      hadoopFiles.toSeq.sortBy(_._1)(Time.ordering).foreach {
         case (t, f) => {
           // Restore the metadata in both files and generatedRDDs
           logInfo("Restoring files for time " + t + " - " +

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/8e88db3c/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
index 1cd0b9b..6c1df4f 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
@@ -85,14 +85,14 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging {
     val checkpointTime = ssc.initialCheckpoint.checkpointTime
     val restartTime = new Time(timer.getRestartTime(graph.zeroTime.milliseconds))
     val downTimes = checkpointTime.until(restartTime, batchDuration)
-    logInfo("Batches during down time: " + downTimes.mkString(", "))
+    logInfo("Batches during down time (" + downTimes.size + " batches): " + downTimes.mkString(", "))
 
     // Batches that were unprocessed before failure
-    val pendingTimes = ssc.initialCheckpoint.pendingTimes
-    logInfo("Batches pending processing: " + pendingTimes.mkString(", "))
+    val pendingTimes = ssc.initialCheckpoint.pendingTimes.sorted(Time.ordering)
+    logInfo("Batches pending processing (" + pendingTimes.size + " batches): " + pendingTimes.mkString(", "))
     // Reschedule jobs for these times
     val timesToReschedule = (pendingTimes ++ downTimes).distinct.sorted(Time.ordering)
-    logInfo("Batches to reschedule: " + timesToReschedule.mkString(", "))
+    logInfo("Batches to reschedule (" + timesToReschedule.size + " batches): " + timesToReschedule.mkString(", "))
     timesToReschedule.foreach(time =>
       jobScheduler.runJobs(time, graph.generateJobs(time))
     )