Posted to commits@spark.apache.org by pw...@apache.org on 2014/01/15 22:56:17 UTC

[1/2] git commit: Merge pull request #443 from tdas/filestream-fix

Updated Branches:
  refs/heads/branch-0.9 aca40aae8 -> e3fa36f25


Merge pull request #443 from tdas/filestream-fix

Made some classes private[streaming] and deprecated a method in JavaStreamingContext.

Classes `RawTextHelper`, `RawTextSender` and `RateLimitedOutputStream` are not useful in the streaming API. They are not used by the core functionality and were there only as support classes for an obscure example. One of the classes, `RawTextSender`, has a main function which can be executed using bin/spark-class even if it is made private[streaming]. In the future, I will probably remove these classes completely. For the time being, I am just converting them to private[streaming].
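
For reference, a main method remains launchable through the launcher script even when its enclosing object is private[streaming]. A hypothetical invocation is sketched below; the four argument names are assumed for illustration, since the diff further down only establishes that exactly four arguments are required:

    # Hypothetical invocation; argument names are assumed, not taken from this message.
    ./bin/spark-class org.apache.spark.streaming.util.RawTextSender <port> <file> <blockSize> <bytesPerSec>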

Previously, the underlying JavaSparkContext of a JavaStreamingContext was accessed through `JavaStreamingContext.sc`. This is now deprecated; the preferred method is `JavaStreamingContext.sparkContext`, which keeps it consistent with `StreamingContext.sparkContext`.
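
A minimal sketch of the migration from Scala, assuming a local master and a one-second batch duration purely for illustration (the JavaAPISuite diff below shows the corresponding change on the Java side):

    import java.util.Arrays
    import org.apache.spark.streaming.Seconds
    import org.apache.spark.streaming.api.java.JavaStreamingContext

    val jssc = new JavaStreamingContext("local[2]", "example", Seconds(1))

    // Old accessor, now deprecated (still compiles, but emits a warning):
    //   val rdd = jssc.sc.parallelize(Arrays.asList(1, 2, 3))

    // Preferred accessor after this change:
    val rdd = jssc.sparkContext.parallelize(Arrays.asList(1, 2, 3))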
(cherry picked from commit 2a05403a7ced4ecf6084c96f582ee3a24f3cc874)

Signed-off-by: Patrick Wendell <pw...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/29c76d96
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/29c76d96
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/29c76d96

Branch: refs/heads/branch-0.9
Commit: 29c76d96b2489823a7ad4781129b707c73108bf8
Parents: aca40aa
Author: Patrick Wendell <pw...@gmail.com>
Authored: Wed Jan 15 13:54:45 2014 -0800
Committer: Patrick Wendell <pw...@gmail.com>
Committed: Wed Jan 15 13:55:48 2014 -0800

----------------------------------------------------------------------
 .../apache/spark/streaming/api/java/JavaStreamingContext.scala | 6 +++++-
 .../apache/spark/streaming/util/RateLimitedOutputStream.scala  | 1 +
 .../scala/org/apache/spark/streaming/util/RawTextHelper.scala  | 1 +
 .../scala/org/apache/spark/streaming/util/RawTextSender.scala  | 1 +
 .../src/test/java/org/apache/spark/streaming/JavaAPISuite.java | 6 +++---
 5 files changed, 11 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/29c76d96/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
index 4edf8fa..613683c 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
@@ -141,8 +141,12 @@ class JavaStreamingContext(val ssc: StreamingContext) {
    */
   def this(path: String, hadoopConf: Configuration) = this(new StreamingContext(path, hadoopConf))
 
+
+  @deprecated("use sparkContext", "0.9.0")
+  val sc: JavaSparkContext = sparkContext
+
   /** The underlying SparkContext */
-  val sc: JavaSparkContext = new JavaSparkContext(ssc.sc)
+  val sparkContext = new JavaSparkContext(ssc.sc)
 
   /**
    * Create a input stream from network source hostname:port. Data is received using

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/29c76d96/streaming/src/main/scala/org/apache/spark/streaming/util/RateLimitedOutputStream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/RateLimitedOutputStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/RateLimitedOutputStream.scala
index b9c0596..179fd75 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/util/RateLimitedOutputStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/util/RateLimitedOutputStream.scala
@@ -22,6 +22,7 @@ import scala.annotation.tailrec
 import java.io.OutputStream
 import java.util.concurrent.TimeUnit._
 
+private[streaming]
 class RateLimitedOutputStream(out: OutputStream, bytesPerSec: Int) extends OutputStream {
   val SYNC_INTERVAL = NANOSECONDS.convert(10, SECONDS)
   val CHUNK_SIZE = 8192

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/29c76d96/streaming/src/main/scala/org/apache/spark/streaming/util/RawTextHelper.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/RawTextHelper.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/RawTextHelper.scala
index 5b6c048..07021eb 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/util/RawTextHelper.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/util/RawTextHelper.scala
@@ -22,6 +22,7 @@ import org.apache.spark.SparkContext._
 import it.unimi.dsi.fastutil.objects.{Object2LongOpenHashMap => OLMap}
 import scala.collection.JavaConversions.mapAsScalaMap
 
+private[streaming]
 object RawTextHelper {
 
   /** 

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/29c76d96/streaming/src/main/scala/org/apache/spark/streaming/util/RawTextSender.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/RawTextSender.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/RawTextSender.scala
index 463617a..684b38e 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/util/RawTextSender.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/util/RawTextSender.scala
@@ -33,6 +33,7 @@ import org.apache.spark.util.IntParam
  * A helper program that sends blocks of Kryo-serialized text strings out on a socket at a
  * specified rate. Used to feed data into RawInputDStream.
  */
+private[streaming]
 object RawTextSender extends Logging {
   def main(args: Array[String]) {
     if (args.length != 4) {

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/29c76d96/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
----------------------------------------------------------------------
diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
index 8b7d770..4fbbce9 100644
--- a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
+++ b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
@@ -297,9 +297,9 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
         Arrays.asList(7,8,9));
 
     JavaSparkContext jsc = new JavaSparkContext(ssc.ssc().sc());
-    JavaRDD<Integer> rdd1 = ssc.sc().parallelize(Arrays.asList(1, 2, 3));
-    JavaRDD<Integer> rdd2 = ssc.sc().parallelize(Arrays.asList(4, 5, 6));
-    JavaRDD<Integer> rdd3 = ssc.sc().parallelize(Arrays.asList(7,8,9));
+    JavaRDD<Integer> rdd1 = ssc.sparkContext().parallelize(Arrays.asList(1, 2, 3));
+    JavaRDD<Integer> rdd2 = ssc.sparkContext().parallelize(Arrays.asList(4, 5, 6));
+    JavaRDD<Integer> rdd3 = ssc.sparkContext().parallelize(Arrays.asList(7,8,9));
 
     LinkedList<JavaRDD<Integer>> rdds = Lists.newLinkedList();
     rdds.add(rdd1);


[2/2] git commit: Merge pull request #442 from pwendell/standalone

Merge pull request #442 from pwendell/standalone

Workers should use working directory as spark home if it's not specified

If users don't set SPARK_HOME in their environment file when launching an application, the standalone cluster should default to the Spark home of the worker.
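
The change itself is just an Option-based fallback; a minimal sketch of the pattern, with variable names mirroring the diff below and hypothetical values:

    import java.io.File

    // Hypothetical inputs: the application did not specify a Spark home,
    // while the worker itself was started from /opt/spark.
    val execSparkHome: String = null
    val workerSparkHome = new File("/opt/spark")

    // Option(null) is None, so getOrElse falls back to the worker's own home.
    val effectiveSparkHome = Option(execSparkHome).getOrElse(workerSparkHome.getAbsolutePath)
    assert(effectiveSparkHome == "/opt/spark")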
(cherry picked from commit 59f475c79fc8fd6d3485e4d0adf6768b6a9225a4)

Signed-off-by: Patrick Wendell <pw...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/e3fa36f2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/e3fa36f2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/e3fa36f2

Branch: refs/heads/branch-0.9
Commit: e3fa36f259b7ede73bc148891e2635bf41221660
Parents: 29c76d9
Author: Patrick Wendell <pw...@gmail.com>
Authored: Wed Jan 15 13:55:14 2014 -0800
Committer: Patrick Wendell <pw...@gmail.com>
Committed: Wed Jan 15 13:56:04 2014 -0800

----------------------------------------------------------------------
 core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala | 5 ++++-
 1 file changed, 4 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/e3fa36f2/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index 5182dcb..312560d 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -209,8 +209,11 @@ private[spark] class Worker(
         logWarning("Invalid Master (" + masterUrl + ") attempted to launch executor.")
       } else {
         logInfo("Asked to launch executor %s/%d for %s".format(appId, execId, appDesc.name))
+        // TODO (pwendell): We should make sparkHome an Option[String] in
+        // ApplicationDescription to be more explicit about this.
+        val effectiveSparkHome = Option(execSparkHome_).getOrElse(sparkHome.getAbsolutePath)
         val manager = new ExecutorRunner(appId, execId, appDesc, cores_, memory_,
-          self, workerId, host, new File(execSparkHome_), workDir, akkaUrl, ExecutorState.RUNNING)
+          self, workerId, host, new File(effectiveSparkHome), workDir, akkaUrl, ExecutorState.RUNNING)
         executors(appId + "/" + execId) = manager
         manager.start()
         coresUsed += cores_