Posted to commits@spark.apache.org by an...@apache.org on 2015/05/05 01:21:44 UTC

[3/7] spark git commit: [SPARK-6943] [SPARK-6944] DAG visualization on SparkUI

http://git-wip-us.apache.org/repos/asf/spark/blob/fc8b5819/core/src/main/resources/org/apache/spark/ui/static/spark-dag-viz.js
----------------------------------------------------------------------
diff --git a/core/src/main/resources/org/apache/spark/ui/static/spark-dag-viz.js b/core/src/main/resources/org/apache/spark/ui/static/spark-dag-viz.js
new file mode 100644
index 0000000..99b0294
--- /dev/null
+++ b/core/src/main/resources/org/apache/spark/ui/static/spark-dag-viz.js
@@ -0,0 +1,392 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*
+ * This file contains the logic to render the RDD DAG visualization in the UI.
+ *
+ * This DAG describes the relationships between
+ *   (1) an RDD and its dependencies,
+ *   (2) an RDD and its operation scopes, and
+ *   (3) an RDD's operation scopes and the stage / job hierarchy
+ *
+ * An operation scope is a general, named code block representing an operation
+ * that instantiates RDDs (e.g. filter, textFile, reduceByKey). An operation
+ * scope can be nested inside of other scopes if the corresponding RDD operation
+ * invokes other such operations (for more detail, see o.a.s.rdd.RDDOperationScope).
+ *
+ * A stage may include one or more operation scopes if the RDD operations are
+ * pipelined into one stage (e.g. rdd.map(...).filter(...).flatMap(...)).
+ * On the flip side, an operation scope may also include one or many stages,
+ * or even jobs if the RDD operation is higher level than Spark's scheduling
+ * primitives (e.g. take, any SQL query).
+ *
+ * In the visualization, an RDD is expressed as a node, and its dependencies
+ * as directed edges (from parent to child). Operation scopes, stages, and
+ * jobs are expressed as clusters that may contain one or many nodes. These
+ * clusters may be nested inside of each other in the scenarios described
+ * above.
+ *
+ * The visualization is rendered in an SVG contained in "div#dag-viz-graph",
+ * and its input data is expected to be populated in "div#dag-viz-metadata"
+ * by Spark's UI code. This is currently used only on the stage page and on
+ * the job page.
+ *
+ * This requires jQuery, d3, and dagre-d3. Note that we use a custom release
+ * of dagre-d3 (http://github.com/andrewor14/dagre-d3) for some specific
+ * functionality. For more detail, please track the changes in that project
+ * since it was forked (commit 101503833a8ce5fe369547f6addf3e71172ce10b).
+ */
+
+var VizConstants = {
+  rddColor: "#444444",
+  rddCachedColor: "#FF0000",
+  rddOperationColor: "#AADFFF",
+  stageColor: "#FFDDEE",
+  clusterLabelColor: "#888888",
+  edgeColor: "#444444",
+  edgeWidth: "1.5px",
+  svgMarginX: 0,
+  svgMarginY: 20,
+  stageSep: 50,
+  graphPrefix: "graph_",
+  nodePrefix: "node_",
+  stagePrefix: "stage_",
+  clusterPrefix: "cluster_",
+  stageClusterPrefix: "cluster_stage_"
+};
+
+// Helper d3 accessors for the elements that contain our graph and its metadata
+function graphContainer() { return d3.select("#dag-viz-graph"); }
+function metadataContainer() { return d3.select("#dag-viz-metadata"); }
+
+/*
+ * Show or hide the RDD DAG visualization.
+ * The graph is only rendered the first time this is called.
+ */
+function toggleDagViz(forJob) {
+  var arrowSelector = ".expand-dag-viz-arrow";
+  $(arrowSelector).toggleClass("arrow-closed");
+  $(arrowSelector).toggleClass("arrow-open");
+  var shouldShow = $(arrowSelector).hasClass("arrow-open");
+  if (shouldShow) {
+    var shouldRender = graphContainer().select("svg").empty();
+    if (shouldRender) {
+      renderDagViz(forJob);
+    }
+    graphContainer().style("display", "block");
+  } else {
+    // Save the graph for later so we don't have to render it again
+    graphContainer().style("display", "none");
+  }
+}
+
+/*
+ * Render the RDD DAG visualization.
+ *
+ * Input DOM hierarchy:
+ *   div#dag-viz-metadata >
+ *   div.stage-metadata >
+ *   div.[dot-file | incoming-edge | outgoing-edge]
+ *
+ * Output DOM hierarchy:
+ *   div#dag-viz-graph >
+ *   svg >
+ *   g#cluster_stage_[stageId]
+ *
+ * Note that the input metadata is populated by o.a.s.ui.UIUtils.showDagViz.
+ * Any changes in the input format here must be reflected there.
+ */
+function renderDagViz(forJob) {
+
+  // If there is no visualization metadata to render, fail fast and report an error
+  if (metadataContainer().empty()) {
+    graphContainer().append("div").text(
+      "No visualization information available for this " + (forJob ? "job" : "stage"));
+    return;
+  }
+
+  var svg = graphContainer().append("svg");
+  if (forJob) {
+    renderDagVizForJob(svg);
+  } else {
+    renderDagVizForStage(svg);
+  }
+
+  // Find cached RDDs
+  metadataContainer().selectAll(".cached-rdd").each(function(v) {
+    var nodeId = VizConstants.nodePrefix + d3.select(this).text();
+    graphContainer().selectAll("#" + nodeId).classed("cached", true);
+  });
+
+  // Set the appropriate SVG dimensions to ensure that all elements are displayed
+  var boundingBox = svg.node().getBBox();
+  svg.style("width", (boundingBox.width + VizConstants.svgMarginX) + "px");
+  svg.style("height", (boundingBox.height + VizConstants.svgMarginY) + "px");
+
+  // Add labels to clusters because dagre-d3 doesn't do this for us
+  svg.selectAll("g.cluster rect").each(function() {
+    var rect = d3.select(this);
+    var cluster = d3.select(this.parentNode);
+    // Shift the boxes up a little to make room for the labels
+    rect.attr("y", toFloat(rect.attr("y")) - 10);
+    rect.attr("height", toFloat(rect.attr("height")) + 10);
+    var labelX = toFloat(rect.attr("x")) + toFloat(rect.attr("width")) - 5;
+    var labelY = toFloat(rect.attr("y")) + 15;
+    var labelText = cluster.attr("name").replace(VizConstants.clusterPrefix, "");
+    cluster.append("text")
+      .attr("x", labelX)
+      .attr("y", labelY)
+      .attr("text-anchor", "end")
+      .text(labelText);
+  });
+
+  // We have shifted a few elements upwards, so we should adjust the SVG viewBox accordingly
+  var startX = -VizConstants.svgMarginX;
+  var startY = -VizConstants.svgMarginY;
+  var endX = toFloat(svg.style("width")) + VizConstants.svgMarginX;
+  var endY = toFloat(svg.style("height")) + VizConstants.svgMarginY;
+  var newViewBox = startX + " " + startY + " " + endX + " " + endY;
+  svg.attr("viewBox", newViewBox);
+
+  // Lastly, apply some custom style to the DAG
+  styleDagViz(forJob);
+}
+
+/* Render the RDD DAG visualization for a stage. */
+function renderDagVizForStage(svgContainer) {
+  var metadata = metadataContainer().select(".stage-metadata");
+  var dot = metadata.select(".dot-file").text();
+  var containerId = VizConstants.graphPrefix + metadata.attr("stageId");
+  var container = svgContainer.append("g").attr("id", containerId);
+  renderDot(dot, container);
+}
+
+/*
+ * Render the RDD DAG visualization for a job.
+ *
+ * Due to limitations in dagre-d3, each stage is rendered independently so that
+ * we have more control over how to position them. Unfortunately, this means we
+ * cannot rely on dagre-d3 to render edges that cross stages and must render
+ * these edges ourselves.
+ */
+function renderDagVizForJob(svgContainer) {
+  var crossStageEdges = [];
+
+  metadataContainer().selectAll(".stage-metadata").each(function(d, i) {
+    var metadata = d3.select(this);
+    var dot = metadata.select(".dot-file").text();
+    var stageId = metadata.attr("stageId");
+    var containerId = VizConstants.graphPrefix + stageId;
+    // TODO: handle stage attempts
+    var stageLink =
+      "/stages/stage/?id=" + stageId.replace(VizConstants.stagePrefix, "") + "&attempt=0";
+    var container = svgContainer
+      .append("a").attr("xlink:href", stageLink)
+      .append("g").attr("id", containerId);
+    // Now we need to shift the container for this stage so it doesn't overlap
+    // with existing ones. We do not need to do this for the first stage.
+    if (i > 0) {
+      // Take into account the position and width of the last stage's container
+      var existingStages = stageClusters();
+      if (!existingStages.empty()) {
+        var lastStage = existingStages[0].pop();
+        var lastStageId = d3.select(lastStage).attr("id");
+        var lastStageWidth = toFloat(d3.select("#" + lastStageId + " rect").attr("width"));
+        var lastStagePosition = getAbsolutePosition(lastStageId);
+        var offset = lastStagePosition.x + lastStageWidth + VizConstants.stageSep;
+        container.attr("transform", "translate(" + offset + ", 0)");
+      }
+    }
+    renderDot(dot, container);
+    // If there are any incoming edges into this graph, keep track of them to render
+    // them separately later. Note that we cannot draw them now because we need to
+    // put these edges in a separate container that is on top of all stage graphs.
+    metadata.selectAll(".incoming-edge").each(function(v) {
+      var edge = d3.select(this).text().split(","); // e.g. 3,4 => [3, 4]
+      crossStageEdges.push(edge);
+    });
+  });
+
+  // Draw edges that cross stages
+  if (crossStageEdges.length > 0) {
+    var container = svgContainer.append("g").attr("id", "cross-stage-edges");
+    for (var i = 0; i < crossStageEdges.length; i++) {
+      var fromRDDId = crossStageEdges[i][0];
+      var toRDDId = crossStageEdges[i][1];
+      connectRDDs(fromRDDId, toRDDId, container);
+    }
+  }
+}
+
+/* Render the dot file as an SVG in the given container. */
+function renderDot(dot, container) {
+  var unescapedDot = dot
+    .replace(/&lt;/g, "<")
+    .replace(/&gt;/g, ">")
+    .replace(/&quot;/g, "\"");
+  var g = graphlibDot.read(unescapedDot);
+  var renderer = new dagreD3.render();
+  renderer(container, g);
+}
+
+/* Style the visualization we just rendered. */
+function styleDagViz(forJob) {
+  graphContainer().selectAll("svg g.cluster rect")
+    .style("fill", "white")
+    .style("stroke", VizConstants.rddOperationColor)
+    .style("stroke-width", "4px")
+    .style("stroke-opacity", "0.5");
+  graphContainer().selectAll("svg g.cluster text")
+    .attr("fill", VizConstants.clusterLabelColor)
+    .attr("font-size", "11px");
+  graphContainer().selectAll("svg path")
+    .style("stroke", VizConstants.edgeColor)
+    .style("stroke-width", VizConstants.edgeWidth);
+  stageClusters()
+    .select("rect")
+    .style("stroke", VizConstants.stageColor)
+    .style("strokeWidth", "6px");
+
+  // Put an arrow at the end of every edge
+  // We need to do this because we manually render some edges ourselves
+  // For these edges, we borrow the arrow marker generated by dagre-d3
+  var dagreD3Marker = graphContainer().select("svg g.edgePaths marker").node();
+  graphContainer().select("svg")
+    .append(function() { return dagreD3Marker.cloneNode(true); })
+    .attr("id", "marker-arrow")
+    .select("path")
+    .attr("fill", VizConstants.edgeColor)
+    .attr("strokeWidth", "0px");
+  graphContainer().selectAll("svg g > path").attr("marker-end", "url(#marker-arrow)");
+  graphContainer().selectAll("svg g.edgePaths def").remove(); // We no longer need these
+
+  // Apply any job or stage specific styles
+  if (forJob) {
+    styleDagVizForJob();
+  } else {
+    styleDagVizForStage();
+  }
+}
+
+/* Apply job-page-specific style to the visualization. */
+function styleDagVizForJob() {
+  graphContainer().selectAll("svg g.node circle")
+    .style("fill", VizConstants.rddColor);
+  // TODO: add a legend to explain what a highlighted dot means
+  graphContainer().selectAll("svg g.cached circle")
+    .style("fill", VizConstants.rddCachedColor);
+  graphContainer().selectAll("svg g#cross-stage-edges path")
+    .style("fill", "none");
+}
+
+/* Apply stage-page-specific style to the visualization. */
+function styleDagVizForStage() {
+  graphContainer().selectAll("svg g.node rect")
+    .style("fill", "none")
+    .style("stroke", VizConstants.rddColor)
+    .style("stroke-width", "2px")
+    .attr("rx", "5") // round corners
+    .attr("ry", "5");
+    // TODO: add a legend to explain what a highlighted RDD means
+  graphContainer().selectAll("svg g.cached rect")
+    .style("stroke", VizConstants.rddCachedColor);
+  graphContainer().selectAll("svg g.node g.label text tspan")
+    .style("fill", VizConstants.rddColor);
+}
+
+/*
+ * (Job page only) Helper method to compute the absolute
+ * position of the group element identified by the given ID.
+ */
+function getAbsolutePosition(groupId) {
+  var obj = d3.select("#" + groupId).filter("g");
+  var _x = 0, _y = 0;
+  while (!obj.empty()) {
+    var transformText = obj.attr("transform");
+    var translate = d3.transform(transformText).translate;
+    _x += translate[0];
+    _y += translate[1];
+    obj = d3.select(obj.node().parentNode).filter("g");
+  }
+  return { x: _x, y: _y };
+}
+
+/* (Job page only) Connect two RDD nodes with a curved edge. */
+function connectRDDs(fromRDDId, toRDDId, container) {
+  var fromNodeId = VizConstants.nodePrefix + fromRDDId;
+  var toNodeId = VizConstants.nodePrefix + toRDDId;
+  var fromPos = getAbsolutePosition(fromNodeId);
+  var toPos = getAbsolutePosition(toNodeId);
+
+  // On the job page, RDDs are rendered as dots (circles). When rendering the path,
+  // we need to account for the radii of these circles. Otherwise the arrow heads
+  // will bleed into the circles themselves.
+  var delta = toFloat(graphContainer()
+    .select("g.node#" + toNodeId)
+    .select("circle")
+    .attr("r"));
+  if (fromPos.x < toPos.x) {
+    fromPos.x += delta;
+    toPos.x -= delta;
+  } else if (fromPos.x > toPos.x) {
+    fromPos.x -= delta;
+    toPos.x += delta;
+  }
+
+  if (fromPos.y == toPos.y) {
+    // If they are on the same rank, curve the middle part of the edge
+    // upward a little to avoid interference with things in between
+    // e.g.       _______
+    //      _____/       \_____
+    var points = [
+      [fromPos.x, fromPos.y],
+      [fromPos.x + (toPos.x - fromPos.x) * 0.2, fromPos.y],
+      [fromPos.x + (toPos.x - fromPos.x) * 0.3, fromPos.y - 20],
+      [fromPos.x + (toPos.x - fromPos.x) * 0.7, fromPos.y - 20],
+      [fromPos.x + (toPos.x - fromPos.x) * 0.8, toPos.y],
+      [toPos.x, toPos.y]
+    ];
+  } else {
+    // Otherwise, draw a curved edge that flattens out on both ends
+    // e.g.       _____
+    //           /
+    //          |
+    //    _____/
+    var points = [
+      [fromPos.x, fromPos.y],
+      [fromPos.x + (toPos.x - fromPos.x) * 0.4, fromPos.y],
+      [fromPos.x + (toPos.x - fromPos.x) * 0.6, toPos.y],
+      [toPos.x, toPos.y]
+    ];
+  }
+
+  var line = d3.svg.line().interpolate("basis");
+  container.append("path").datum(points).attr("d", line);
+}
+
+/* Helper d3 accessor to clusters that represent stages. */
+function stageClusters() {
+  return graphContainer().selectAll("g.cluster").filter(function() {
+    return d3.select(this).attr("id").indexOf(VizConstants.stageClusterPrefix) > -1;
+  });
+}
+
+/* Helper method to convert attributes to numeric values. */
+function toFloat(f) {
+  return parseFloat(f.replace(/px$/, ""));
+}
+
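
The stage / operation scope / job relationships described in the header comment of
spark-dag-viz.js can be made concrete with a small snippet, shown here as it would be
typed into spark-shell (where sc is a predefined SparkContext). It is purely
illustrative and not part of the patch; "input.txt" is a placeholder.

    // One stage containing several operation scopes: map, filter and flatMap are
    // pipelined into a single stage, so their scope clusters all appear nested
    // inside one stage cluster in the visualization.
    val words = sc.textFile("input.txt")
      .map(_.trim)
      .filter(_.nonEmpty)
      .flatMap(_.split(" "))

    // One operation scope that may span several jobs: take() keeps launching jobs
    // over more partitions until it has collected enough elements, so a single
    // "take" scope can cover more than one job.
    val firstWords = words.take(10)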

http://git-wip-us.apache.org/repos/asf/spark/blob/fc8b5819/core/src/main/resources/org/apache/spark/ui/static/webui.css
----------------------------------------------------------------------
diff --git a/core/src/main/resources/org/apache/spark/ui/static/webui.css b/core/src/main/resources/org/apache/spark/ui/static/webui.css
index 4910744..669ad48 100644
--- a/core/src/main/resources/org/apache/spark/ui/static/webui.css
+++ b/core/src/main/resources/org/apache/spark/ui/static/webui.css
@@ -145,7 +145,7 @@ pre {
   border: none;
 }
 
-span.expand-additional-metrics {
+span.expand-additional-metrics, span.expand-dag-viz {
   cursor: pointer;
 }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/fc8b5819/core/src/main/scala/org/apache/spark/SparkContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 4ef9054..b98a54b 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -659,6 +659,14 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
     setLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL, null)
   }
 
+  /**
+   * Execute a block of code in a scope such that all new RDDs created in this body will
+   * be part of the same scope. For more detail, see [[org.apache.spark.rdd.RDDOperationScope]].
+   *
+   * Note: Return statements are NOT allowed in the given body.
+   */
+  private def withScope[U](body: => U): U = RDDOperationScope.withScope[U](this)(body)
+
   // Methods for creating RDDs
 
   /** Distribute a local Scala collection to form an RDD.
@@ -669,7 +677,9 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
    * @note avoid using `parallelize(Seq())` to create an empty `RDD`. Consider `emptyRDD` for an
    * RDD with no partitions, or `parallelize(Seq[T]())` for an RDD of `T` with empty partitions.
    */
-  def parallelize[T: ClassTag](seq: Seq[T], numSlices: Int = defaultParallelism): RDD[T] = {
+  def parallelize[T: ClassTag](
+      seq: Seq[T],
+      numSlices: Int = defaultParallelism): RDD[T] = withScope {
     assertNotStopped()
     new ParallelCollectionRDD[T](this, seq, numSlices, Map[Int, Seq[String]]())
   }
@@ -678,14 +688,16 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
    *
    * This method is identical to `parallelize`.
    */
-  def makeRDD[T: ClassTag](seq: Seq[T], numSlices: Int = defaultParallelism): RDD[T] = {
+  def makeRDD[T: ClassTag](
+      seq: Seq[T],
+      numSlices: Int = defaultParallelism): RDD[T] = withScope {
     parallelize(seq, numSlices)
   }
 
   /** Distribute a local Scala collection to form an RDD, with one or more
     * location preferences (hostnames of Spark nodes) for each object.
     * Create a new partition for each collection item. */
-  def makeRDD[T: ClassTag](seq: Seq[(T, Seq[String])]): RDD[T] = {
+  def makeRDD[T: ClassTag](seq: Seq[(T, Seq[String])]): RDD[T] = withScope {
     assertNotStopped()
     val indexToPrefs = seq.zipWithIndex.map(t => (t._2, t._1._2)).toMap
     new ParallelCollectionRDD[T](this, seq.map(_._1), seq.size, indexToPrefs)
@@ -695,10 +707,12 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
    * Read a text file from HDFS, a local file system (available on all nodes), or any
    * Hadoop-supported file system URI, and return it as an RDD of Strings.
    */
-  def textFile(path: String, minPartitions: Int = defaultMinPartitions): RDD[String] = {
+  def textFile(
+      path: String,
+      minPartitions: Int = defaultMinPartitions): RDD[String] = withScope {
     assertNotStopped()
     hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text],
-      minPartitions).map(pair => pair._2.toString).setName(path)
+      minPartitions).map(pair => pair._2.toString)
   }
 
   /**
@@ -728,8 +742,9 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
    *
    * @param minPartitions A suggestion value of the minimal splitting number for input data.
    */
-  def wholeTextFiles(path: String, minPartitions: Int = defaultMinPartitions):
-  RDD[(String, String)] = {
+  def wholeTextFiles(
+      path: String,
+      minPartitions: Int = defaultMinPartitions): RDD[(String, String)] = withScope {
     assertNotStopped()
     val job = new NewHadoopJob(hadoopConfiguration)
     // Use setInputPaths so that wholeTextFiles aligns with hadoopFile/textFile in taking
@@ -776,8 +791,9 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
    * @note Small files are preferred; very large files may cause bad performance.
    */
   @Experimental
-  def binaryFiles(path: String, minPartitions: Int = defaultMinPartitions):
-      RDD[(String, PortableDataStream)] = {
+  def binaryFiles(
+      path: String,
+      minPartitions: Int = defaultMinPartitions): RDD[(String, PortableDataStream)] = withScope {
     assertNotStopped()
     val job = new NewHadoopJob(hadoopConfiguration)
     // Use setInputPaths so that binaryFiles aligns with hadoopFile/textFile in taking
@@ -806,8 +822,10 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
    * @return An RDD of data with values, represented as byte arrays
    */
   @Experimental
-  def binaryRecords(path: String, recordLength: Int, conf: Configuration = hadoopConfiguration)
-      : RDD[Array[Byte]] = {
+  def binaryRecords(
+      path: String,
+      recordLength: Int,
+      conf: Configuration = hadoopConfiguration): RDD[Array[Byte]] = withScope {
     assertNotStopped()
     conf.setInt(FixedLengthBinaryInputFormat.RECORD_LENGTH_PROPERTY, recordLength)
     val br = newAPIHadoopFile[LongWritable, BytesWritable, FixedLengthBinaryInputFormat](path,
@@ -848,8 +866,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
       inputFormatClass: Class[_ <: InputFormat[K, V]],
       keyClass: Class[K],
       valueClass: Class[V],
-      minPartitions: Int = defaultMinPartitions
-      ): RDD[(K, V)] = {
+      minPartitions: Int = defaultMinPartitions): RDD[(K, V)] = withScope {
     assertNotStopped()
     // Add necessary security credentials to the JobConf before broadcasting it.
     SparkHadoopUtil.get.addCredentials(conf)
@@ -869,8 +886,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
       inputFormatClass: Class[_ <: InputFormat[K, V]],
       keyClass: Class[K],
       valueClass: Class[V],
-      minPartitions: Int = defaultMinPartitions
-      ): RDD[(K, V)] = {
+      minPartitions: Int = defaultMinPartitions): RDD[(K, V)] = withScope {
     assertNotStopped()
     // A Hadoop configuration can be about 10 KB, which is pretty big, so broadcast it.
     val confBroadcast = broadcast(new SerializableWritable(hadoopConfiguration))
@@ -901,7 +917,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
    */
   def hadoopFile[K, V, F <: InputFormat[K, V]]
       (path: String, minPartitions: Int)
-      (implicit km: ClassTag[K], vm: ClassTag[V], fm: ClassTag[F]): RDD[(K, V)] = {
+      (implicit km: ClassTag[K], vm: ClassTag[V], fm: ClassTag[F]): RDD[(K, V)] = withScope {
     hadoopFile(path,
       fm.runtimeClass.asInstanceOf[Class[F]],
       km.runtimeClass.asInstanceOf[Class[K]],
@@ -924,13 +940,14 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
    * copy them using a `map` function.
    */
   def hadoopFile[K, V, F <: InputFormat[K, V]](path: String)
-      (implicit km: ClassTag[K], vm: ClassTag[V], fm: ClassTag[F]): RDD[(K, V)] =
+      (implicit km: ClassTag[K], vm: ClassTag[V], fm: ClassTag[F]): RDD[(K, V)] = withScope {
     hadoopFile[K, V, F](path, defaultMinPartitions)
+  }
 
   /** Get an RDD for a Hadoop file with an arbitrary new API InputFormat. */
   def newAPIHadoopFile[K, V, F <: NewInputFormat[K, V]]
       (path: String)
-      (implicit km: ClassTag[K], vm: ClassTag[V], fm: ClassTag[F]): RDD[(K, V)] = {
+      (implicit km: ClassTag[K], vm: ClassTag[V], fm: ClassTag[F]): RDD[(K, V)] = withScope {
     newAPIHadoopFile(
       path,
       fm.runtimeClass.asInstanceOf[Class[F]],
@@ -953,7 +970,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
       fClass: Class[F],
       kClass: Class[K],
       vClass: Class[V],
-      conf: Configuration = hadoopConfiguration): RDD[(K, V)] = {
+      conf: Configuration = hadoopConfiguration): RDD[(K, V)] = withScope {
     assertNotStopped()
     // The call to new NewHadoopJob automatically adds security credentials to conf,
     // so we don't need to explicitly add them ourselves
@@ -987,7 +1004,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
       conf: Configuration = hadoopConfiguration,
       fClass: Class[F],
       kClass: Class[K],
-      vClass: Class[V]): RDD[(K, V)] = {
+      vClass: Class[V]): RDD[(K, V)] = withScope {
     assertNotStopped()
     // Add necessary security credentials to the JobConf. Required to access secure HDFS.
     val jconf = new JobConf(conf)
@@ -1007,7 +1024,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
       keyClass: Class[K],
       valueClass: Class[V],
       minPartitions: Int
-      ): RDD[(K, V)] = {
+      ): RDD[(K, V)] = withScope {
     assertNotStopped()
     val inputFormatClass = classOf[SequenceFileInputFormat[K, V]]
     hadoopFile(path, inputFormatClass, keyClass, valueClass, minPartitions)
@@ -1021,7 +1038,10 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
     * If you plan to directly cache, sort, or aggregate Hadoop writable objects, you should first
     * copy them using a `map` function.
     * */
-  def sequenceFile[K, V](path: String, keyClass: Class[K], valueClass: Class[V]): RDD[(K, V)] = {
+  def sequenceFile[K, V](
+      path: String,
+      keyClass: Class[K],
+      valueClass: Class[V]): RDD[(K, V)] = withScope {
     assertNotStopped()
     sequenceFile(path, keyClass, valueClass, defaultMinPartitions)
   }
@@ -1051,16 +1071,17 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
    def sequenceFile[K, V]
        (path: String, minPartitions: Int = defaultMinPartitions)
        (implicit km: ClassTag[K], vm: ClassTag[V],
-        kcf: () => WritableConverter[K], vcf: () => WritableConverter[V])
-      : RDD[(K, V)] = {
-    assertNotStopped()
-    val kc = kcf()
-    val vc = vcf()
-    val format = classOf[SequenceFileInputFormat[Writable, Writable]]
-    val writables = hadoopFile(path, format,
+        kcf: () => WritableConverter[K], vcf: () => WritableConverter[V]): RDD[(K, V)] = {
+    withScope {
+      assertNotStopped()
+      val kc = kcf()
+      val vc = vcf()
+      val format = classOf[SequenceFileInputFormat[Writable, Writable]]
+      val writables = hadoopFile(path, format,
         kc.writableClass(km).asInstanceOf[Class[Writable]],
         vc.writableClass(vm).asInstanceOf[Class[Writable]], minPartitions)
-    writables.map { case (k, v) => (kc.convert(k), vc.convert(v)) }
+      writables.map { case (k, v) => (kc.convert(k), vc.convert(v)) }
+    }
   }
 
   /**
@@ -1073,21 +1094,18 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
    */
   def objectFile[T: ClassTag](
       path: String,
-      minPartitions: Int = defaultMinPartitions
-      ): RDD[T] = {
+      minPartitions: Int = defaultMinPartitions): RDD[T] = withScope {
     assertNotStopped()
     sequenceFile(path, classOf[NullWritable], classOf[BytesWritable], minPartitions)
       .flatMap(x => Utils.deserialize[Array[T]](x._2.getBytes, Utils.getContextOrSparkClassLoader))
   }
 
-  protected[spark] def checkpointFile[T: ClassTag](
-      path: String
-    ): RDD[T] = {
+  protected[spark] def checkpointFile[T: ClassTag](path: String): RDD[T] = withScope {
     new CheckpointRDD[T](this, path)
   }
 
   /** Build the union of a list of RDDs. */
-  def union[T: ClassTag](rdds: Seq[RDD[T]]): RDD[T] = {
+  def union[T: ClassTag](rdds: Seq[RDD[T]]): RDD[T] = withScope {
     val partitioners = rdds.flatMap(_.partitioner).toSet
     if (rdds.forall(_.partitioner.isDefined) && partitioners.size == 1) {
       new PartitionerAwareUnionRDD(this, rdds)
@@ -1097,8 +1115,9 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
   }
 
   /** Build the union of a list of RDDs passed as variable-length arguments. */
-  def union[T: ClassTag](first: RDD[T], rest: RDD[T]*): RDD[T] =
+  def union[T: ClassTag](first: RDD[T], rest: RDD[T]*): RDD[T] = withScope {
     union(Seq(first) ++ rest)
+  }
 
   /** Get an RDD that has no partitions or elements. */
   def emptyRDD[T: ClassTag]: EmptyRDD[T] = new EmptyRDD[T](this)
@@ -2060,10 +2079,10 @@ object SparkContext extends Logging {
   }
 
   private[spark] val SPARK_JOB_DESCRIPTION = "spark.job.description"
-
   private[spark] val SPARK_JOB_GROUP_ID = "spark.jobGroup.id"
-
   private[spark] val SPARK_JOB_INTERRUPT_ON_CANCEL = "spark.job.interruptOnCancel"
+  private[spark] val RDD_SCOPE_KEY = "spark.rdd.scope"
+  private[spark] val RDD_SCOPE_NO_OVERRIDE_KEY = "spark.rdd.scope.noOverride"
 
   /**
    * Executor id for the driver.  In earlier versions of Spark, this was `<driver>`, but this was
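
The wrapping pattern introduced above, turning every RDD-producing method into
`def foo(...): RDD[T] = withScope { ... }`, can be illustrated with a self-contained
toy sketch. This is NOT Spark's RDDOperationScope: the ScopeSketch object, its
thread-local bookkeeping, and the nested name encoding are made up for illustration.
The real implementation presumably records scopes through local properties under the
spark.rdd.scope key added above, with spark.rdd.scope.noOverride governing when a
nested call may introduce a new scope.

    import java.util.concurrent.atomic.AtomicInteger

    object ScopeSketch {
      private val counter = new AtomicInteger(0)
      // Stand-in for setLocalProperty(RDD_SCOPE_KEY, ...).
      private val current = new ThreadLocal[Option[String]] {
        override def initialValue(): Option[String] = None
      }

      /** Run `body` inside a named scope. `body` is a by-name parameter evaluated
        * inside this method, so a `return` in the caller's block would be a
        * non-local return, which is presumably why the doc above forbids it. */
      def withScope[U](name: String)(body: => U): U = {
        val parent = current.get()
        val id = name + "-" + counter.getAndIncrement()
        val scope = parent.map(p => p + "/" + id).getOrElse(id)
        current.set(Some(scope))
        try body finally current.set(parent)
      }

      /** What a newly created RDD would record as its scope. */
      def scopeForNewRDD: Option[String] = current.get()
    }

    // Usage mirroring SparkContext: textFile delegates to hadoopFile, so an RDD
    // created inside the inner call would see a nested scope such as
    // Some("textFile-0/hadoopFile-1").
    object ScopeSketchDemo {
      def hadoopFile(): Option[String] =
        ScopeSketch.withScope("hadoopFile") { ScopeSketch.scopeForNewRDD }
      def textFile(): Option[String] =
        ScopeSketch.withScope("textFile") { hadoopFile() }
    }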

http://git-wip-us.apache.org/repos/asf/spark/blob/fc8b5819/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala b/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala
index 3406a7e..ec18534 100644
--- a/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala
@@ -33,7 +33,7 @@ class AsyncRDDActions[T: ClassTag](self: RDD[T]) extends Serializable with Loggi
   /**
    * Returns a future for counting the number of elements in the RDD.
    */
-  def countAsync(): FutureAction[Long] = {
+  def countAsync(): FutureAction[Long] = self.withScope {
     val totalCount = new AtomicLong
     self.context.submitJob(
       self,
@@ -53,7 +53,7 @@ class AsyncRDDActions[T: ClassTag](self: RDD[T]) extends Serializable with Loggi
   /**
    * Returns a future for retrieving all elements of this RDD.
    */
-  def collectAsync(): FutureAction[Seq[T]] = {
+  def collectAsync(): FutureAction[Seq[T]] = self.withScope {
     val results = new Array[Array[T]](self.partitions.length)
     self.context.submitJob[T, Array[T], Seq[T]](self, _.toArray, Range(0, self.partitions.length),
       (index, data) => results(index) = data, results.flatten.toSeq)
@@ -62,7 +62,7 @@ class AsyncRDDActions[T: ClassTag](self: RDD[T]) extends Serializable with Loggi
   /**
    * Returns a future for retrieving the first num elements of the RDD.
    */
-  def takeAsync(num: Int): FutureAction[Seq[T]] = {
+  def takeAsync(num: Int): FutureAction[Seq[T]] = self.withScope {
     val f = new ComplexFutureAction[Seq[T]]
 
     f.run {
@@ -109,7 +109,7 @@ class AsyncRDDActions[T: ClassTag](self: RDD[T]) extends Serializable with Loggi
   /**
    * Applies a function f to all elements of this RDD.
    */
-  def foreachAsync(f: T => Unit): FutureAction[Unit] = {
+  def foreachAsync(f: T => Unit): FutureAction[Unit] = self.withScope {
     val cleanF = self.context.clean(f)
     self.context.submitJob[T, Unit, Unit](self, _.foreach(cleanF), Range(0, self.partitions.length),
       (index, data) => Unit, Unit)
@@ -118,7 +118,7 @@ class AsyncRDDActions[T: ClassTag](self: RDD[T]) extends Serializable with Loggi
   /**
    * Applies a function f to each partition of this RDD.
    */
-  def foreachPartitionAsync(f: Iterator[T] => Unit): FutureAction[Unit] = {
+  def foreachPartitionAsync(f: Iterator[T] => Unit): FutureAction[Unit] = self.withScope {
     self.context.submitJob[T, Unit, Unit](self, f, Range(0, self.partitions.length),
       (index, data) => Unit, Unit)
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/fc8b5819/core/src/main/scala/org/apache/spark/rdd/DoubleRDDFunctions.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/DoubleRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/DoubleRDDFunctions.scala
index 843a893..926bce6 100644
--- a/core/src/main/scala/org/apache/spark/rdd/DoubleRDDFunctions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/DoubleRDDFunctions.scala
@@ -30,7 +30,7 @@ import org.apache.spark.util.StatCounter
  */
 class DoubleRDDFunctions(self: RDD[Double]) extends Logging with Serializable {
   /** Add up the elements in this RDD. */
-  def sum(): Double = {
+  def sum(): Double = self.withScope {
     self.fold(0.0)(_ + _)
   }
 
@@ -38,37 +38,49 @@ class DoubleRDDFunctions(self: RDD[Double]) extends Logging with Serializable {
    * Return a [[org.apache.spark.util.StatCounter]] object that captures the mean, variance and
    * count of the RDD's elements in one operation.
    */
-  def stats(): StatCounter = {
+  def stats(): StatCounter = self.withScope {
     self.mapPartitions(nums => Iterator(StatCounter(nums))).reduce((a, b) => a.merge(b))
   }
 
   /** Compute the mean of this RDD's elements. */
-  def mean(): Double = stats().mean
+  def mean(): Double = self.withScope {
+    stats().mean
+  }
 
   /** Compute the variance of this RDD's elements. */
-  def variance(): Double = stats().variance
+  def variance(): Double = self.withScope {
+    stats().variance
+  }
 
   /** Compute the standard deviation of this RDD's elements. */
-  def stdev(): Double = stats().stdev
+  def stdev(): Double = self.withScope {
+    stats().stdev
+  }
 
   /**
    * Compute the sample standard deviation of this RDD's elements (which corrects for bias in
    * estimating the standard deviation by dividing by N-1 instead of N).
    */
-  def sampleStdev(): Double = stats().sampleStdev
+  def sampleStdev(): Double = self.withScope {
+    stats().sampleStdev
+  }
 
   /**
    * Compute the sample variance of this RDD's elements (which corrects for bias in
    * estimating the variance by dividing by N-1 instead of N).
    */
-  def sampleVariance(): Double = stats().sampleVariance
+  def sampleVariance(): Double = self.withScope {
+    stats().sampleVariance
+  }
 
   /**
    * :: Experimental ::
    * Approximate operation to return the mean within a timeout.
    */
   @Experimental
-  def meanApprox(timeout: Long, confidence: Double = 0.95): PartialResult[BoundedDouble] = {
+  def meanApprox(
+      timeout: Long,
+      confidence: Double = 0.95): PartialResult[BoundedDouble] = self.withScope {
     val processPartition = (ctx: TaskContext, ns: Iterator[Double]) => StatCounter(ns)
     val evaluator = new MeanEvaluator(self.partitions.length, confidence)
     self.context.runApproximateJob(self, processPartition, evaluator, timeout)
@@ -79,7 +91,9 @@ class DoubleRDDFunctions(self: RDD[Double]) extends Logging with Serializable {
    * Approximate operation to return the sum within a timeout.
    */
   @Experimental
-  def sumApprox(timeout: Long, confidence: Double = 0.95): PartialResult[BoundedDouble] = {
+  def sumApprox(
+      timeout: Long,
+      confidence: Double = 0.95): PartialResult[BoundedDouble] = self.withScope {
     val processPartition = (ctx: TaskContext, ns: Iterator[Double]) => StatCounter(ns)
     val evaluator = new SumEvaluator(self.partitions.length, confidence)
     self.context.runApproximateJob(self, processPartition, evaluator, timeout)
@@ -93,7 +107,7 @@ class DoubleRDDFunctions(self: RDD[Double]) extends Logging with Serializable {
    * If the RDD contains infinity, NaN throws an exception
    * If the elements in RDD do not vary (max == min) always returns a single bucket.
    */
-  def histogram(bucketCount: Int): Pair[Array[Double], Array[Long]] = {
+  def histogram(bucketCount: Int): Pair[Array[Double], Array[Long]] = self.withScope {
     // Scala's built-in range has issues. See #SI-8782
     def customRange(min: Double, max: Double, steps: Int): IndexedSeq[Double] = {
       val span = max - min
@@ -140,7 +154,9 @@ class DoubleRDDFunctions(self: RDD[Double]) extends Logging with Serializable {
    * the maximum value of the last position and all NaN entries will be counted
    * in that bucket.
    */
-  def histogram(buckets: Array[Double], evenBuckets: Boolean = false): Array[Long] = {
+  def histogram(
+      buckets: Array[Double],
+      evenBuckets: Boolean = false): Array[Long] = self.withScope {
     if (buckets.length < 2) {
       throw new IllegalArgumentException("buckets array must have at least two elements")
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/fc8b5819/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
index f77abac..2cefe63 100644
--- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -99,7 +99,7 @@ private[spark] class HadoopPartition(rddId: Int, idx: Int, @transient s: InputSp
  */
 @DeveloperApi
 class HadoopRDD[K, V](
-    sc: SparkContext,
+    @transient sc: SparkContext,
     broadcastedConf: Broadcast[SerializableWritable[Configuration]],
     initLocalJobConfFuncOpt: Option[JobConf => Unit],
     inputFormatClass: Class[_ <: InputFormat[K, V]],
@@ -108,6 +108,10 @@ class HadoopRDD[K, V](
     minPartitions: Int)
   extends RDD[(K, V)](sc, Nil) with Logging {
 
+  if (initLocalJobConfFuncOpt.isDefined) {
+    sc.clean(initLocalJobConfFuncOpt.get)
+  }
+
   def this(
       sc: SparkContext,
       conf: JobConf,

http://git-wip-us.apache.org/repos/asf/spark/blob/fc8b5819/core/src/main/scala/org/apache/spark/rdd/OrderedRDDFunctions.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/OrderedRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/OrderedRDDFunctions.scala
index 6afe501..d71bb63 100644
--- a/core/src/main/scala/org/apache/spark/rdd/OrderedRDDFunctions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/OrderedRDDFunctions.scala
@@ -57,7 +57,7 @@ class OrderedRDDFunctions[K : Ordering : ClassTag,
    */
   // TODO: this currently doesn't work on P other than Tuple2!
   def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.length)
-      : RDD[(K, V)] =
+      : RDD[(K, V)] = self.withScope
   {
     val part = new RangePartitioner(numPartitions, self, ascending)
     new ShuffledRDD[K, V, V](self, part)
@@ -71,7 +71,7 @@ class OrderedRDDFunctions[K : Ordering : ClassTag,
    * This is more efficient than calling `repartition` and then sorting within each partition
    * because it can push the sorting down into the shuffle machinery.
    */
-  def repartitionAndSortWithinPartitions(partitioner: Partitioner): RDD[(K, V)] = {
+  def repartitionAndSortWithinPartitions(partitioner: Partitioner): RDD[(K, V)] = self.withScope {
     new ShuffledRDD[K, V, V](self, partitioner).setKeyOrdering(ordering)
   }
 
@@ -81,7 +81,7 @@ class OrderedRDDFunctions[K : Ordering : ClassTag,
    * performed efficiently by only scanning the partitions that might contain matching elements.
    * Otherwise, a standard `filter` is applied to all partitions.
    */
-  def filterByRange(lower: K, upper: K): RDD[P] = {
+  def filterByRange(lower: K, upper: K): RDD[P] = self.withScope {
 
     def inRange(k: K): Boolean = ordering.gteq(k, lower) && ordering.lteq(k, upper)
 

http://git-wip-us.apache.org/repos/asf/spark/blob/fc8b5819/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
index 05351ba..93d338f 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
@@ -29,7 +29,7 @@ import scala.util.DynamicVariable
 
 import com.clearspring.analytics.stream.cardinality.HyperLogLogPlus
 import org.apache.hadoop.conf.{Configurable, Configuration}
-import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.hadoop.fs.FileSystem
 import org.apache.hadoop.io.SequenceFile.CompressionType
 import org.apache.hadoop.io.compress.CompressionCodec
 import org.apache.hadoop.mapred.{FileOutputCommitter, FileOutputFormat, JobConf, OutputFormat}
@@ -75,7 +75,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
       mergeCombiners: (C, C) => C,
       partitioner: Partitioner,
       mapSideCombine: Boolean = true,
-      serializer: Serializer = null): RDD[(K, C)] = {
+      serializer: Serializer = null): RDD[(K, C)] = self.withScope {
     require(mergeCombiners != null, "mergeCombiners must be defined") // required as of Spark 0.9.0
     if (keyClass.isArray) {
       if (mapSideCombine) {
@@ -108,7 +108,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
   def combineByKey[C](createCombiner: V => C,
       mergeValue: (C, V) => C,
       mergeCombiners: (C, C) => C,
-      numPartitions: Int): RDD[(K, C)] = {
+      numPartitions: Int): RDD[(K, C)] = self.withScope {
     combineByKey(createCombiner, mergeValue, mergeCombiners, new HashPartitioner(numPartitions))
   }
 
@@ -122,7 +122,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
    * instead of creating a new U.
    */
   def aggregateByKey[U: ClassTag](zeroValue: U, partitioner: Partitioner)(seqOp: (U, V) => U,
-      combOp: (U, U) => U): RDD[(K, U)] = {
+      combOp: (U, U) => U): RDD[(K, U)] = self.withScope {
     // Serialize the zero value to a byte array so that we can get a new clone of it on each key
     val zeroBuffer = SparkEnv.get.serializer.newInstance().serialize(zeroValue)
     val zeroArray = new Array[Byte](zeroBuffer.limit)
@@ -144,7 +144,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
    * instead of creating a new U.
    */
   def aggregateByKey[U: ClassTag](zeroValue: U, numPartitions: Int)(seqOp: (U, V) => U,
-      combOp: (U, U) => U): RDD[(K, U)] = {
+      combOp: (U, U) => U): RDD[(K, U)] = self.withScope {
     aggregateByKey(zeroValue, new HashPartitioner(numPartitions))(seqOp, combOp)
   }
 
@@ -158,7 +158,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
    * instead of creating a new U.
    */
   def aggregateByKey[U: ClassTag](zeroValue: U)(seqOp: (U, V) => U,
-      combOp: (U, U) => U): RDD[(K, U)] = {
+      combOp: (U, U) => U): RDD[(K, U)] = self.withScope {
     aggregateByKey(zeroValue, defaultPartitioner(self))(seqOp, combOp)
   }
 
@@ -167,7 +167,9 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
    * may be added to the result an arbitrary number of times, and must not change the result
    * (e.g., Nil for list concatenation, 0 for addition, or 1 for multiplication.).
    */
-  def foldByKey(zeroValue: V, partitioner: Partitioner)(func: (V, V) => V): RDD[(K, V)] = {
+  def foldByKey(
+      zeroValue: V,
+      partitioner: Partitioner)(func: (V, V) => V): RDD[(K, V)] = self.withScope {
     // Serialize the zero value to a byte array so that we can get a new clone of it on each key
     val zeroBuffer = SparkEnv.get.serializer.newInstance().serialize(zeroValue)
     val zeroArray = new Array[Byte](zeroBuffer.limit)
@@ -185,7 +187,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
    * may be added to the result an arbitrary number of times, and must not change the result
    * (e.g., Nil for list concatenation, 0 for addition, or 1 for multiplication.).
    */
-  def foldByKey(zeroValue: V, numPartitions: Int)(func: (V, V) => V): RDD[(K, V)] = {
+  def foldByKey(zeroValue: V, numPartitions: Int)(func: (V, V) => V): RDD[(K, V)] = self.withScope {
     foldByKey(zeroValue, new HashPartitioner(numPartitions))(func)
   }
 
@@ -194,7 +196,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
    * may be added to the result an arbitrary number of times, and must not change the result
    * (e.g., Nil for list concatenation, 0 for addition, or 1 for multiplication.).
    */
-  def foldByKey(zeroValue: V)(func: (V, V) => V): RDD[(K, V)] = {
+  def foldByKey(zeroValue: V)(func: (V, V) => V): RDD[(K, V)] = self.withScope {
     foldByKey(zeroValue, defaultPartitioner(self))(func)
   }
 
@@ -213,7 +215,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
    */
   def sampleByKey(withReplacement: Boolean,
       fractions: Map[K, Double],
-      seed: Long = Utils.random.nextLong): RDD[(K, V)] = {
+      seed: Long = Utils.random.nextLong): RDD[(K, V)] = self.withScope {
 
     require(fractions.values.forall(v => v >= 0.0), "Negative sampling rates.")
 
@@ -242,9 +244,10 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
    * @return RDD containing the sampled subset
    */
   @Experimental
-  def sampleByKeyExact(withReplacement: Boolean,
+  def sampleByKeyExact(
+      withReplacement: Boolean,
       fractions: Map[K, Double],
-      seed: Long = Utils.random.nextLong): RDD[(K, V)] = {
+      seed: Long = Utils.random.nextLong): RDD[(K, V)] = self.withScope {
 
     require(fractions.values.forall(v => v >= 0.0), "Negative sampling rates.")
 
@@ -261,7 +264,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
    * the merging locally on each mapper before sending results to a reducer, similarly to a
    * "combiner" in MapReduce.
    */
-  def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] = {
+  def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] = self.withScope {
     combineByKey[V]((v: V) => v, func, func, partitioner)
   }
 
@@ -270,7 +273,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
    * the merging locally on each mapper before sending results to a reducer, similarly to a
    * "combiner" in MapReduce. Output will be hash-partitioned with numPartitions partitions.
    */
-  def reduceByKey(func: (V, V) => V, numPartitions: Int): RDD[(K, V)] = {
+  def reduceByKey(func: (V, V) => V, numPartitions: Int): RDD[(K, V)] = self.withScope {
     reduceByKey(new HashPartitioner(numPartitions), func)
   }
 
@@ -280,7 +283,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
    * "combiner" in MapReduce. Output will be hash-partitioned with the existing partitioner/
    * parallelism level.
    */
-  def reduceByKey(func: (V, V) => V): RDD[(K, V)] = {
+  def reduceByKey(func: (V, V) => V): RDD[(K, V)] = self.withScope {
     reduceByKey(defaultPartitioner(self), func)
   }
 
@@ -289,7 +292,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
    * immediately to the master as a Map. This will also perform the merging locally on each mapper
    * before sending results to a reducer, similarly to a "combiner" in MapReduce.
    */
-  def reduceByKeyLocally(func: (V, V) => V): Map[K, V] = {
+  def reduceByKeyLocally(func: (V, V) => V): Map[K, V] = self.withScope {
 
     if (keyClass.isArray) {
       throw new SparkException("reduceByKeyLocally() does not support array keys")
@@ -317,7 +320,9 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
 
   /** Alias for reduceByKeyLocally */
   @deprecated("Use reduceByKeyLocally", "1.0.0")
-  def reduceByKeyToDriver(func: (V, V) => V): Map[K, V] = reduceByKeyLocally(func)
+  def reduceByKeyToDriver(func: (V, V) => V): Map[K, V] = self.withScope {
+    reduceByKeyLocally(func)
+  }
 
   /** 
    * Count the number of elements for each key, collecting the results to a local Map.
@@ -327,7 +332,9 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
    * To handle very large results, consider using rdd.mapValues(_ => 1L).reduceByKey(_ + _), which
    * returns an RDD[T, Long] instead of a map.
    */
-  def countByKey(): Map[K, Long] = self.mapValues(_ => 1L).reduceByKey(_ + _).collect().toMap
+  def countByKey(): Map[K, Long] = self.withScope {
+    self.mapValues(_ => 1L).reduceByKey(_ + _).collect().toMap
+  }
 
   /**
    * :: Experimental ::
@@ -336,7 +343,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
    */
   @Experimental
   def countByKeyApprox(timeout: Long, confidence: Double = 0.95)
-      : PartialResult[Map[K, BoundedDouble]] = {
+      : PartialResult[Map[K, BoundedDouble]] = self.withScope {
     self.map(_._1).countByValueApprox(timeout, confidence)
   }
 
@@ -360,7 +367,10 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
    * @param partitioner Partitioner to use for the resulting RDD.
    */
   @Experimental
-  def countApproxDistinctByKey(p: Int, sp: Int, partitioner: Partitioner): RDD[(K, Long)] = {
+  def countApproxDistinctByKey(
+      p: Int,
+      sp: Int,
+      partitioner: Partitioner): RDD[(K, Long)] = self.withScope {
     require(p >= 4, s"p ($p) must be >= 4")
     require(sp <= 32, s"sp ($sp) must be <= 32")
     require(sp == 0 || p <= sp, s"p ($p) cannot be greater than sp ($sp)")
@@ -392,7 +402,9 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
    *                   It must be greater than 0.000017.
    * @param partitioner partitioner of the resulting RDD
    */
-  def countApproxDistinctByKey(relativeSD: Double, partitioner: Partitioner): RDD[(K, Long)] = {
+  def countApproxDistinctByKey(
+      relativeSD: Double,
+      partitioner: Partitioner): RDD[(K, Long)] = self.withScope {
     require(relativeSD > 0.000017, s"accuracy ($relativeSD) must be greater than 0.000017")
     val p = math.ceil(2.0 * math.log(1.054 / relativeSD) / math.log(2)).toInt
     assert(p <= 32)
@@ -410,7 +422,9 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
    *                   It must be greater than 0.000017.
    * @param numPartitions number of partitions of the resulting RDD
    */
-  def countApproxDistinctByKey(relativeSD: Double, numPartitions: Int): RDD[(K, Long)] = {
+  def countApproxDistinctByKey(
+      relativeSD: Double,
+      numPartitions: Int): RDD[(K, Long)] = self.withScope {
     countApproxDistinctByKey(relativeSD, new HashPartitioner(numPartitions))
   }
 
@@ -424,7 +438,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
    * @param relativeSD Relative accuracy. Smaller values create counters that require more space.
    *                   It must be greater than 0.000017.
    */
-  def countApproxDistinctByKey(relativeSD: Double = 0.05): RDD[(K, Long)] = {
+  def countApproxDistinctByKey(relativeSD: Double = 0.05): RDD[(K, Long)] = self.withScope {
     countApproxDistinctByKey(relativeSD, defaultPartitioner(self))
   }
 
@@ -441,7 +455,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
    * Note: As currently implemented, groupByKey must be able to hold all the key-value pairs for any
    * key in memory. If a key has too many values, it can result in an [[OutOfMemoryError]].
    */
-  def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])] = {
+  def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])] = self.withScope {
     // groupByKey shouldn't use map side combine because map side combine does not
     // reduce the amount of data shuffled and requires all map side data be inserted
     // into a hash table, leading to more objects in the old gen.
@@ -465,14 +479,14 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
    * Note: As currently implemented, groupByKey must be able to hold all the key-value pairs for any
    * key in memory. If a key has too many values, it can result in an [[OutOfMemoryError]].
    */
-  def groupByKey(numPartitions: Int): RDD[(K, Iterable[V])] = {
+  def groupByKey(numPartitions: Int): RDD[(K, Iterable[V])] = self.withScope {
     groupByKey(new HashPartitioner(numPartitions))
   }
 
   /**
    * Return a copy of the RDD partitioned using the specified partitioner.
    */
-  def partitionBy(partitioner: Partitioner): RDD[(K, V)] = {
+  def partitionBy(partitioner: Partitioner): RDD[(K, V)] = self.withScope {
     if (keyClass.isArray && partitioner.isInstanceOf[HashPartitioner]) {
       throw new SparkException("Default partitioner cannot partition array keys.")
     }
@@ -488,7 +502,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
    * pair of elements will be returned as a (k, (v1, v2)) tuple, where (k, v1) is in `this` and
    * (k, v2) is in `other`. Uses the given Partitioner to partition the output RDD.
    */
-  def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))] = {
+  def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))] = self.withScope {
     this.cogroup(other, partitioner).flatMapValues( pair =>
       for (v <- pair._1.iterator; w <- pair._2.iterator) yield (v, w)
     )
@@ -500,7 +514,9 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
    * pair (k, (v, None)) if no elements in `other` have key k. Uses the given Partitioner to
    * partition the output RDD.
    */
-  def leftOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, Option[W]))] = {
+  def leftOuterJoin[W](
+      other: RDD[(K, W)],
+      partitioner: Partitioner): RDD[(K, (V, Option[W]))] = self.withScope {
     this.cogroup(other, partitioner).flatMapValues { pair =>
       if (pair._2.isEmpty) {
         pair._1.iterator.map(v => (v, None))
@@ -517,7 +533,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
    * partition the output RDD.
    */
   def rightOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner)
-      : RDD[(K, (Option[V], W))] = {
+      : RDD[(K, (Option[V], W))] = self.withScope {
     this.cogroup(other, partitioner).flatMapValues { pair =>
       if (pair._1.isEmpty) {
         pair._2.iterator.map(w => (None, w))
@@ -536,7 +552,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
    * in `this` have key k. Uses the given Partitioner to partition the output RDD.
    */
   def fullOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner)
-      : RDD[(K, (Option[V], Option[W]))] = {
+      : RDD[(K, (Option[V], Option[W]))] = self.withScope {
     this.cogroup(other, partitioner).flatMapValues {
       case (vs, Seq()) => vs.iterator.map(v => (Some(v), None))
       case (Seq(), ws) => ws.iterator.map(w => (None, Some(w)))
@@ -549,7 +565,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
    * existing partitioner/parallelism level.
    */
   def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C)
-    : RDD[(K, C)] = {
+    : RDD[(K, C)] = self.withScope {
     combineByKey(createCombiner, mergeValue, mergeCombiners, defaultPartitioner(self))
   }
 
@@ -563,7 +579,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
    * aggregation (such as a sum or average) over each key, using [[PairRDDFunctions.aggregateByKey]]
    * or [[PairRDDFunctions.reduceByKey]] will provide much better performance.
    */
-  def groupByKey(): RDD[(K, Iterable[V])] = {
+  def groupByKey(): RDD[(K, Iterable[V])] = self.withScope {
     groupByKey(defaultPartitioner(self))
   }
 
@@ -572,7 +588,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
    * pair of elements will be returned as a (k, (v1, v2)) tuple, where (k, v1) is in `this` and
    * (k, v2) is in `other`. Performs a hash join across the cluster.
    */
-  def join[W](other: RDD[(K, W)]): RDD[(K, (V, W))] = {
+  def join[W](other: RDD[(K, W)]): RDD[(K, (V, W))] = self.withScope {
     join(other, defaultPartitioner(self, other))
   }
 
@@ -581,7 +597,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
    * pair of elements will be returned as a (k, (v1, v2)) tuple, where (k, v1) is in `this` and
    * (k, v2) is in `other`. Performs a hash join across the cluster.
    */
-  def join[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (V, W))] = {
+  def join[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (V, W))] = self.withScope {
     join(other, new HashPartitioner(numPartitions))
   }
 
@@ -591,7 +607,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
    * pair (k, (v, None)) if no elements in `other` have key k. Hash-partitions the output
    * using the existing partitioner/parallelism level.
    */
-  def leftOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (V, Option[W]))] = {
+  def leftOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (V, Option[W]))] = self.withScope {
     leftOuterJoin(other, defaultPartitioner(self, other))
   }
 
@@ -601,7 +617,9 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
    * pair (k, (v, None)) if no elements in `other` have key k. Hash-partitions the output
    * into `numPartitions` partitions.
    */
-  def leftOuterJoin[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (V, Option[W]))] = {
+  def leftOuterJoin[W](
+      other: RDD[(K, W)],
+      numPartitions: Int): RDD[(K, (V, Option[W]))] = self.withScope {
     leftOuterJoin(other, new HashPartitioner(numPartitions))
   }
 
@@ -611,7 +629,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
    * pair (k, (None, w)) if no elements in `this` have key k. Hash-partitions the resulting
    * RDD using the existing partitioner/parallelism level.
    */
-  def rightOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (Option[V], W))] = {
+  def rightOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (Option[V], W))] = self.withScope {
     rightOuterJoin(other, defaultPartitioner(self, other))
   }
 
@@ -621,7 +639,9 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
    * pair (k, (None, w)) if no elements in `this` have key k. Hash-partitions the resulting
    * RDD into the given number of partitions.
    */
-  def rightOuterJoin[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (Option[V], W))] = {
+  def rightOuterJoin[W](
+      other: RDD[(K, W)],
+      numPartitions: Int): RDD[(K, (Option[V], W))] = self.withScope {
     rightOuterJoin(other, new HashPartitioner(numPartitions))
   }
 
@@ -634,7 +654,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
    * in `this` have key k. Hash-partitions the resulting RDD using the existing partitioner/
    * parallelism level.
    */
-  def fullOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (Option[V], Option[W]))] = {
+  def fullOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (Option[V], Option[W]))] = self.withScope {
     fullOuterJoin(other, defaultPartitioner(self, other))
   }
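
A sketch of the full outer join described above, again with made-up data; both sides of the result tuple are Options, so a key missing on either side shows up as None:

    import org.apache.spark.{SparkConf, SparkContext}

    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("fullOuterJoin-sketch"))
    val left  = sc.parallelize(Seq(("a", 1), ("b", 2)))
    val right = sc.parallelize(Seq(("b", "x"), ("c", "y")))

    // Keys from either side are kept; a missing side shows up as None.
    val joined = left.fullOuterJoin(right)  // RDD[(String, (Option[Int], Option[String]))]
    joined.collect().foreach(println)
    // (a,(Some(1),None)), (b,(Some(2),Some(x))), (c,(None,Some(y)))
    sc.stop()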
 
@@ -646,7 +666,9 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
    * (k, (Some(v), Some(w))) for v in `this`, or the pair (k, (None, Some(w))) if no elements
    * in `this` have key k. Hash-partitions the resulting RDD into the given number of partitions.
    */
-  def fullOuterJoin[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (Option[V], Option[W]))] = {
+  def fullOuterJoin[W](
+      other: RDD[(K, W)],
+      numPartitions: Int): RDD[(K, (Option[V], Option[W]))] = self.withScope {
     fullOuterJoin(other, new HashPartitioner(numPartitions))
   }
 
@@ -656,7 +678,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
    * Warning: this doesn't return a multimap (so if you have multiple values to the same key, only
    *          one value per key is preserved in the map returned)
    */
-  def collectAsMap(): Map[K, V] = {
+  def collectAsMap(): Map[K, V] = self.withScope {
     val data = self.collect()
     val map = new mutable.HashMap[K, V]
     map.sizeHint(data.length)
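
The warning above is easy to trip over: collectAsMap pulls the whole RDD to the driver and keeps only one value per duplicate key. A minimal sketch (illustrative data, local SparkContext assumed):

    import org.apache.spark.{SparkConf, SparkContext}

    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("collectAsMap-sketch"))
    val pairs = sc.parallelize(Seq(("a", 1), ("a", 2), ("b", 3)))

    // Everything is pulled back to the driver; duplicate keys keep only one value.
    val asMap = pairs.collectAsMap()
    println(asMap)  // Map(b -> 3, a -> 1) or Map(b -> 3, a -> 2), depending on partition order
    sc.stop()
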
@@ -668,7 +690,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
    * Pass each value in the key-value pair RDD through a map function without changing the keys;
    * this also retains the original RDD's partitioning.
    */
-  def mapValues[U](f: V => U): RDD[(K, U)] = {
+  def mapValues[U](f: V => U): RDD[(K, U)] = self.withScope {
     val cleanF = self.context.clean(f)
     new MapPartitionsRDD[(K, U), (K, V)](self,
       (context, pid, iter) => iter.map { case (k, v) => (k, cleanF(v)) },
@@ -679,7 +701,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
    * Pass each value in the key-value pair RDD through a flatMap function without changing the
    * keys; this also retains the original RDD's partitioning.
    */
-  def flatMapValues[U](f: V => TraversableOnce[U]): RDD[(K, U)] = {
+  def flatMapValues[U](f: V => TraversableOnce[U]): RDD[(K, U)] = self.withScope {
     val cleanF = self.context.clean(f)
     new MapPartitionsRDD[(K, U), (K, V)](self,
       (context, pid, iter) => iter.flatMap { case (k, v) =>
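
Because mapValues and flatMapValues never touch the keys, the result keeps the parent's partitioner, unlike a plain map over the pairs. A sketch with illustrative data, assuming a local SparkContext:

    import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}

    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("mapValues-sketch"))
    val partitioned = sc.parallelize(Seq(("a", 1), ("b", 2))).partitionBy(new HashPartitioner(4))

    // Keys are untouched, so the HashPartitioner is carried over to the result.
    val doubled = partitioned.mapValues(_ * 2)
    println(doubled.partitioner)   // Some(org.apache.spark.HashPartitioner@4)

    // A plain map could have changed the keys, so it drops the partitioner.
    val remapped = partitioned.map { case (k, v) => (k, v * 2) }
    println(remapped.partitioner)  // None
    sc.stop()
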
@@ -697,7 +719,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
       other2: RDD[(K, W2)],
       other3: RDD[(K, W3)],
       partitioner: Partitioner)
-      : RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2], Iterable[W3]))] = {
+      : RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2], Iterable[W3]))] = self.withScope {
     if (partitioner.isInstanceOf[HashPartitioner] && keyClass.isArray) {
       throw new SparkException("Default partitioner cannot partition array keys.")
     }
@@ -715,7 +737,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
    * list of values for that key in `this` as well as `other`.
    */
   def cogroup[W](other: RDD[(K, W)], partitioner: Partitioner)
-      : RDD[(K, (Iterable[V], Iterable[W]))]  = {
+      : RDD[(K, (Iterable[V], Iterable[W]))] = self.withScope {
     if (partitioner.isInstanceOf[HashPartitioner] && keyClass.isArray) {
       throw new SparkException("Default partitioner cannot partition array keys.")
     }
@@ -730,7 +752,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
    * tuple with the list of values for that key in `this`, `other1` and `other2`.
    */
   def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)], partitioner: Partitioner)
-      : RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2]))] = {
+      : RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2]))] = self.withScope {
     if (partitioner.isInstanceOf[HashPartitioner] && keyClass.isArray) {
       throw new SparkException("Default partitioner cannot partition array keys.")
     }
@@ -748,7 +770,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
    * for that key in `this`, `other1`, `other2` and `other3`.
    */
   def cogroup[W1, W2, W3](other1: RDD[(K, W1)], other2: RDD[(K, W2)], other3: RDD[(K, W3)])
-      : RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2], Iterable[W3]))] = {
+      : RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2], Iterable[W3]))] = self.withScope {
     cogroup(other1, other2, other3, defaultPartitioner(self, other1, other2, other3))
   }
 
@@ -756,7 +778,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
    * For each key k in `this` or `other`, return a resulting RDD that contains a tuple with the
    * list of values for that key in `this` as well as `other`.
    */
-  def cogroup[W](other: RDD[(K, W)]): RDD[(K, (Iterable[V], Iterable[W]))] = {
+  def cogroup[W](other: RDD[(K, W)]): RDD[(K, (Iterable[V], Iterable[W]))] = self.withScope {
     cogroup(other, defaultPartitioner(self, other))
   }
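
A sketch of the two-way cogroup described above (made-up data, local SparkContext assumed); each key maps to the full list of values from both sides, either of which may be empty:

    import org.apache.spark.{SparkConf, SparkContext}

    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("cogroup-sketch"))
    val clicks = sc.parallelize(Seq(("u1", "home"), ("u1", "cart"), ("u2", "home")))
    val orders = sc.parallelize(Seq(("u1", 9.99)))

    // Each key maps to the complete list of values from both sides.
    val grouped = clicks.cogroup(orders)  // RDD[(String, (Iterable[String], Iterable[Double]))]
    grouped.collect().foreach { case (user, (pages, buys)) =>
      println(s"$user viewed ${pages.size} pages and placed ${buys.size} orders")
    }
    sc.stop()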
 
@@ -765,7 +787,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
    * tuple with the list of values for that key in `this`, `other1` and `other2`.
    */
   def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)])
-      : RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2]))] = {
+      : RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2]))] = self.withScope {
     cogroup(other1, other2, defaultPartitioner(self, other1, other2))
   }
 
@@ -773,7 +795,9 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
    * For each key k in `this` or `other`, return a resulting RDD that contains a tuple with the
    * list of values for that key in `this` as well as `other`.
    */
-  def cogroup[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (Iterable[V], Iterable[W]))] = {
+  def cogroup[W](
+      other: RDD[(K, W)],
+      numPartitions: Int): RDD[(K, (Iterable[V], Iterable[W]))] = self.withScope {
     cogroup(other, new HashPartitioner(numPartitions))
   }
 
@@ -782,7 +806,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
    * tuple with the list of values for that key in `this`, `other1` and `other2`.
    */
   def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)], numPartitions: Int)
-      : RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2]))] = {
+      : RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2]))] = self.withScope {
     cogroup(other1, other2, new HashPartitioner(numPartitions))
   }
 
@@ -795,24 +819,24 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
       other2: RDD[(K, W2)],
       other3: RDD[(K, W3)],
       numPartitions: Int)
-      : RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2], Iterable[W3]))] = {
+      : RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2], Iterable[W3]))] = self.withScope {
     cogroup(other1, other2, other3, new HashPartitioner(numPartitions))
   }
 
   /** Alias for cogroup. */
-  def groupWith[W](other: RDD[(K, W)]): RDD[(K, (Iterable[V], Iterable[W]))] = {
+  def groupWith[W](other: RDD[(K, W)]): RDD[(K, (Iterable[V], Iterable[W]))] = self.withScope {
     cogroup(other, defaultPartitioner(self, other))
   }
 
   /** Alias for cogroup. */
   def groupWith[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)])
-      : RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2]))] = {
+      : RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2]))] = self.withScope {
     cogroup(other1, other2, defaultPartitioner(self, other1, other2))
   }
 
   /** Alias for cogroup. */
   def groupWith[W1, W2, W3](other1: RDD[(K, W1)], other2: RDD[(K, W2)], other3: RDD[(K, W3)])
-      : RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2], Iterable[W3]))] = {
+      : RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2], Iterable[W3]))] = self.withScope {
     cogroup(other1, other2, other3, defaultPartitioner(self, other1, other2, other3))
   }
 
@@ -822,22 +846,27 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
    * Uses `this` partitioner/partition size, because even if `other` is huge, the resulting
    * RDD will be <= us.
    */
-  def subtractByKey[W: ClassTag](other: RDD[(K, W)]): RDD[(K, V)] =
+  def subtractByKey[W: ClassTag](other: RDD[(K, W)]): RDD[(K, V)] = self.withScope {
     subtractByKey(other, self.partitioner.getOrElse(new HashPartitioner(self.partitions.length)))
+  }
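
A sketch of subtractByKey with illustrative data (local SparkContext assumed); only the keys of `other` matter, its values are ignored:

    import org.apache.spark.{SparkConf, SparkContext}

    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("subtractByKey-sketch"))
    val all     = sc.parallelize(Seq(("a", 1), ("b", 2), ("c", 3)))
    val exclude = sc.parallelize(Seq(("b", ()), ("c", ())))

    // Only the keys of `exclude` matter; its values never appear in the result.
    val remaining = all.subtractByKey(exclude)
    remaining.collect().foreach(println)  // (a,1)
    sc.stop()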
 
   /** Return an RDD with the pairs from `this` whose keys are not in `other`. */
-  def subtractByKey[W: ClassTag](other: RDD[(K, W)], numPartitions: Int): RDD[(K, V)] =
+  def subtractByKey[W: ClassTag](
+      other: RDD[(K, W)],
+      numPartitions: Int): RDD[(K, V)] = self.withScope {
     subtractByKey(other, new HashPartitioner(numPartitions))
+  }
 
   /** Return an RDD with the pairs from `this` whose keys are not in `other`. */
-  def subtractByKey[W: ClassTag](other: RDD[(K, W)], p: Partitioner): RDD[(K, V)] =
+  def subtractByKey[W: ClassTag](other: RDD[(K, W)], p: Partitioner): RDD[(K, V)] = self.withScope {
     new SubtractedRDD[K, V, W](self, other, p)
+  }
 
   /**
    * Return the list of values in the RDD for key `key`. This operation is done efficiently if the
    * RDD has a known partitioner by only searching the partition that the key maps to.
    */
-  def lookup(key: K): Seq[V] = {
+  def lookup(key: K): Seq[V] = self.withScope {
     self.partitioner match {
       case Some(p) =>
         val index = p.getPartition(key)
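
As the pattern match above shows, lookup only scans the single partition the key maps to when the RDD has a known partitioner. A sketch with made-up data, assuming a local SparkContext:

    import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}

    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("lookup-sketch"))
    val indexed = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)))
      .partitionBy(new HashPartitioner(8))
      .cache()

    // With a known partitioner, only the partition that "a" maps to is scanned.
    println(indexed.lookup("a"))  // e.g. ArrayBuffer(1, 3)
    sc.stop()
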
@@ -859,7 +888,8 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
    * Output the RDD to any Hadoop-supported file system, using a Hadoop `OutputFormat` class
    * supporting the key and value types K and V in this RDD.
    */
-  def saveAsHadoopFile[F <: OutputFormat[K, V]](path: String)(implicit fm: ClassTag[F]) {
+  def saveAsHadoopFile[F <: OutputFormat[K, V]](
+      path: String)(implicit fm: ClassTag[F]): Unit = self.withScope {
     saveAsHadoopFile(path, keyClass, valueClass, fm.runtimeClass.asInstanceOf[Class[F]])
   }
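
A sketch of the type-parameterized overload above using the old mapred TextOutputFormat; the output path, data, and local master are illustrative assumptions:

    import org.apache.hadoop.mapred.TextOutputFormat
    import org.apache.spark.{SparkConf, SparkContext}

    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("saveAsHadoopFile-sketch"))
    val counts = sc.parallelize(Seq(("a", 1), ("b", 2)))

    // Writes one "key<TAB>value" text part-file per partition under the target directory.
    counts.saveAsHadoopFile[TextOutputFormat[String, Int]]("/tmp/counts-old-api")
    sc.stop()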
 
@@ -869,7 +899,8 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
    * supplied codec.
    */
   def saveAsHadoopFile[F <: OutputFormat[K, V]](
-      path: String, codec: Class[_ <: CompressionCodec]) (implicit fm: ClassTag[F]) {
+      path: String,
+      codec: Class[_ <: CompressionCodec])(implicit fm: ClassTag[F]): Unit = self.withScope {
     val runtimeClass = fm.runtimeClass
     saveAsHadoopFile(path, keyClass, valueClass, runtimeClass.asInstanceOf[Class[F]], codec)
   }
@@ -878,7 +909,8 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
    * Output the RDD to any Hadoop-supported file system, using a new Hadoop API `OutputFormat`
    * (mapreduce.OutputFormat) object supporting the key and value types K and V in this RDD.
    */
-  def saveAsNewAPIHadoopFile[F <: NewOutputFormat[K, V]](path: String)(implicit fm: ClassTag[F]) {
+  def saveAsNewAPIHadoopFile[F <: NewOutputFormat[K, V]](
+      path: String)(implicit fm: ClassTag[F]): Unit = self.withScope {
     saveAsNewAPIHadoopFile(path, keyClass, valueClass, fm.runtimeClass.asInstanceOf[Class[F]])
   }
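
A sketch of the new-API overload above; the records are converted to Writables so the F <: OutputFormat[K, V] bound lines up with exact types, and the path and data are illustrative assumptions:

    import org.apache.hadoop.io.{IntWritable, Text}
    import org.apache.hadoop.mapreduce.lib.output.{TextOutputFormat => NewTextOutputFormat}
    import org.apache.spark.{SparkConf, SparkContext}

    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("newAPIHadoopFile-sketch"))

    // Convert to Writables so the output format's key/value types match the RDD's exactly.
    val counts = sc.parallelize(Seq(("a", 1), ("b", 2)))
      .map { case (k, v) => (new Text(k), new IntWritable(v)) }

    counts.saveAsNewAPIHadoopFile[NewTextOutputFormat[Text, IntWritable]]("/tmp/counts-new-api")
    sc.stop()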
 
@@ -891,8 +923,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
       keyClass: Class[_],
       valueClass: Class[_],
       outputFormatClass: Class[_ <: NewOutputFormat[_, _]],
-      conf: Configuration = self.context.hadoopConfiguration)
-  {
+      conf: Configuration = self.context.hadoopConfiguration): Unit = self.withScope {
     // Rename this as hadoopConf internally to avoid shadowing (see SPARK-2038).
     val hadoopConf = conf
     val job = new NewAPIHadoopJob(hadoopConf)
@@ -912,7 +943,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
       keyClass: Class[_],
       valueClass: Class[_],
       outputFormatClass: Class[_ <: OutputFormat[_, _]],
-      codec: Class[_ <: CompressionCodec]) {
+      codec: Class[_ <: CompressionCodec]): Unit = self.withScope {
     saveAsHadoopFile(path, keyClass, valueClass, outputFormatClass,
       new JobConf(self.context.hadoopConfiguration), Some(codec))
   }
@@ -927,7 +958,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
       valueClass: Class[_],
       outputFormatClass: Class[_ <: OutputFormat[_, _]],
       conf: JobConf = new JobConf(self.context.hadoopConfiguration),
-      codec: Option[Class[_ <: CompressionCodec]] = None) {
+      codec: Option[Class[_ <: CompressionCodec]] = None): Unit = self.withScope {
     // Rename this as hadoopConf internally to avoid shadowing (see SPARK-2038).
     val hadoopConf = conf
     hadoopConf.setOutputKeyClass(keyClass)
@@ -960,7 +991,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
    * output paths required (e.g. a table name to write to) in the same way as it would be
    * configured for a Hadoop MapReduce job.
    */
-  def saveAsNewAPIHadoopDataset(conf: Configuration) {
+  def saveAsNewAPIHadoopDataset(conf: Configuration): Unit = self.withScope {
     // Rename this as hadoopConf internally to avoid shadowing (see SPARK-2038).
     val hadoopConf = conf
     val job = new NewAPIHadoopJob(hadoopConf)
@@ -1027,7 +1058,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
    * (e.g. a table name to write to) in the same way as it would be configured for a Hadoop
    * MapReduce job.
    */
-  def saveAsHadoopDataset(conf: JobConf) {
+  def saveAsHadoopDataset(conf: JobConf): Unit = self.withScope {
     // Rename this as hadoopConf internally to avoid shadowing (see SPARK-2038).
     val hadoopConf = conf
     val wrappedConf = new SerializableWritable(hadoopConf)
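
A sketch of driving saveAsHadoopDataset purely from a JobConf, as the scaladoc above describes; the output classes, format, and path below are illustrative assumptions, not prescribed by this patch:

    import org.apache.hadoop.fs.Path
    import org.apache.hadoop.mapred.{FileOutputFormat, JobConf, TextOutputFormat}
    import org.apache.spark.{SparkConf, SparkContext}

    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("saveAsHadoopDataset-sketch"))
    val counts = sc.parallelize(Seq(("a", 1), ("b", 2)))

    // The JobConf alone decides where and how the records are written.
    val jobConf = new JobConf(sc.hadoopConfiguration)
    jobConf.setOutputKeyClass(classOf[String])
    jobConf.setOutputValueClass(classOf[java.lang.Integer])
    jobConf.setOutputFormat(classOf[TextOutputFormat[String, Int]])
    FileOutputFormat.setOutputPath(jobConf, new Path("/tmp/counts-dataset"))

    counts.saveAsHadoopDataset(jobConf)
    sc.stop()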

