You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by an...@apache.org on 2015/05/05 01:21:44 UTC
[3/7] spark git commit: [SPARK-6943] [SPARK-6944] DAG visualization
on SparkUI
http://git-wip-us.apache.org/repos/asf/spark/blob/fc8b5819/core/src/main/resources/org/apache/spark/ui/static/spark-dag-viz.js
----------------------------------------------------------------------
diff --git a/core/src/main/resources/org/apache/spark/ui/static/spark-dag-viz.js b/core/src/main/resources/org/apache/spark/ui/static/spark-dag-viz.js
new file mode 100644
index 0000000..99b0294
--- /dev/null
+++ b/core/src/main/resources/org/apache/spark/ui/static/spark-dag-viz.js
@@ -0,0 +1,392 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*
+ * This file contains the logic to render the RDD DAG visualization in the UI.
+ *
+ * This DAG describes the relationships between
+ * (1) an RDD and its dependencies,
+ * (2) an RDD and its operation scopes, and
+ * (3) an RDD's operation scopes and the stage / job hierarchy
+ *
+ * An operation scope is a general, named code block representing an operation
+ * that instantiates RDDs (e.g. filter, textFile, reduceByKey). An operation
+ * scope can be nested inside of other scopes if the corresponding RDD operation
+ * invokes other such operations (for more detail, see o.a.s.rdd.RDDOperationScope).
+ *
+ * A stage may include one or more operation scopes if the RDD operations are
+ * streamlined into one stage (e.g. rdd.map(...).filter(...).flatMap(...)).
+ * On the flip side, an operation scope may also include one or many stages,
+ * or even jobs if the RDD operation is higher level than Spark's scheduling
+ * primitives (e.g. take, any SQL query).
+ *
+ * In the visualization, an RDD is expressed as a node, and its dependencies
+ * as directed edges (from parent to child). Operation scopes, stages, and
+ * jobs are expressed as clusters that may contain one or many nodes. These
+ * clusters may be nested inside of each other in the scenarios described
+ * above.
+ *
+ * The visualization is rendered in an SVG contained in "div#dag-viz-graph",
+ * and its input data is expected to be populated in "div#dag-viz-metadata"
+ * by Spark's UI code. This is currently used only on the stage page and on
+ * the job page.
+ *
+ * This requires jQuery, d3, and dagre-d3. Note that we use a custom release
+ * of dagre-d3 (http://github.com/andrewor14/dagre-d3) for some specific
+ * functionality. For more detail, please track the changes in that project
+ * since it was forked (commit 101503833a8ce5fe369547f6addf3e71172ce10b).
+ */
+
+var VizConstants = { // colors, sizes and DOM id prefixes shared by the functions in this file
+ rddColor: "#444444", // fill / stroke of RDD nodes and of their label text
+ rddCachedColor: "#FF0000", // highlight used for nodes carrying the "cached" class
+ rddOperationColor: "#AADFFF", // stroke of every cluster box
+ stageColor: "#FFDDEE", // stroke of the cluster boxes that represent stages
+ clusterLabelColor: "#888888", // fill of the cluster label text
+ edgeColor: "#444444", // stroke of edge paths and fill of the arrow marker
+ edgeWidth: "1.5px", // stroke width of edge paths
+ svgMarginX: 0, // horizontal px added to the SVG size and view box
+ svgMarginY: 20, // vertical px added to the SVG size and view box
+ stageSep: 50, // horizontal gap between consecutive stages on the job page
+ graphPrefix: "graph_", // id prefix of the <g> that holds one stage's graph
+ nodePrefix: "node_", // id prefix of the element that represents an RDD
+ stagePrefix: "stage_", // stripped from the "stageId" attribute when building stage links
+ clusterPrefix: "cluster_", // stripped from a cluster's "name" attribute to form its label
+ stageClusterPrefix: "cluster_stage_" // a cluster whose id contains this represents a stage
+};
+
+// Helper d3 accessors for the elements that contain our graph and its metadata
+function graphContainer() { return d3.select("#dag-viz-graph"); }
+function metadataContainer() { return d3.select("#dag-viz-metadata"); }
+
+/*
+ * Show or hide the RDD DAG visualization.
+ * The graph is only rendered the first time this is called.
+ */
+function toggleDagViz(forJob) {
+ var arrowSelector = ".expand-dag-viz-arrow";
+ $(arrowSelector).toggleClass('arrow-closed');
+ $(arrowSelector).toggleClass('arrow-open');
+ var shouldShow = $(arrowSelector).hasClass("arrow-open");
+ if (shouldShow) {
+ var shouldRender = graphContainer().select("svg").empty();
+ if (shouldRender) {
+ renderDagViz(forJob);
+ }
+ graphContainer().style("display", "block");
+ } else {
+ // Save the graph for later so we don't have to render it again
+ graphContainer().style("display", "none");
+ }
+}
+
+/*
+ * Render the RDD DAG visualization.
+ *
+ * Input DOM hierarchy:
+ * div#dag-viz-metadata >
+ * div.stage-metadata >
+ * div.[dot-file | incoming-edge | outgoing-edge]
+ *
+ * Output DOM hierarchy:
+ * div#dag-viz-graph >
+ * svg >
+ * g#cluster_stage_[stageId]
+ *
+ * Note that the input metadata is populated by o.a.s.ui.UIUtils.showDagViz.
+ * Any changes in the input format here must be reflected there.
+ */
+function renderDagViz(forJob) {
+
+ // If there is not a dot file to render, fail fast and report error
+ if (metadataContainer().empty()) {
+ graphContainer().append("div").text(
+ "No visualization information available for this " + (forJob ? "job" : "stage"));
+ return;
+ }
+
+ var svg = graphContainer().append("svg");
+ if (forJob) {
+ renderDagVizForJob(svg);
+ } else {
+ renderDagVizForStage(svg);
+ }
+
+ // Find cached RDDs
+ metadataContainer().selectAll(".cached-rdd").each(function(v) {
+ var nodeId = VizConstants.nodePrefix + d3.select(this).text();
+ graphContainer().selectAll("#" + nodeId).classed("cached", true);
+ });
+
+ // Set the appropriate SVG dimensions to ensure that all elements are displayed
+ var boundingBox = svg.node().getBBox();
+ svg.style("width", (boundingBox.width + VizConstants.svgMarginX) + "px");
+ svg.style("height", (boundingBox.height + VizConstants.svgMarginY) + "px");
+
+ // Add labels to clusters because dagre-d3 doesn't do this for us
+ svg.selectAll("g.cluster rect").each(function() {
+ var rect = d3.select(this);
+ var cluster = d3.select(this.parentNode);
+ // Shift the boxes up a little to make room for the labels
+ rect.attr("y", toFloat(rect.attr("y")) - 10);
+ rect.attr("height", toFloat(rect.attr("height")) + 10);
+ var labelX = toFloat(rect.attr("x")) + toFloat(rect.attr("width")) - 5;
+ var labelY = toFloat(rect.attr("y")) + 15;
+ var labelText = cluster.attr("name").replace(VizConstants.clusterPrefix, "");
+ cluster.append("text")
+ .attr("x", labelX)
+ .attr("y", labelY)
+ .attr("text-anchor", "end")
+ .text(labelText);
+ });
+
+ // We have shifted a few elements upwards, so we should fix the SVG views
+ var startX = -VizConstants.svgMarginX;
+ var startY = -VizConstants.svgMarginY;
+ var endX = toFloat(svg.style("width")) + VizConstants.svgMarginX;
+ var endY = toFloat(svg.style("height")) + VizConstants.svgMarginY;
+ var newViewBox = startX + " " + startY + " " + endX + " " + endY;
+ svg.attr("viewBox", newViewBox);
+
+ // Lastly, apply some custom style to the DAG
+ styleDagViz(forJob);
+}
+
+/* Render the RDD DAG visualization for a stage. */
+function renderDagVizForStage(svgContainer) {
+ var metadata = metadataContainer().select(".stage-metadata");
+ var dot = metadata.select(".dot-file").text();
+ var containerId = VizConstants.graphPrefix + metadata.attr("stageId");
+ var container = svgContainer.append("g").attr("id", containerId);
+ renderDot(dot, container);
+}
+
+/*
+ * Render the RDD DAG visualization for a job.
+ *
+ * Due to limitations in dagre-d3, each stage is rendered independently so that
+ * we have more control on how to position them. Unfortunately, this means we
+ * cannot rely on dagre-d3 to render edges that cross stages and must render
+ * these manually on our own.
+ *
+ * For every "stage-metadata" element, in document order, this function:
+ *  - reads the dot text and the "stageId" attribute,
+ *  - appends an <a> linking to the stage page, with a <g> inside it,
+ *  - shifts that <g> to the right of the previously rendered stage, and
+ *  - renders the dot text into it.
+ * The "incoming-edge" entries of all stages are collected along the way and
+ * drawn at the end into a separate <g id="cross-stage-edges">.
+ *
+ * NOTE(review): the stage link is an absolute path with the attempt fixed to 0;
+ * presumably this breaks when the UI is served under a base path -- confirm.
+ */
+function renderDagVizForJob(svgContainer) {
+ var crossStageEdges = [];
+
+ metadataContainer().selectAll(".stage-metadata").each(function(d, i) { // i: index of this stage
+ var metadata = d3.select(this);
+ var dot = metadata.select(".dot-file").text();
+ var stageId = metadata.attr("stageId");
+ var containerId = VizConstants.graphPrefix + stageId;
+ // TODO: handle stage attempts
+ var stageLink =
+ "/stages/stage/?id=" + stageId.replace(VizConstants.stagePrefix, "") + "&attempt=0";
+ var container = svgContainer
+ .append("a").attr("xlink:href", stageLink)
+ .append("g").attr("id", containerId);
+ // Now we need to shift the container for this stage so it doesn't overlap
+ // with existing ones. We do not need to do this for the first stage.
+ if (i > 0) {
+ // Take into account the position and width of the last stage's container
+ var existingStages = stageClusters();
+ if (!existingStages.empty()) {
+ var lastStage = existingStages[0].pop(); // last element of the selection's first group
+ var lastStageId = d3.select(lastStage).attr("id");
+ var lastStageWidth = toFloat(d3.select("#" + lastStageId + " rect").attr("width"));
+ var lastStagePosition = getAbsolutePosition(lastStageId);
+ var offset = lastStagePosition.x + lastStageWidth + VizConstants.stageSep;
+ container.attr("transform", "translate(" + offset + ", 0)");
+ }
+ }
+ renderDot(dot, container);
+ // If there are any incoming edges into this graph, keep track of them to render
+ // them separately later. Note that we cannot draw them now because we need to
+ // put these edges in a separate container that is on top of all stage graphs.
+ metadata.selectAll(".incoming-edge").each(function(v) {
+ var edge = d3.select(this).text().split(","); // e.g. 3,4 => [3, 4]
+ crossStageEdges.push(edge);
+ });
+ });
+
+ // Draw edges that cross stages
+ if (crossStageEdges.length > 0) {
+ var container = svgContainer.append("g").attr("id", "cross-stage-edges"); // appended last
+ for (var i = 0; i < crossStageEdges.length; i++) {
+ var fromRDDId = crossStageEdges[i][0]; // first ID: the edge's source RDD
+ var toRDDId = crossStageEdges[i][1]; // second ID: the edge's destination RDD
+ connectRDDs(fromRDDId, toRDDId, container);
+ }
+ }
+}
+
+/* Render the dot file as an SVG in the given container. */
+function renderDot(dot, container) {
+ var escaped_dot = dot
+ .replace(/</g, "<")
+ .replace(/>/g, ">")
+ .replace(/"/g, "\"");
+ var g = graphlibDot.read(escaped_dot);
+ var renderer = new dagreD3.render();
+ renderer(container, g);
+}
+
+/* Style the visualization we just rendered. */
+function styleDagViz(forJob) {
+ graphContainer().selectAll("svg g.cluster rect")
+ .style("fill", "white")
+ .style("stroke", VizConstants.rddOperationColor)
+ .style("stroke-width", "4px")
+ .style("stroke-opacity", "0.5");
+ graphContainer().selectAll("svg g.cluster text")
+ .attr("fill", VizConstants.clusterLabelColor)
+ .attr("font-size", "11px");
+ graphContainer().selectAll("svg path")
+ .style("stroke", VizConstants.edgeColor)
+ .style("stroke-width", VizConstants.edgeWidth);
+ stageClusters()
+ .select("rect")
+ .style("stroke", VizConstants.stageColor)
+ .style("strokeWidth", "6px");
+
+ // Put an arrow at the end of every edge
+ // We need to do this because we manually render some edges ourselves
+ // For these edges, we borrow the arrow marker generated by dagre-d3
+ var dagreD3Marker = graphContainer().select("svg g.edgePaths marker").node();
+ graphContainer().select("svg")
+ .append(function() { return dagreD3Marker.cloneNode(true); })
+ .attr("id", "marker-arrow")
+ .select("path")
+ .attr("fill", VizConstants.edgeColor)
+ .attr("strokeWidth", "0px");
+ graphContainer().selectAll("svg g > path").attr("marker-end", "url(#marker-arrow)");
+ graphContainer().selectAll("svg g.edgePaths def").remove(); // We no longer need these
+
+ // Apply any job or stage specific styles
+ if (forJob) {
+ styleDagVizForJob();
+ } else {
+ styleDagVizForStage();
+ }
+}
+
+/* Apply job-page-specific style to the visualization. */
+function styleDagVizForJob() {
+ graphContainer().selectAll("svg g.node circle")
+ .style("fill", VizConstants.rddColor);
+ // TODO: add a legend to explain what a highlighted dot means
+ graphContainer().selectAll("svg g.cached circle")
+ .style("fill", VizConstants.rddCachedColor);
+ graphContainer().selectAll("svg g#cross-stage-edges path")
+ .style("fill", "none");
+}
+
+/* Apply stage-page-specific style to the visualization. */
+function styleDagVizForStage() {
+ graphContainer().selectAll("svg g.node rect")
+ .style("fill", "none")
+ .style("stroke", VizConstants.rddColor)
+ .style("stroke-width", "2px")
+ .attr("rx", "5") // round corners
+ .attr("ry", "5");
+ // TODO: add a legend to explain what a highlighted RDD means
+ graphContainer().selectAll("svg g.cached rect")
+ .style("stroke", VizConstants.rddCachedColor);
+ graphContainer().selectAll("svg g.node g.label text tspan")
+ .style("fill", VizConstants.rddColor);
+}
+
+/*
+ * (Job page only) Helper method to compute the absolute
+ * position of the group element identified by the given ID.
+ */
+function getAbsolutePosition(groupId) {
+ var obj = d3.select("#" + groupId).filter("g");
+ var _x = 0, _y = 0;
+ while (!obj.empty()) {
+ var transformText = obj.attr("transform");
+ var translate = d3.transform(transformText).translate
+ _x += translate[0];
+ _y += translate[1];
+ obj = d3.select(obj.node().parentNode).filter("g")
+ }
+ return { x: _x, y: _y };
+}
+
+/* (Job page only) Connect two RDD nodes with a curved edge. */
+function connectRDDs(fromRDDId, toRDDId, container) {
+ var fromNodeId = VizConstants.nodePrefix + fromRDDId;
+ var toNodeId = VizConstants.nodePrefix + toRDDId
+ var fromPos = getAbsolutePosition(fromNodeId);
+ var toPos = getAbsolutePosition(toNodeId);
+
+ // On the job page, RDDs are rendered as dots (circles). When rendering the path,
+ // we need to account for the radii of these circles. Otherwise the arrow heads
+ // will bleed into the circle itself.
+ var delta = toFloat(graphContainer()
+ .select("g.node#" + toNodeId)
+ .select("circle")
+ .attr("r"));
+ if (fromPos.x < toPos.x) {
+ fromPos.x += delta;
+ toPos.x -= delta;
+ } else if (fromPos.x > toPos.x) {
+ fromPos.x -= delta;
+ toPos.x += delta;
+ }
+
+ if (fromPos.y == toPos.y) {
+ // If they are on the same rank, curve the middle part of the edge
+ // upward a little to avoid interference with things in between
+ // e.g. _______
+ // _____/ \_____
+ var points = [
+ [fromPos.x, fromPos.y],
+ [fromPos.x + (toPos.x - fromPos.x) * 0.2, fromPos.y],
+ [fromPos.x + (toPos.x - fromPos.x) * 0.3, fromPos.y - 20],
+ [fromPos.x + (toPos.x - fromPos.x) * 0.7, fromPos.y - 20],
+ [fromPos.x + (toPos.x - fromPos.x) * 0.8, toPos.y],
+ [toPos.x, toPos.y]
+ ];
+ } else {
+ // Otherwise, draw a curved edge that flattens out on both ends
+ // e.g. _____
+ // /
+ // |
+ // _____/
+ var points = [
+ [fromPos.x, fromPos.y],
+ [fromPos.x + (toPos.x - fromPos.x) * 0.4, fromPos.y],
+ [fromPos.x + (toPos.x - fromPos.x) * 0.6, toPos.y],
+ [toPos.x, toPos.y]
+ ];
+ }
+
+ var line = d3.svg.line().interpolate("basis");
+ container.append("path").datum(points).attr("d", line);
+}
+
+/* Helper d3 accessor to clusters that represent stages. */
+function stageClusters() {
+ return graphContainer().selectAll("g.cluster").filter(function() {
+ return d3.select(this).attr("id").indexOf(VizConstants.stageClusterPrefix) > -1;
+ });
+}
+
+/* Helper method to convert attributes to numeric values. */
+function toFloat(f) {
+ return parseFloat(f.replace(/px$/, ""));
+}
+
http://git-wip-us.apache.org/repos/asf/spark/blob/fc8b5819/core/src/main/resources/org/apache/spark/ui/static/webui.css
----------------------------------------------------------------------
diff --git a/core/src/main/resources/org/apache/spark/ui/static/webui.css b/core/src/main/resources/org/apache/spark/ui/static/webui.css
index 4910744..669ad48 100644
--- a/core/src/main/resources/org/apache/spark/ui/static/webui.css
+++ b/core/src/main/resources/org/apache/spark/ui/static/webui.css
@@ -145,7 +145,7 @@ pre {
border: none;
}
-span.expand-additional-metrics {
+span.expand-additional-metrics, span.expand-dag-viz {
cursor: pointer;
}
http://git-wip-us.apache.org/repos/asf/spark/blob/fc8b5819/core/src/main/scala/org/apache/spark/SparkContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 4ef9054..b98a54b 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -659,6 +659,14 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
setLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL, null)
}
+ /**
+ * Execute a block of code in a scope such that all new RDDs created in this body will
+ * be part of the same scope. For more detail, see [[org.apache.spark.rdd.RDDOperationScope]].
+ *
+ * Note: Return statements are NOT allowed in the given body.
+ */
+ private def withScope[U](body: => U): U = RDDOperationScope.withScope[U](this)(body)
+
// Methods for creating RDDs
/** Distribute a local Scala collection to form an RDD.
@@ -669,7 +677,9 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
* @note avoid using `parallelize(Seq())` to create an empty `RDD`. Consider `emptyRDD` for an
* RDD with no partitions, or `parallelize(Seq[T]())` for an RDD of `T` with empty partitions.
*/
- def parallelize[T: ClassTag](seq: Seq[T], numSlices: Int = defaultParallelism): RDD[T] = {
+ def parallelize[T: ClassTag](
+ seq: Seq[T],
+ numSlices: Int = defaultParallelism): RDD[T] = withScope {
assertNotStopped()
new ParallelCollectionRDD[T](this, seq, numSlices, Map[Int, Seq[String]]())
}
@@ -678,14 +688,16 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
*
* This method is identical to `parallelize`.
*/
- def makeRDD[T: ClassTag](seq: Seq[T], numSlices: Int = defaultParallelism): RDD[T] = {
+ def makeRDD[T: ClassTag](
+ seq: Seq[T],
+ numSlices: Int = defaultParallelism): RDD[T] = withScope {
parallelize(seq, numSlices)
}
/** Distribute a local Scala collection to form an RDD, with one or more
* location preferences (hostnames of Spark nodes) for each object.
* Create a new partition for each collection item. */
- def makeRDD[T: ClassTag](seq: Seq[(T, Seq[String])]): RDD[T] = {
+ def makeRDD[T: ClassTag](seq: Seq[(T, Seq[String])]): RDD[T] = withScope {
assertNotStopped()
val indexToPrefs = seq.zipWithIndex.map(t => (t._2, t._1._2)).toMap
new ParallelCollectionRDD[T](this, seq.map(_._1), seq.size, indexToPrefs)
@@ -695,10 +707,12 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
* Read a text file from HDFS, a local file system (available on all nodes), or any
* Hadoop-supported file system URI, and return it as an RDD of Strings.
*/
- def textFile(path: String, minPartitions: Int = defaultMinPartitions): RDD[String] = {
+ def textFile(
+ path: String,
+ minPartitions: Int = defaultMinPartitions): RDD[String] = withScope {
assertNotStopped()
hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text],
- minPartitions).map(pair => pair._2.toString).setName(path)
+ minPartitions).map(pair => pair._2.toString)
}
/**
@@ -728,8 +742,9 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
*
* @param minPartitions A suggestion value of the minimal splitting number for input data.
*/
- def wholeTextFiles(path: String, minPartitions: Int = defaultMinPartitions):
- RDD[(String, String)] = {
+ def wholeTextFiles(
+ path: String,
+ minPartitions: Int = defaultMinPartitions): RDD[(String, String)] = withScope {
assertNotStopped()
val job = new NewHadoopJob(hadoopConfiguration)
// Use setInputPaths so that wholeTextFiles aligns with hadoopFile/textFile in taking
@@ -776,8 +791,9 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
* @note Small files are preferred; very large files may cause bad performance.
*/
@Experimental
- def binaryFiles(path: String, minPartitions: Int = defaultMinPartitions):
- RDD[(String, PortableDataStream)] = {
+ def binaryFiles(
+ path: String,
+ minPartitions: Int = defaultMinPartitions): RDD[(String, PortableDataStream)] = withScope {
assertNotStopped()
val job = new NewHadoopJob(hadoopConfiguration)
// Use setInputPaths so that binaryFiles aligns with hadoopFile/textFile in taking
@@ -806,8 +822,10 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
* @return An RDD of data with values, represented as byte arrays
*/
@Experimental
- def binaryRecords(path: String, recordLength: Int, conf: Configuration = hadoopConfiguration)
- : RDD[Array[Byte]] = {
+ def binaryRecords(
+ path: String,
+ recordLength: Int,
+ conf: Configuration = hadoopConfiguration): RDD[Array[Byte]] = withScope {
assertNotStopped()
conf.setInt(FixedLengthBinaryInputFormat.RECORD_LENGTH_PROPERTY, recordLength)
val br = newAPIHadoopFile[LongWritable, BytesWritable, FixedLengthBinaryInputFormat](path,
@@ -848,8 +866,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
inputFormatClass: Class[_ <: InputFormat[K, V]],
keyClass: Class[K],
valueClass: Class[V],
- minPartitions: Int = defaultMinPartitions
- ): RDD[(K, V)] = {
+ minPartitions: Int = defaultMinPartitions): RDD[(K, V)] = withScope {
assertNotStopped()
// Add necessary security credentials to the JobConf before broadcasting it.
SparkHadoopUtil.get.addCredentials(conf)
@@ -869,8 +886,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
inputFormatClass: Class[_ <: InputFormat[K, V]],
keyClass: Class[K],
valueClass: Class[V],
- minPartitions: Int = defaultMinPartitions
- ): RDD[(K, V)] = {
+ minPartitions: Int = defaultMinPartitions): RDD[(K, V)] = withScope {
assertNotStopped()
// A Hadoop configuration can be about 10 KB, which is pretty big, so broadcast it.
val confBroadcast = broadcast(new SerializableWritable(hadoopConfiguration))
@@ -901,7 +917,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
*/
def hadoopFile[K, V, F <: InputFormat[K, V]]
(path: String, minPartitions: Int)
- (implicit km: ClassTag[K], vm: ClassTag[V], fm: ClassTag[F]): RDD[(K, V)] = {
+ (implicit km: ClassTag[K], vm: ClassTag[V], fm: ClassTag[F]): RDD[(K, V)] = withScope {
hadoopFile(path,
fm.runtimeClass.asInstanceOf[Class[F]],
km.runtimeClass.asInstanceOf[Class[K]],
@@ -924,13 +940,14 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
* copy them using a `map` function.
*/
def hadoopFile[K, V, F <: InputFormat[K, V]](path: String)
- (implicit km: ClassTag[K], vm: ClassTag[V], fm: ClassTag[F]): RDD[(K, V)] =
+ (implicit km: ClassTag[K], vm: ClassTag[V], fm: ClassTag[F]): RDD[(K, V)] = withScope {
hadoopFile[K, V, F](path, defaultMinPartitions)
+ }
/** Get an RDD for a Hadoop file with an arbitrary new API InputFormat. */
def newAPIHadoopFile[K, V, F <: NewInputFormat[K, V]]
(path: String)
- (implicit km: ClassTag[K], vm: ClassTag[V], fm: ClassTag[F]): RDD[(K, V)] = {
+ (implicit km: ClassTag[K], vm: ClassTag[V], fm: ClassTag[F]): RDD[(K, V)] = withScope {
newAPIHadoopFile(
path,
fm.runtimeClass.asInstanceOf[Class[F]],
@@ -953,7 +970,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
fClass: Class[F],
kClass: Class[K],
vClass: Class[V],
- conf: Configuration = hadoopConfiguration): RDD[(K, V)] = {
+ conf: Configuration = hadoopConfiguration): RDD[(K, V)] = withScope {
assertNotStopped()
// The call to new NewHadoopJob automatically adds security credentials to conf,
// so we don't need to explicitly add them ourselves
@@ -987,7 +1004,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
conf: Configuration = hadoopConfiguration,
fClass: Class[F],
kClass: Class[K],
- vClass: Class[V]): RDD[(K, V)] = {
+ vClass: Class[V]): RDD[(K, V)] = withScope {
assertNotStopped()
// Add necessary security credentials to the JobConf. Required to access secure HDFS.
val jconf = new JobConf(conf)
@@ -1007,7 +1024,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
keyClass: Class[K],
valueClass: Class[V],
minPartitions: Int
- ): RDD[(K, V)] = {
+ ): RDD[(K, V)] = withScope {
assertNotStopped()
val inputFormatClass = classOf[SequenceFileInputFormat[K, V]]
hadoopFile(path, inputFormatClass, keyClass, valueClass, minPartitions)
@@ -1021,7 +1038,10 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
* If you plan to directly cache, sort, or aggregate Hadoop writable objects, you should first
* copy them using a `map` function.
* */
- def sequenceFile[K, V](path: String, keyClass: Class[K], valueClass: Class[V]): RDD[(K, V)] = {
+ def sequenceFile[K, V](
+ path: String,
+ keyClass: Class[K],
+ valueClass: Class[V]): RDD[(K, V)] = withScope {
assertNotStopped()
sequenceFile(path, keyClass, valueClass, defaultMinPartitions)
}
@@ -1051,16 +1071,17 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
def sequenceFile[K, V]
(path: String, minPartitions: Int = defaultMinPartitions)
(implicit km: ClassTag[K], vm: ClassTag[V],
- kcf: () => WritableConverter[K], vcf: () => WritableConverter[V])
- : RDD[(K, V)] = {
- assertNotStopped()
- val kc = kcf()
- val vc = vcf()
- val format = classOf[SequenceFileInputFormat[Writable, Writable]]
- val writables = hadoopFile(path, format,
+ kcf: () => WritableConverter[K], vcf: () => WritableConverter[V]): RDD[(K, V)] = {
+ withScope {
+ assertNotStopped()
+ val kc = kcf()
+ val vc = vcf()
+ val format = classOf[SequenceFileInputFormat[Writable, Writable]]
+ val writables = hadoopFile(path, format,
kc.writableClass(km).asInstanceOf[Class[Writable]],
vc.writableClass(vm).asInstanceOf[Class[Writable]], minPartitions)
- writables.map { case (k, v) => (kc.convert(k), vc.convert(v)) }
+ writables.map { case (k, v) => (kc.convert(k), vc.convert(v)) }
+ }
}
/**
@@ -1073,21 +1094,18 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
*/
def objectFile[T: ClassTag](
path: String,
- minPartitions: Int = defaultMinPartitions
- ): RDD[T] = {
+ minPartitions: Int = defaultMinPartitions): RDD[T] = withScope {
assertNotStopped()
sequenceFile(path, classOf[NullWritable], classOf[BytesWritable], minPartitions)
.flatMap(x => Utils.deserialize[Array[T]](x._2.getBytes, Utils.getContextOrSparkClassLoader))
}
- protected[spark] def checkpointFile[T: ClassTag](
- path: String
- ): RDD[T] = {
+ protected[spark] def checkpointFile[T: ClassTag](path: String): RDD[T] = withScope {
new CheckpointRDD[T](this, path)
}
/** Build the union of a list of RDDs. */
- def union[T: ClassTag](rdds: Seq[RDD[T]]): RDD[T] = {
+ def union[T: ClassTag](rdds: Seq[RDD[T]]): RDD[T] = withScope {
val partitioners = rdds.flatMap(_.partitioner).toSet
if (rdds.forall(_.partitioner.isDefined) && partitioners.size == 1) {
new PartitionerAwareUnionRDD(this, rdds)
@@ -1097,8 +1115,9 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
}
/** Build the union of a list of RDDs passed as variable-length arguments. */
- def union[T: ClassTag](first: RDD[T], rest: RDD[T]*): RDD[T] =
+ def union[T: ClassTag](first: RDD[T], rest: RDD[T]*): RDD[T] = withScope {
union(Seq(first) ++ rest)
+ }
/** Get an RDD that has no partitions or elements. */
def emptyRDD[T: ClassTag]: EmptyRDD[T] = new EmptyRDD[T](this)
@@ -2060,10 +2079,10 @@ object SparkContext extends Logging {
}
private[spark] val SPARK_JOB_DESCRIPTION = "spark.job.description"
-
private[spark] val SPARK_JOB_GROUP_ID = "spark.jobGroup.id"
-
private[spark] val SPARK_JOB_INTERRUPT_ON_CANCEL = "spark.job.interruptOnCancel"
+ private[spark] val RDD_SCOPE_KEY = "spark.rdd.scope"
+ private[spark] val RDD_SCOPE_NO_OVERRIDE_KEY = "spark.rdd.scope.noOverride"
/**
* Executor id for the driver. In earlier versions of Spark, this was `<driver>`, but this was
http://git-wip-us.apache.org/repos/asf/spark/blob/fc8b5819/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala b/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala
index 3406a7e..ec18534 100644
--- a/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala
@@ -33,7 +33,7 @@ class AsyncRDDActions[T: ClassTag](self: RDD[T]) extends Serializable with Loggi
/**
* Returns a future for counting the number of elements in the RDD.
*/
- def countAsync(): FutureAction[Long] = {
+ def countAsync(): FutureAction[Long] = self.withScope {
val totalCount = new AtomicLong
self.context.submitJob(
self,
@@ -53,7 +53,7 @@ class AsyncRDDActions[T: ClassTag](self: RDD[T]) extends Serializable with Loggi
/**
* Returns a future for retrieving all elements of this RDD.
*/
- def collectAsync(): FutureAction[Seq[T]] = {
+ def collectAsync(): FutureAction[Seq[T]] = self.withScope {
val results = new Array[Array[T]](self.partitions.length)
self.context.submitJob[T, Array[T], Seq[T]](self, _.toArray, Range(0, self.partitions.length),
(index, data) => results(index) = data, results.flatten.toSeq)
@@ -62,7 +62,7 @@ class AsyncRDDActions[T: ClassTag](self: RDD[T]) extends Serializable with Loggi
/**
* Returns a future for retrieving the first num elements of the RDD.
*/
- def takeAsync(num: Int): FutureAction[Seq[T]] = {
+ def takeAsync(num: Int): FutureAction[Seq[T]] = self.withScope {
val f = new ComplexFutureAction[Seq[T]]
f.run {
@@ -109,7 +109,7 @@ class AsyncRDDActions[T: ClassTag](self: RDD[T]) extends Serializable with Loggi
/**
* Applies a function f to all elements of this RDD.
*/
- def foreachAsync(f: T => Unit): FutureAction[Unit] = {
+ def foreachAsync(f: T => Unit): FutureAction[Unit] = self.withScope {
val cleanF = self.context.clean(f)
self.context.submitJob[T, Unit, Unit](self, _.foreach(cleanF), Range(0, self.partitions.length),
(index, data) => Unit, Unit)
@@ -118,7 +118,7 @@ class AsyncRDDActions[T: ClassTag](self: RDD[T]) extends Serializable with Loggi
/**
* Applies a function f to each partition of this RDD.
*/
- def foreachPartitionAsync(f: Iterator[T] => Unit): FutureAction[Unit] = {
+ def foreachPartitionAsync(f: Iterator[T] => Unit): FutureAction[Unit] = self.withScope {
self.context.submitJob[T, Unit, Unit](self, f, Range(0, self.partitions.length),
(index, data) => Unit, Unit)
}
http://git-wip-us.apache.org/repos/asf/spark/blob/fc8b5819/core/src/main/scala/org/apache/spark/rdd/DoubleRDDFunctions.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/DoubleRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/DoubleRDDFunctions.scala
index 843a893..926bce6 100644
--- a/core/src/main/scala/org/apache/spark/rdd/DoubleRDDFunctions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/DoubleRDDFunctions.scala
@@ -30,7 +30,7 @@ import org.apache.spark.util.StatCounter
*/
class DoubleRDDFunctions(self: RDD[Double]) extends Logging with Serializable {
/** Add up the elements in this RDD. */
- def sum(): Double = {
+ def sum(): Double = self.withScope {
self.fold(0.0)(_ + _)
}
@@ -38,37 +38,49 @@ class DoubleRDDFunctions(self: RDD[Double]) extends Logging with Serializable {
* Return a [[org.apache.spark.util.StatCounter]] object that captures the mean, variance and
* count of the RDD's elements in one operation.
*/
- def stats(): StatCounter = {
+ def stats(): StatCounter = self.withScope {
self.mapPartitions(nums => Iterator(StatCounter(nums))).reduce((a, b) => a.merge(b))
}
/** Compute the mean of this RDD's elements. */
- def mean(): Double = stats().mean
+ def mean(): Double = self.withScope {
+ stats().mean
+ }
/** Compute the variance of this RDD's elements. */
- def variance(): Double = stats().variance
+ def variance(): Double = self.withScope {
+ stats().variance
+ }
/** Compute the standard deviation of this RDD's elements. */
- def stdev(): Double = stats().stdev
+ def stdev(): Double = self.withScope {
+ stats().stdev
+ }
/**
* Compute the sample standard deviation of this RDD's elements (which corrects for bias in
* estimating the standard deviation by dividing by N-1 instead of N).
*/
- def sampleStdev(): Double = stats().sampleStdev
+ def sampleStdev(): Double = self.withScope {
+ stats().sampleStdev
+ }
/**
* Compute the sample variance of this RDD's elements (which corrects for bias in
* estimating the variance by dividing by N-1 instead of N).
*/
- def sampleVariance(): Double = stats().sampleVariance
+ def sampleVariance(): Double = self.withScope {
+ stats().sampleVariance
+ }
/**
* :: Experimental ::
* Approximate operation to return the mean within a timeout.
*/
@Experimental
- def meanApprox(timeout: Long, confidence: Double = 0.95): PartialResult[BoundedDouble] = {
+ def meanApprox(
+ timeout: Long,
+ confidence: Double = 0.95): PartialResult[BoundedDouble] = self.withScope {
val processPartition = (ctx: TaskContext, ns: Iterator[Double]) => StatCounter(ns)
val evaluator = new MeanEvaluator(self.partitions.length, confidence)
self.context.runApproximateJob(self, processPartition, evaluator, timeout)
@@ -79,7 +91,9 @@ class DoubleRDDFunctions(self: RDD[Double]) extends Logging with Serializable {
* Approximate operation to return the sum within a timeout.
*/
@Experimental
- def sumApprox(timeout: Long, confidence: Double = 0.95): PartialResult[BoundedDouble] = {
+ def sumApprox(
+ timeout: Long,
+ confidence: Double = 0.95): PartialResult[BoundedDouble] = self.withScope {
val processPartition = (ctx: TaskContext, ns: Iterator[Double]) => StatCounter(ns)
val evaluator = new SumEvaluator(self.partitions.length, confidence)
self.context.runApproximateJob(self, processPartition, evaluator, timeout)
@@ -93,7 +107,7 @@ class DoubleRDDFunctions(self: RDD[Double]) extends Logging with Serializable {
* If the RDD contains infinity, NaN throws an exception
* If the elements in RDD do not vary (max == min) always returns a single bucket.
*/
- def histogram(bucketCount: Int): Pair[Array[Double], Array[Long]] = {
+ def histogram(bucketCount: Int): Pair[Array[Double], Array[Long]] = self.withScope {
// Scala's built-in range has issues. See #SI-8782
def customRange(min: Double, max: Double, steps: Int): IndexedSeq[Double] = {
val span = max - min
@@ -140,7 +154,9 @@ class DoubleRDDFunctions(self: RDD[Double]) extends Logging with Serializable {
* the maximum value of the last position and all NaN entries will be counted
* in that bucket.
*/
- def histogram(buckets: Array[Double], evenBuckets: Boolean = false): Array[Long] = {
+ def histogram(
+ buckets: Array[Double],
+ evenBuckets: Boolean = false): Array[Long] = self.withScope {
if (buckets.length < 2) {
throw new IllegalArgumentException("buckets array must have at least two elements")
}
http://git-wip-us.apache.org/repos/asf/spark/blob/fc8b5819/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
index f77abac..2cefe63 100644
--- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -99,7 +99,7 @@ private[spark] class HadoopPartition(rddId: Int, idx: Int, @transient s: InputSp
*/
@DeveloperApi
class HadoopRDD[K, V](
- sc: SparkContext,
+ @transient sc: SparkContext,
broadcastedConf: Broadcast[SerializableWritable[Configuration]],
initLocalJobConfFuncOpt: Option[JobConf => Unit],
inputFormatClass: Class[_ <: InputFormat[K, V]],
@@ -108,6 +108,10 @@ class HadoopRDD[K, V](
minPartitions: Int)
extends RDD[(K, V)](sc, Nil) with Logging {
+ if (initLocalJobConfFuncOpt.isDefined) {
+ sc.clean(initLocalJobConfFuncOpt.get)
+ }
+
def this(
sc: SparkContext,
conf: JobConf,
http://git-wip-us.apache.org/repos/asf/spark/blob/fc8b5819/core/src/main/scala/org/apache/spark/rdd/OrderedRDDFunctions.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/OrderedRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/OrderedRDDFunctions.scala
index 6afe501..d71bb63 100644
--- a/core/src/main/scala/org/apache/spark/rdd/OrderedRDDFunctions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/OrderedRDDFunctions.scala
@@ -57,7 +57,7 @@ class OrderedRDDFunctions[K : Ordering : ClassTag,
*/
// TODO: this currently doesn't work on P other than Tuple2!
def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.length)
- : RDD[(K, V)] =
+ : RDD[(K, V)] = self.withScope
{
val part = new RangePartitioner(numPartitions, self, ascending)
new ShuffledRDD[K, V, V](self, part)
@@ -71,7 +71,7 @@ class OrderedRDDFunctions[K : Ordering : ClassTag,
* This is more efficient than calling `repartition` and then sorting within each partition
* because it can push the sorting down into the shuffle machinery.
*/
- def repartitionAndSortWithinPartitions(partitioner: Partitioner): RDD[(K, V)] = {
+ def repartitionAndSortWithinPartitions(partitioner: Partitioner): RDD[(K, V)] = self.withScope {
new ShuffledRDD[K, V, V](self, partitioner).setKeyOrdering(ordering)
}
@@ -81,7 +81,7 @@ class OrderedRDDFunctions[K : Ordering : ClassTag,
* performed efficiently by only scanning the partitions that might contain matching elements.
* Otherwise, a standard `filter` is applied to all partitions.
*/
- def filterByRange(lower: K, upper: K): RDD[P] = {
+ def filterByRange(lower: K, upper: K): RDD[P] = self.withScope {
def inRange(k: K): Boolean = ordering.gteq(k, lower) && ordering.lteq(k, upper)
http://git-wip-us.apache.org/repos/asf/spark/blob/fc8b5819/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
index 05351ba..93d338f 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
@@ -29,7 +29,7 @@ import scala.util.DynamicVariable
import com.clearspring.analytics.stream.cardinality.HyperLogLogPlus
import org.apache.hadoop.conf.{Configurable, Configuration}
-import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.io.SequenceFile.CompressionType
import org.apache.hadoop.io.compress.CompressionCodec
import org.apache.hadoop.mapred.{FileOutputCommitter, FileOutputFormat, JobConf, OutputFormat}
@@ -75,7 +75,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
mergeCombiners: (C, C) => C,
partitioner: Partitioner,
mapSideCombine: Boolean = true,
- serializer: Serializer = null): RDD[(K, C)] = {
+ serializer: Serializer = null): RDD[(K, C)] = self.withScope {
require(mergeCombiners != null, "mergeCombiners must be defined") // required as of Spark 0.9.0
if (keyClass.isArray) {
if (mapSideCombine) {
@@ -108,7 +108,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
def combineByKey[C](createCombiner: V => C,
mergeValue: (C, V) => C,
mergeCombiners: (C, C) => C,
- numPartitions: Int): RDD[(K, C)] = {
+ numPartitions: Int): RDD[(K, C)] = self.withScope {
combineByKey(createCombiner, mergeValue, mergeCombiners, new HashPartitioner(numPartitions))
}
@@ -122,7 +122,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
* instead of creating a new U.
*/
def aggregateByKey[U: ClassTag](zeroValue: U, partitioner: Partitioner)(seqOp: (U, V) => U,
- combOp: (U, U) => U): RDD[(K, U)] = {
+ combOp: (U, U) => U): RDD[(K, U)] = self.withScope {
// Serialize the zero value to a byte array so that we can get a new clone of it on each key
val zeroBuffer = SparkEnv.get.serializer.newInstance().serialize(zeroValue)
val zeroArray = new Array[Byte](zeroBuffer.limit)
@@ -144,7 +144,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
* instead of creating a new U.
*/
def aggregateByKey[U: ClassTag](zeroValue: U, numPartitions: Int)(seqOp: (U, V) => U,
- combOp: (U, U) => U): RDD[(K, U)] = {
+ combOp: (U, U) => U): RDD[(K, U)] = self.withScope {
aggregateByKey(zeroValue, new HashPartitioner(numPartitions))(seqOp, combOp)
}
@@ -158,7 +158,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
* instead of creating a new U.
*/
def aggregateByKey[U: ClassTag](zeroValue: U)(seqOp: (U, V) => U,
- combOp: (U, U) => U): RDD[(K, U)] = {
+ combOp: (U, U) => U): RDD[(K, U)] = self.withScope {
aggregateByKey(zeroValue, defaultPartitioner(self))(seqOp, combOp)
}
@@ -167,7 +167,9 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
* may be added to the result an arbitrary number of times, and must not change the result
* (e.g., Nil for list concatenation, 0 for addition, or 1 for multiplication.).
*/
- def foldByKey(zeroValue: V, partitioner: Partitioner)(func: (V, V) => V): RDD[(K, V)] = {
+ def foldByKey(
+ zeroValue: V,
+ partitioner: Partitioner)(func: (V, V) => V): RDD[(K, V)] = self.withScope {
// Serialize the zero value to a byte array so that we can get a new clone of it on each key
val zeroBuffer = SparkEnv.get.serializer.newInstance().serialize(zeroValue)
val zeroArray = new Array[Byte](zeroBuffer.limit)
@@ -185,7 +187,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
* may be added to the result an arbitrary number of times, and must not change the result
* (e.g., Nil for list concatenation, 0 for addition, or 1 for multiplication.).
*/
- def foldByKey(zeroValue: V, numPartitions: Int)(func: (V, V) => V): RDD[(K, V)] = {
+ def foldByKey(zeroValue: V, numPartitions: Int)(func: (V, V) => V): RDD[(K, V)] = self.withScope {
foldByKey(zeroValue, new HashPartitioner(numPartitions))(func)
}
@@ -194,7 +196,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
* may be added to the result an arbitrary number of times, and must not change the result
* (e.g., Nil for list concatenation, 0 for addition, or 1 for multiplication.).
*/
- def foldByKey(zeroValue: V)(func: (V, V) => V): RDD[(K, V)] = {
+ def foldByKey(zeroValue: V)(func: (V, V) => V): RDD[(K, V)] = self.withScope {
foldByKey(zeroValue, defaultPartitioner(self))(func)
}
@@ -213,7 +215,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
*/
def sampleByKey(withReplacement: Boolean,
fractions: Map[K, Double],
- seed: Long = Utils.random.nextLong): RDD[(K, V)] = {
+ seed: Long = Utils.random.nextLong): RDD[(K, V)] = self.withScope {
require(fractions.values.forall(v => v >= 0.0), "Negative sampling rates.")
@@ -242,9 +244,10 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
* @return RDD containing the sampled subset
*/
@Experimental
- def sampleByKeyExact(withReplacement: Boolean,
+ def sampleByKeyExact(
+ withReplacement: Boolean,
fractions: Map[K, Double],
- seed: Long = Utils.random.nextLong): RDD[(K, V)] = {
+ seed: Long = Utils.random.nextLong): RDD[(K, V)] = self.withScope {
require(fractions.values.forall(v => v >= 0.0), "Negative sampling rates.")
@@ -261,7 +264,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
* the merging locally on each mapper before sending results to a reducer, similarly to a
* "combiner" in MapReduce.
*/
- def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] = {
+ def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] = self.withScope {
combineByKey[V]((v: V) => v, func, func, partitioner)
}
@@ -270,7 +273,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
* the merging locally on each mapper before sending results to a reducer, similarly to a
* "combiner" in MapReduce. Output will be hash-partitioned with numPartitions partitions.
*/
- def reduceByKey(func: (V, V) => V, numPartitions: Int): RDD[(K, V)] = {
+ def reduceByKey(func: (V, V) => V, numPartitions: Int): RDD[(K, V)] = self.withScope {
reduceByKey(new HashPartitioner(numPartitions), func)
}
@@ -280,7 +283,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
* "combiner" in MapReduce. Output will be hash-partitioned with the existing partitioner/
* parallelism level.
*/
- def reduceByKey(func: (V, V) => V): RDD[(K, V)] = {
+ def reduceByKey(func: (V, V) => V): RDD[(K, V)] = self.withScope {
reduceByKey(defaultPartitioner(self), func)
}
@@ -289,7 +292,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
* immediately to the master as a Map. This will also perform the merging locally on each mapper
* before sending results to a reducer, similarly to a "combiner" in MapReduce.
*/
- def reduceByKeyLocally(func: (V, V) => V): Map[K, V] = {
+ def reduceByKeyLocally(func: (V, V) => V): Map[K, V] = self.withScope {
if (keyClass.isArray) {
throw new SparkException("reduceByKeyLocally() does not support array keys")
@@ -317,7 +320,9 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
/** Alias for reduceByKeyLocally */
@deprecated("Use reduceByKeyLocally", "1.0.0")
- def reduceByKeyToDriver(func: (V, V) => V): Map[K, V] = reduceByKeyLocally(func)
+ def reduceByKeyToDriver(func: (V, V) => V): Map[K, V] = self.withScope {
+ reduceByKeyLocally(func)
+ }
/**
* Count the number of elements for each key, collecting the results to a local Map.
@@ -327,7 +332,9 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
* To handle very large results, consider using rdd.mapValues(_ => 1L).reduceByKey(_ + _), which
* returns an RDD[T, Long] instead of a map.
*/
- def countByKey(): Map[K, Long] = self.mapValues(_ => 1L).reduceByKey(_ + _).collect().toMap
+ def countByKey(): Map[K, Long] = self.withScope {
+ self.mapValues(_ => 1L).reduceByKey(_ + _).collect().toMap
+ }
/**
* :: Experimental ::
@@ -336,7 +343,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
*/
@Experimental
def countByKeyApprox(timeout: Long, confidence: Double = 0.95)
- : PartialResult[Map[K, BoundedDouble]] = {
+ : PartialResult[Map[K, BoundedDouble]] = self.withScope {
self.map(_._1).countByValueApprox(timeout, confidence)
}
@@ -360,7 +367,10 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
* @param partitioner Partitioner to use for the resulting RDD.
*/
@Experimental
- def countApproxDistinctByKey(p: Int, sp: Int, partitioner: Partitioner): RDD[(K, Long)] = {
+ def countApproxDistinctByKey(
+ p: Int,
+ sp: Int,
+ partitioner: Partitioner): RDD[(K, Long)] = self.withScope {
require(p >= 4, s"p ($p) must be >= 4")
require(sp <= 32, s"sp ($sp) must be <= 32")
require(sp == 0 || p <= sp, s"p ($p) cannot be greater than sp ($sp)")
@@ -392,7 +402,9 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
* It must be greater than 0.000017.
* @param partitioner partitioner of the resulting RDD
*/
- def countApproxDistinctByKey(relativeSD: Double, partitioner: Partitioner): RDD[(K, Long)] = {
+ def countApproxDistinctByKey(
+ relativeSD: Double,
+ partitioner: Partitioner): RDD[(K, Long)] = self.withScope {
require(relativeSD > 0.000017, s"accuracy ($relativeSD) must be greater than 0.000017")
val p = math.ceil(2.0 * math.log(1.054 / relativeSD) / math.log(2)).toInt
assert(p <= 32)
@@ -410,7 +422,9 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
* It must be greater than 0.000017.
* @param numPartitions number of partitions of the resulting RDD
*/
- def countApproxDistinctByKey(relativeSD: Double, numPartitions: Int): RDD[(K, Long)] = {
+ def countApproxDistinctByKey(
+ relativeSD: Double,
+ numPartitions: Int): RDD[(K, Long)] = self.withScope {
countApproxDistinctByKey(relativeSD, new HashPartitioner(numPartitions))
}
@@ -424,7 +438,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
* @param relativeSD Relative accuracy. Smaller values create counters that require more space.
* It must be greater than 0.000017.
*/
- def countApproxDistinctByKey(relativeSD: Double = 0.05): RDD[(K, Long)] = {
+ def countApproxDistinctByKey(relativeSD: Double = 0.05): RDD[(K, Long)] = self.withScope {
countApproxDistinctByKey(relativeSD, defaultPartitioner(self))
}
@@ -441,7 +455,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
* Note: As currently implemented, groupByKey must be able to hold all the key-value pairs for any
* key in memory. If a key has too many values, it can result in an [[OutOfMemoryError]].
*/
- def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])] = {
+ def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])] = self.withScope {
// groupByKey shouldn't use map side combine because map side combine does not
// reduce the amount of data shuffled and requires all map side data be inserted
// into a hash table, leading to more objects in the old gen.
@@ -465,14 +479,14 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
* Note: As currently implemented, groupByKey must be able to hold all the key-value pairs for any
* key in memory. If a key has too many values, it can result in an [[OutOfMemoryError]].
*/
- def groupByKey(numPartitions: Int): RDD[(K, Iterable[V])] = {
+ def groupByKey(numPartitions: Int): RDD[(K, Iterable[V])] = self.withScope {
groupByKey(new HashPartitioner(numPartitions))
}
/**
* Return a copy of the RDD partitioned using the specified partitioner.
*/
- def partitionBy(partitioner: Partitioner): RDD[(K, V)] = {
+ def partitionBy(partitioner: Partitioner): RDD[(K, V)] = self.withScope {
if (keyClass.isArray && partitioner.isInstanceOf[HashPartitioner]) {
throw new SparkException("Default partitioner cannot partition array keys.")
}
@@ -488,7 +502,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
* pair of elements will be returned as a (k, (v1, v2)) tuple, where (k, v1) is in `this` and
* (k, v2) is in `other`. Uses the given Partitioner to partition the output RDD.
*/
- def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))] = {
+ def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))] = self.withScope {
this.cogroup(other, partitioner).flatMapValues( pair =>
for (v <- pair._1.iterator; w <- pair._2.iterator) yield (v, w)
)
@@ -500,7 +514,9 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
* pair (k, (v, None)) if no elements in `other` have key k. Uses the given Partitioner to
* partition the output RDD.
*/
- def leftOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, Option[W]))] = {
+ def leftOuterJoin[W](
+ other: RDD[(K, W)],
+ partitioner: Partitioner): RDD[(K, (V, Option[W]))] = self.withScope {
this.cogroup(other, partitioner).flatMapValues { pair =>
if (pair._2.isEmpty) {
pair._1.iterator.map(v => (v, None))
@@ -517,7 +533,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
* partition the output RDD.
*/
def rightOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner)
- : RDD[(K, (Option[V], W))] = {
+ : RDD[(K, (Option[V], W))] = self.withScope {
this.cogroup(other, partitioner).flatMapValues { pair =>
if (pair._1.isEmpty) {
pair._2.iterator.map(w => (None, w))
@@ -536,7 +552,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
* in `this` have key k. Uses the given Partitioner to partition the output RDD.
*/
def fullOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner)
- : RDD[(K, (Option[V], Option[W]))] = {
+ : RDD[(K, (Option[V], Option[W]))] = self.withScope {
this.cogroup(other, partitioner).flatMapValues {
case (vs, Seq()) => vs.iterator.map(v => (Some(v), None))
case (Seq(), ws) => ws.iterator.map(w => (None, Some(w)))
@@ -549,7 +565,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
* existing partitioner/parallelism level.
*/
def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C)
- : RDD[(K, C)] = {
+ : RDD[(K, C)] = self.withScope {
combineByKey(createCombiner, mergeValue, mergeCombiners, defaultPartitioner(self))
}
@@ -563,7 +579,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
* aggregation (such as a sum or average) over each key, using [[PairRDDFunctions.aggregateByKey]]
* or [[PairRDDFunctions.reduceByKey]] will provide much better performance.
*/
- def groupByKey(): RDD[(K, Iterable[V])] = {
+ def groupByKey(): RDD[(K, Iterable[V])] = self.withScope {
groupByKey(defaultPartitioner(self))
}
@@ -572,7 +588,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
* pair of elements will be returned as a (k, (v1, v2)) tuple, where (k, v1) is in `this` and
* (k, v2) is in `other`. Performs a hash join across the cluster.
*/
- def join[W](other: RDD[(K, W)]): RDD[(K, (V, W))] = {
+ def join[W](other: RDD[(K, W)]): RDD[(K, (V, W))] = self.withScope {
join(other, defaultPartitioner(self, other))
}
@@ -581,7 +597,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
* pair of elements will be returned as a (k, (v1, v2)) tuple, where (k, v1) is in `this` and
* (k, v2) is in `other`. Performs a hash join across the cluster.
*/
- def join[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (V, W))] = {
+ def join[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (V, W))] = self.withScope {
join(other, new HashPartitioner(numPartitions))
}
@@ -591,7 +607,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
* pair (k, (v, None)) if no elements in `other` have key k. Hash-partitions the output
* using the existing partitioner/parallelism level.
*/
- def leftOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (V, Option[W]))] = {
+ def leftOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (V, Option[W]))] = self.withScope {
leftOuterJoin(other, defaultPartitioner(self, other))
}
@@ -601,7 +617,9 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
* pair (k, (v, None)) if no elements in `other` have key k. Hash-partitions the output
* into `numPartitions` partitions.
*/
- def leftOuterJoin[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (V, Option[W]))] = {
+ def leftOuterJoin[W](
+ other: RDD[(K, W)],
+ numPartitions: Int): RDD[(K, (V, Option[W]))] = self.withScope {
leftOuterJoin(other, new HashPartitioner(numPartitions))
}
@@ -611,7 +629,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
* pair (k, (None, w)) if no elements in `this` have key k. Hash-partitions the resulting
* RDD using the existing partitioner/parallelism level.
*/
- def rightOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (Option[V], W))] = {
+ def rightOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (Option[V], W))] = self.withScope {
rightOuterJoin(other, defaultPartitioner(self, other))
}
@@ -621,7 +639,9 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
* pair (k, (None, w)) if no elements in `this` have key k. Hash-partitions the resulting
* RDD into the given number of partitions.
*/
- def rightOuterJoin[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (Option[V], W))] = {
+ def rightOuterJoin[W](
+ other: RDD[(K, W)],
+ numPartitions: Int): RDD[(K, (Option[V], W))] = self.withScope {
rightOuterJoin(other, new HashPartitioner(numPartitions))
}
@@ -634,7 +654,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
* in `this` have key k. Hash-partitions the resulting RDD using the existing partitioner/
* parallelism level.
*/
- def fullOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (Option[V], Option[W]))] = {
+ def fullOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (Option[V], Option[W]))] = self.withScope {
fullOuterJoin(other, defaultPartitioner(self, other))
}
@@ -646,7 +666,9 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
* (k, (Some(v), Some(w))) for v in `this`, or the pair (k, (None, Some(w))) if no elements
* in `this` have key k. Hash-partitions the resulting RDD into the given number of partitions.
*/
- def fullOuterJoin[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (Option[V], Option[W]))] = {
+ def fullOuterJoin[W](
+ other: RDD[(K, W)],
+ numPartitions: Int): RDD[(K, (Option[V], Option[W]))] = self.withScope {
fullOuterJoin(other, new HashPartitioner(numPartitions))
}
@@ -656,7 +678,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
* Warning: this doesn't return a multimap (so if you have multiple values to the same key, only
* one value per key is preserved in the map returned)
*/
- def collectAsMap(): Map[K, V] = {
+ def collectAsMap(): Map[K, V] = self.withScope {
val data = self.collect()
val map = new mutable.HashMap[K, V]
map.sizeHint(data.length)
@@ -668,7 +690,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
* Pass each value in the key-value pair RDD through a map function without changing the keys;
* this also retains the original RDD's partitioning.
*/
- def mapValues[U](f: V => U): RDD[(K, U)] = {
+ def mapValues[U](f: V => U): RDD[(K, U)] = self.withScope {
val cleanF = self.context.clean(f)
new MapPartitionsRDD[(K, U), (K, V)](self,
(context, pid, iter) => iter.map { case (k, v) => (k, cleanF(v)) },
@@ -679,7 +701,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
* Pass each value in the key-value pair RDD through a flatMap function without changing the
* keys; this also retains the original RDD's partitioning.
*/
- def flatMapValues[U](f: V => TraversableOnce[U]): RDD[(K, U)] = {
+ def flatMapValues[U](f: V => TraversableOnce[U]): RDD[(K, U)] = self.withScope {
val cleanF = self.context.clean(f)
new MapPartitionsRDD[(K, U), (K, V)](self,
(context, pid, iter) => iter.flatMap { case (k, v) =>
@@ -697,7 +719,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
other2: RDD[(K, W2)],
other3: RDD[(K, W3)],
partitioner: Partitioner)
- : RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2], Iterable[W3]))] = {
+ : RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2], Iterable[W3]))] = self.withScope {
if (partitioner.isInstanceOf[HashPartitioner] && keyClass.isArray) {
throw new SparkException("Default partitioner cannot partition array keys.")
}
@@ -715,7 +737,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
* list of values for that key in `this` as well as `other`.
*/
def cogroup[W](other: RDD[(K, W)], partitioner: Partitioner)
- : RDD[(K, (Iterable[V], Iterable[W]))] = {
+ : RDD[(K, (Iterable[V], Iterable[W]))] = self.withScope {
if (partitioner.isInstanceOf[HashPartitioner] && keyClass.isArray) {
throw new SparkException("Default partitioner cannot partition array keys.")
}
@@ -730,7 +752,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
* tuple with the list of values for that key in `this`, `other1` and `other2`.
*/
def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)], partitioner: Partitioner)
- : RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2]))] = {
+ : RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2]))] = self.withScope {
if (partitioner.isInstanceOf[HashPartitioner] && keyClass.isArray) {
throw new SparkException("Default partitioner cannot partition array keys.")
}
@@ -748,7 +770,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
* for that key in `this`, `other1`, `other2` and `other3`.
*/
def cogroup[W1, W2, W3](other1: RDD[(K, W1)], other2: RDD[(K, W2)], other3: RDD[(K, W3)])
- : RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2], Iterable[W3]))] = {
+ : RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2], Iterable[W3]))] = self.withScope {
cogroup(other1, other2, other3, defaultPartitioner(self, other1, other2, other3))
}
@@ -756,7 +778,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
* For each key k in `this` or `other`, return a resulting RDD that contains a tuple with the
* list of values for that key in `this` as well as `other`.
*/
- def cogroup[W](other: RDD[(K, W)]): RDD[(K, (Iterable[V], Iterable[W]))] = {
+ def cogroup[W](other: RDD[(K, W)]): RDD[(K, (Iterable[V], Iterable[W]))] = self.withScope {
cogroup(other, defaultPartitioner(self, other))
}
@@ -765,7 +787,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
* tuple with the list of values for that key in `this`, `other1` and `other2`.
*/
def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)])
- : RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2]))] = {
+ : RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2]))] = self.withScope {
cogroup(other1, other2, defaultPartitioner(self, other1, other2))
}
@@ -773,7 +795,9 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
* For each key k in `this` or `other`, return a resulting RDD that contains a tuple with the
* list of values for that key in `this` as well as `other`.
*/
- def cogroup[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (Iterable[V], Iterable[W]))] = {
+ def cogroup[W](
+ other: RDD[(K, W)],
+ numPartitions: Int): RDD[(K, (Iterable[V], Iterable[W]))] = self.withScope {
cogroup(other, new HashPartitioner(numPartitions))
}
@@ -782,7 +806,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
* tuple with the list of values for that key in `this`, `other1` and `other2`.
*/
def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)], numPartitions: Int)
- : RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2]))] = {
+ : RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2]))] = self.withScope {
cogroup(other1, other2, new HashPartitioner(numPartitions))
}
@@ -795,24 +819,24 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
other2: RDD[(K, W2)],
other3: RDD[(K, W3)],
numPartitions: Int)
- : RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2], Iterable[W3]))] = {
+ : RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2], Iterable[W3]))] = self.withScope {
cogroup(other1, other2, other3, new HashPartitioner(numPartitions))
}
/** Alias for cogroup. */
- def groupWith[W](other: RDD[(K, W)]): RDD[(K, (Iterable[V], Iterable[W]))] = {
+ def groupWith[W](other: RDD[(K, W)]): RDD[(K, (Iterable[V], Iterable[W]))] = self.withScope {
cogroup(other, defaultPartitioner(self, other))
}
/** Alias for cogroup. */
def groupWith[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)])
- : RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2]))] = {
+ : RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2]))] = self.withScope {
cogroup(other1, other2, defaultPartitioner(self, other1, other2))
}
/** Alias for cogroup. */
def groupWith[W1, W2, W3](other1: RDD[(K, W1)], other2: RDD[(K, W2)], other3: RDD[(K, W3)])
- : RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2], Iterable[W3]))] = {
+ : RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2], Iterable[W3]))] = self.withScope {
cogroup(other1, other2, other3, defaultPartitioner(self, other1, other2, other3))
}
@@ -822,22 +846,27 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
* Uses `this` partitioner/partition size, because even if `other` is huge, the resulting
* RDD will be <= us.
*/
- def subtractByKey[W: ClassTag](other: RDD[(K, W)]): RDD[(K, V)] =
+ def subtractByKey[W: ClassTag](other: RDD[(K, W)]): RDD[(K, V)] = self.withScope {
subtractByKey(other, self.partitioner.getOrElse(new HashPartitioner(self.partitions.length)))
+ }
/** Return an RDD with the pairs from `this` whose keys are not in `other`. */
- def subtractByKey[W: ClassTag](other: RDD[(K, W)], numPartitions: Int): RDD[(K, V)] =
+ def subtractByKey[W: ClassTag](
+ other: RDD[(K, W)],
+ numPartitions: Int): RDD[(K, V)] = self.withScope {
subtractByKey(other, new HashPartitioner(numPartitions))
+ }
/** Return an RDD with the pairs from `this` whose keys are not in `other`. */
- def subtractByKey[W: ClassTag](other: RDD[(K, W)], p: Partitioner): RDD[(K, V)] =
+ def subtractByKey[W: ClassTag](other: RDD[(K, W)], p: Partitioner): RDD[(K, V)] = self.withScope {
new SubtractedRDD[K, V, W](self, other, p)
+ }
/**
* Return the list of values in the RDD for key `key`. This operation is done efficiently if the
* RDD has a known partitioner by only searching the partition that the key maps to.
*/
- def lookup(key: K): Seq[V] = {
+ def lookup(key: K): Seq[V] = self.withScope {
self.partitioner match {
case Some(p) =>
val index = p.getPartition(key)
@@ -859,7 +888,8 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
* Output the RDD to any Hadoop-supported file system, using a Hadoop `OutputFormat` class
* supporting the key and value types K and V in this RDD.
*/
- def saveAsHadoopFile[F <: OutputFormat[K, V]](path: String)(implicit fm: ClassTag[F]) {
+ def saveAsHadoopFile[F <: OutputFormat[K, V]](
+ path: String)(implicit fm: ClassTag[F]): Unit = self.withScope {
saveAsHadoopFile(path, keyClass, valueClass, fm.runtimeClass.asInstanceOf[Class[F]])
}
@@ -869,7 +899,8 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
* supplied codec.
*/
def saveAsHadoopFile[F <: OutputFormat[K, V]](
- path: String, codec: Class[_ <: CompressionCodec]) (implicit fm: ClassTag[F]) {
+ path: String,
+ codec: Class[_ <: CompressionCodec])(implicit fm: ClassTag[F]): Unit = self.withScope {
val runtimeClass = fm.runtimeClass
saveAsHadoopFile(path, keyClass, valueClass, runtimeClass.asInstanceOf[Class[F]], codec)
}
@@ -878,7 +909,8 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
* Output the RDD to any Hadoop-supported file system, using a new Hadoop API `OutputFormat`
* (mapreduce.OutputFormat) object supporting the key and value types K and V in this RDD.
*/
- def saveAsNewAPIHadoopFile[F <: NewOutputFormat[K, V]](path: String)(implicit fm: ClassTag[F]) {
+ def saveAsNewAPIHadoopFile[F <: NewOutputFormat[K, V]](
+ path: String)(implicit fm: ClassTag[F]): Unit = self.withScope {
saveAsNewAPIHadoopFile(path, keyClass, valueClass, fm.runtimeClass.asInstanceOf[Class[F]])
}
@@ -891,8 +923,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
keyClass: Class[_],
valueClass: Class[_],
outputFormatClass: Class[_ <: NewOutputFormat[_, _]],
- conf: Configuration = self.context.hadoopConfiguration)
- {
+ conf: Configuration = self.context.hadoopConfiguration): Unit = self.withScope {
// Rename this as hadoopConf internally to avoid shadowing (see SPARK-2038).
val hadoopConf = conf
val job = new NewAPIHadoopJob(hadoopConf)
@@ -912,7 +943,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
keyClass: Class[_],
valueClass: Class[_],
outputFormatClass: Class[_ <: OutputFormat[_, _]],
- codec: Class[_ <: CompressionCodec]) {
+ codec: Class[_ <: CompressionCodec]): Unit = self.withScope {
saveAsHadoopFile(path, keyClass, valueClass, outputFormatClass,
new JobConf(self.context.hadoopConfiguration), Some(codec))
}
@@ -927,7 +958,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
valueClass: Class[_],
outputFormatClass: Class[_ <: OutputFormat[_, _]],
conf: JobConf = new JobConf(self.context.hadoopConfiguration),
- codec: Option[Class[_ <: CompressionCodec]] = None) {
+ codec: Option[Class[_ <: CompressionCodec]] = None): Unit = self.withScope {
// Rename this as hadoopConf internally to avoid shadowing (see SPARK-2038).
val hadoopConf = conf
hadoopConf.setOutputKeyClass(keyClass)
@@ -960,7 +991,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
* output paths required (e.g. a table name to write to) in the same way as it would be
* configured for a Hadoop MapReduce job.
*/
- def saveAsNewAPIHadoopDataset(conf: Configuration) {
+ def saveAsNewAPIHadoopDataset(conf: Configuration): Unit = self.withScope {
// Rename this as hadoopConf internally to avoid shadowing (see SPARK-2038).
val hadoopConf = conf
val job = new NewAPIHadoopJob(hadoopConf)
@@ -1027,7 +1058,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
* (e.g. a table name to write to) in the same way as it would be configured for a Hadoop
* MapReduce job.
*/
- def saveAsHadoopDataset(conf: JobConf) {
+ def saveAsHadoopDataset(conf: JobConf): Unit = self.withScope {
// Rename this as hadoopConf internally to avoid shadowing (see SPARK-2038).
val hadoopConf = conf
val wrappedConf = new SerializableWritable(hadoopConf)
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org