Posted to commits@spark.apache.org by pw...@apache.org on 2014/05/07 18:41:38 UTC

git commit: [SPARK-1460] Returning SchemaRDD instead of normal RDD on Set operations...

Repository: spark
Updated Branches:
  refs/heads/master 3eb53bd59 -> 967635a24


[SPARK-1460] Returning SchemaRDD instead of normal RDD on Set operations...

... that do not change schema

Author: Kan Zhang <kz...@apache.org>

Closes #448 from kanzhang/SPARK-1460 and squashes the following commits:

111e388 [Kan Zhang] silence MiMa errors in EdgeRDD and VertexRDD
91dc787 [Kan Zhang] Taking into account newly added Ordering param
79ed52a [Kan Zhang] [SPARK-1460] Returning SchemaRDD on Set operations that do not change schema
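
For a concrete picture of the user-visible effect, here is a hedged usage
sketch in Scala (the `sqlContext` session and the `people`/`banned` tables
are illustrative, not part of this commit):

    val people: SchemaRDD = sqlContext.sql("SELECT name, age FROM people")
    val banned: SchemaRDD = sqlContext.sql("SELECT name, age FROM banned")

    // Previously these set operations fell back to RDD[Row], losing the
    // schema; now they stay on SchemaRDD, so SQL-side methods still apply.
    val remaining = people.subtract(banned).distinct()
    remaining.registerAsTable("remainingPeople")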


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/967635a2
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/967635a2
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/967635a2

Branch: refs/heads/master
Commit: 967635a2425a769b932eea0984fe697d6721cab0
Parents: 3eb53bd
Author: Kan Zhang <kz...@apache.org>
Authored: Wed May 7 09:41:31 2014 -0700
Committer: Patrick Wendell <pw...@gmail.com>
Committed: Wed May 7 09:41:31 2014 -0700

----------------------------------------------------------------------
 .../main/scala/org/apache/spark/rdd/RDD.scala   |  10 +-
 .../scala/org/apache/spark/graphx/EdgeRDD.scala |  10 +-
 .../org/apache/spark/graphx/VertexRDD.scala     |  10 +-
 project/MimaBuild.scala                         |   2 +
 python/pyspark/sql.py                           |  29 ++++
 .../scala/org/apache/spark/sql/SchemaRDD.scala  |  67 ++++++++-
 .../spark/sql/api/java/JavaSchemaRDD.scala      | 140 +++++++++++++++++++
 7 files changed, 246 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/967635a2/core/src/main/scala/org/apache/spark/rdd/RDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index 3b3524f..a1ca612 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -128,7 +128,7 @@ abstract class RDD[T: ClassTag](
   @transient var name: String = null
 
   /** Assign a name to this RDD */
-  def setName(_name: String): RDD[T] = {
+  def setName(_name: String): this.type = {
     name = _name
     this
   }
@@ -138,7 +138,7 @@ abstract class RDD[T: ClassTag](
    * it is computed. This can only be used to assign a new storage level if the RDD does not
   * have a storage level set yet.
    */
-  def persist(newLevel: StorageLevel): RDD[T] = {
+  def persist(newLevel: StorageLevel): this.type = {
     // TODO: Handle changes of StorageLevel
     if (storageLevel != StorageLevel.NONE && newLevel != storageLevel) {
       throw new UnsupportedOperationException(
@@ -152,10 +152,10 @@ abstract class RDD[T: ClassTag](
   }
 
   /** Persist this RDD with the default storage level (`MEMORY_ONLY`). */
-  def persist(): RDD[T] = persist(StorageLevel.MEMORY_ONLY)
+  def persist(): this.type = persist(StorageLevel.MEMORY_ONLY)
 
   /** Persist this RDD with the default storage level (`MEMORY_ONLY`). */
-  def cache(): RDD[T] = persist()
+  def cache(): this.type = persist()
 
   /**
    * Mark the RDD as non-persistent, and remove all blocks for it from memory and disk.
@@ -163,7 +163,7 @@ abstract class RDD[T: ClassTag](
    * @param blocking Whether to block until all blocks are deleted.
    * @return This RDD.
    */
-  def unpersist(blocking: Boolean = true): RDD[T] = {
+  def unpersist(blocking: Boolean = true): this.type = {
     logInfo("Removing RDD " + id + " from persistence list")
     sc.unpersistRDD(id, blocking)
     storageLevel = StorageLevel.NONE
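
The move from `RDD[T]` to `this.type` is the heart of the change: a method
typed `this.type` returns the receiver's own static type, so subclasses no
longer need to override these methods merely to narrow the return type. A
minimal standalone sketch (toy classes, not Spark code):

    class Base {
      // this.type: callers get back the static type of the receiver
      def touch(): this.type = this
    }
    class Derived extends Base {
      def extra: String = "only on Derived"
    }

    val d: Derived = new Derived().touch()  // compiles: touch() yields Derived here
    println(d.extra)                        // chaining keeps the subtype's members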

http://git-wip-us.apache.org/repos/asf/spark/blob/967635a2/graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala
----------------------------------------------------------------------
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala b/graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala
index 6d04bf7..fa78ca9 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala
@@ -51,18 +51,12 @@ class EdgeRDD[@specialized ED: ClassTag](
 
   override def collect(): Array[Edge[ED]] = this.map(_.copy()).collect()
 
-  override def persist(newLevel: StorageLevel): EdgeRDD[ED] = {
+  override def persist(newLevel: StorageLevel): this.type = {
     partitionsRDD.persist(newLevel)
     this
   }
 
-  /** Persist this RDD with the default storage level (`MEMORY_ONLY`). */
-  override def persist(): EdgeRDD[ED] = persist(StorageLevel.MEMORY_ONLY)
-
-  /** Persist this RDD with the default storage level (`MEMORY_ONLY`). */
-  override def cache(): EdgeRDD[ED] = persist()
-
-  override def unpersist(blocking: Boolean = true): EdgeRDD[ED] = {
+  override def unpersist(blocking: Boolean = true): this.type = {
     partitionsRDD.unpersist(blocking)
     this
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/967635a2/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala
----------------------------------------------------------------------
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala b/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala
index d6788d4..f0fc605 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala
@@ -71,18 +71,12 @@ class VertexRDD[@specialized VD: ClassTag](
   override protected def getPreferredLocations(s: Partition): Seq[String] =
     partitionsRDD.preferredLocations(s)
 
-  override def persist(newLevel: StorageLevel): VertexRDD[VD] = {
+  override def persist(newLevel: StorageLevel): this.type = {
     partitionsRDD.persist(newLevel)
     this
   }
 
-  /** Persist this RDD with the default storage level (`MEMORY_ONLY`). */
-  override def persist(): VertexRDD[VD] = persist(StorageLevel.MEMORY_ONLY)
-
-  /** Persist this RDD with the default storage level (`MEMORY_ONLY`). */
-  override def cache(): VertexRDD[VD] = persist()
-
-  override def unpersist(blocking: Boolean = true): VertexRDD[VD] = {
+  override def unpersist(blocking: Boolean = true): this.type = {
     partitionsRDD.unpersist(blocking)
     this
   }
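
With the base-class signatures now returning `this.type`, the EdgeRDD and
VertexRDD overrides of `persist()` and `cache()` that existed only to
re-narrow the return type can be dropped; only the overrides with genuinely
different behavior (delegating to `partitionsRDD`) remain. A toy sketch of
why the alias needs no override:

    class Base {
      def persist(): this.type = { println("base persist"); this }
      def cache(): this.type = persist()  // alias inherits the receiver's type
    }
    class Child extends Base {
      // Only the behavior changes; the return type narrows automatically.
      override def persist(): this.type = { println("child persist"); this }
    }

    val c: Child = new Child().cache()  // inherited cache() runs Child's persist()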

http://git-wip-us.apache.org/repos/asf/spark/blob/967635a2/project/MimaBuild.scala
----------------------------------------------------------------------
diff --git a/project/MimaBuild.scala b/project/MimaBuild.scala
index d540dc0..efdb38e 100644
--- a/project/MimaBuild.scala
+++ b/project/MimaBuild.scala
@@ -74,6 +74,8 @@ object MimaBuild {
           ) ++
           excludeSparkClass("rdd.ClassTags") ++
           excludeSparkClass("util.XORShiftRandom") ++
+          excludeSparkClass("graphx.EdgeRDD") ++
+          excludeSparkClass("graphx.VertexRDD") ++
           excludeSparkClass("mllib.recommendation.MFDataGenerator") ++
           excludeSparkClass("mllib.optimization.SquaredGradient") ++
           excludeSparkClass("mllib.regression.RidgeRegressionWithSGD") ++

http://git-wip-us.apache.org/repos/asf/spark/blob/967635a2/python/pyspark/sql.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql.py b/python/pyspark/sql.py
index 1a62031..6789d70 100644
--- a/python/pyspark/sql.py
+++ b/python/pyspark/sql.py
@@ -360,6 +360,35 @@ class SchemaRDD(RDD):
         else:
             return None
 
+    def coalesce(self, numPartitions, shuffle=False):
+        rdd = self._jschema_rdd.coalesce(numPartitions, shuffle)
+        return SchemaRDD(rdd, self.sql_ctx)
+
+    def distinct(self):
+        rdd = self._jschema_rdd.distinct()
+        return SchemaRDD(rdd, self.sql_ctx)
+
+    def intersection(self, other):
+        if other.__class__ is SchemaRDD:
+            rdd = self._jschema_rdd.intersection(other._jschema_rdd)
+            return SchemaRDD(rdd, self.sql_ctx)
+        else:
+            raise ValueError("Can only intersect with another SchemaRDD")
+
+    def repartition(self, numPartitions):
+        rdd = self._jschema_rdd.repartition(numPartitions)
+        return SchemaRDD(rdd, self.sql_ctx)
+
+    def subtract(self, other, numPartitions=None):
+        if other.__class__ is SchemaRDD:
+            if numPartitions is None:
+                rdd = self._jschema_rdd.subtract(other._jschema_rdd)
+            else:
+                rdd = self._jschema_rdd.subtract(other._jschema_rdd, numPartitions)
+            return SchemaRDD(rdd, self.sql_ctx)
+        else:
+            raise ValueError("Can only subtract another SchemaRDD")
+
 def _test():
     import doctest
     from pyspark.context import SparkContext
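
The Python methods above delegate through Py4J to the Java API added later in
this commit: each call on `self._jschema_rdd` lands on a JavaSchemaRDD
method, and the returned Java object is rewrapped in a Python SchemaRDD. A
hedged Scala-side sketch of the delegation target for `subtract`, mirroring
the Python None-check (the helper name is invented for illustration):

    import org.apache.spark.sql.api.java.JavaSchemaRDD

    def subtractForPython(left: JavaSchemaRDD, right: JavaSchemaRDD,
                          numPartitions: Option[Int]): JavaSchemaRDD =
      numPartitions match {
        case Some(n) => left.subtract(right, n)  // Python passed numPartitions
        case None    => left.subtract(right)     // Python passed None
      }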

http://git-wip-us.apache.org/repos/asf/spark/blob/967635a2/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala
index d7782d6..34200be 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala
@@ -19,14 +19,16 @@ package org.apache.spark.sql
 
 import net.razorvine.pickle.Pickler
 
-import org.apache.spark.{Dependency, OneToOneDependency, Partition, TaskContext}
+import org.apache.spark.{Dependency, OneToOneDependency, Partition, Partitioner, TaskContext}
 import org.apache.spark.annotation.{AlphaComponent, Experimental}
 import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.api.java.JavaSchemaRDD
 import org.apache.spark.sql.catalyst.analysis._
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.plans.{Inner, JoinType}
 import org.apache.spark.sql.catalyst.types.BooleanType
+import org.apache.spark.sql.execution.{ExistingRdd, SparkLogicalPlan}
 import org.apache.spark.api.java.JavaRDD
 import java.util.{Map => JMap}
 
@@ -296,6 +298,13 @@ class SchemaRDD(
    */
   def toSchemaRDD = this
 
+  /**
+   * Returns this RDD as a JavaSchemaRDD.
+   *
+   * @group schema
+   */
+  def toJavaSchemaRDD: JavaSchemaRDD = new JavaSchemaRDD(sqlContext, logicalPlan)
+
   private[sql] def javaToPython: JavaRDD[Array[Byte]] = {
     val fieldNames: Seq[String] = this.queryExecution.analyzed.output.map(_.name)
     this.mapPartitions { iter =>
@@ -314,4 +323,60 @@ class SchemaRDD(
       }
     }
   }
+
+  /**
+   * Creates a SchemaRDD by applying this RDD's schema to a derived RDD. Typically used to wrap
+   * the return value of base RDD functions that do not change the schema.
+   *
+   * @param rdd An RDD derived from this one that has the same schema
+   *
+   * @group schema
+   */
+  private def applySchema(rdd: RDD[Row]): SchemaRDD = {
+    new SchemaRDD(sqlContext, SparkLogicalPlan(ExistingRdd(logicalPlan.output, rdd)))
+  }
+
+  // =======================================================================
+  // Base RDD functions that do NOT change schema
+  // =======================================================================
+
+  // Transformations (return a new RDD)
+
+  override def coalesce(numPartitions: Int, shuffle: Boolean = false)
+                       (implicit ord: Ordering[Row] = null): SchemaRDD =
+    applySchema(super.coalesce(numPartitions, shuffle)(ord))
+
+  override def distinct(): SchemaRDD =
+    applySchema(super.distinct())
+
+  override def distinct(numPartitions: Int)
+                       (implicit ord: Ordering[Row] = null): SchemaRDD =
+    applySchema(super.distinct(numPartitions)(ord))
+
+  override def filter(f: Row => Boolean): SchemaRDD =
+    applySchema(super.filter(f))
+
+  override def intersection(other: RDD[Row]): SchemaRDD =
+    applySchema(super.intersection(other))
+
+  override def intersection(other: RDD[Row], partitioner: Partitioner)
+                           (implicit ord: Ordering[Row] = null): SchemaRDD =
+    applySchema(super.intersection(other, partitioner)(ord))
+
+  override def intersection(other: RDD[Row], numPartitions: Int): SchemaRDD =
+    applySchema(super.intersection(other, numPartitions))
+
+  override def repartition(numPartitions: Int)
+                          (implicit ord: Ordering[Row] = null): SchemaRDD =
+    applySchema(super.repartition(numPartitions)(ord))
+
+  override def subtract(other: RDD[Row]): SchemaRDD =
+    applySchema(super.subtract(other))
+
+  override def subtract(other: RDD[Row], numPartitions: Int): SchemaRDD =
+    applySchema(super.subtract(other, numPartitions))
+
+  override def subtract(other: RDD[Row], p: Partitioner)
+                       (implicit ord: Ordering[Row] = null): SchemaRDD =
+    applySchema(super.subtract(other, p)(ord))
 }
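
The private `applySchema` helper is the whole trick: a derived RDD[Row] is
wrapped in an ExistingRdd logical plan that reuses the parent's output
attributes, yielding a full SchemaRDD again. The `(implicit ord:
Ordering[Row] = null)` parameter lists track the Ordering parameter recently
added to the corresponding RDD methods (commit 91dc787 above), so call sites
are unchanged. A hedged usage sketch (assuming a `sqlContext` in scope; table
names are illustrative):

    val a: SchemaRDD = sqlContext.sql("SELECT name, age FROM people")
    val b: SchemaRDD = sqlContext.sql("SELECT name, age FROM interns")

    // The implicit Ordering[Row] defaults to null, so nothing changes at the
    // call site; each result carries `a`'s schema via applySchema.
    val overlap: SchemaRDD = a.intersection(b)
    val compact: SchemaRDD = overlap.coalesce(2)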

http://git-wip-us.apache.org/repos/asf/spark/blob/967635a2/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSchemaRDD.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSchemaRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSchemaRDD.scala
index d43d672..22f57b7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSchemaRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSchemaRDD.scala
@@ -17,10 +17,13 @@
 
 package org.apache.spark.sql.api.java
 
+import org.apache.spark.Partitioner
 import org.apache.spark.api.java.{JavaRDDLike, JavaRDD}
+import org.apache.spark.api.java.function.{Function => JFunction}
 import org.apache.spark.sql.{SQLContext, SchemaRDD, SchemaRDDLike}
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.rdd.RDD
+import org.apache.spark.storage.StorageLevel
 
 /**
  * An RDD of [[Row]] objects that is returned as the result of a Spark SQL query.  In addition to
@@ -45,4 +48,141 @@ class JavaSchemaRDD(
   override def wrapRDD(rdd: RDD[Row]): JavaRDD[Row] = JavaRDD.fromRDD(rdd)
 
   val rdd = baseSchemaRDD.map(new Row(_))
+
+  override def toString: String = baseSchemaRDD.toString
+
+  // =======================================================================
+  // Base RDD functions that do NOT change schema
+  // =======================================================================
+
+  // Common RDD functions
+
+  /** Persist this RDD with the default storage level (`MEMORY_ONLY`). */
+  def cache(): JavaSchemaRDD = {
+    baseSchemaRDD.cache()
+    this
+  }
+
+  /** Persist this RDD with the default storage level (`MEMORY_ONLY`). */
+  def persist(): JavaSchemaRDD = {
+    baseSchemaRDD.persist()
+    this
+  }
+
+  /**
+   * Set this RDD's storage level to persist its values across operations after the first time
+   * it is computed. This can only be used to assign a new storage level if the RDD does not
+   * have a storage level set yet.
+   */
+  def persist(newLevel: StorageLevel): JavaSchemaRDD = {
+    baseSchemaRDD.persist(newLevel)
+    this
+  }
+
+  /**
+   * Mark the RDD as non-persistent, and remove all blocks for it from memory and disk.
+   *
+   * @param blocking Whether to block until all blocks are deleted.
+   * @return This RDD.
+   */
+  def unpersist(blocking: Boolean = true): JavaSchemaRDD = {
+    baseSchemaRDD.unpersist(blocking)
+    this
+  }
+
+  /** Assign a name to this RDD */
+  def setName(name: String): JavaSchemaRDD = {
+    baseSchemaRDD.setName(name)
+    this
+  }
+
+  // Transformations (return a new RDD)
+
+  /**
+   * Return a new RDD that is reduced into `numPartitions` partitions.
+   */
+  def coalesce(numPartitions: Int, shuffle: Boolean = false): JavaSchemaRDD =
+    baseSchemaRDD.coalesce(numPartitions, shuffle).toJavaSchemaRDD
+
+  /**
+   * Return a new RDD containing the distinct elements in this RDD.
+   */
+  def distinct(): JavaSchemaRDD =
+    baseSchemaRDD.distinct().toJavaSchemaRDD
+
+  /**
+   * Return a new RDD containing the distinct elements in this RDD.
+   */
+  def distinct(numPartitions: Int): JavaSchemaRDD =
+    baseSchemaRDD.distinct(numPartitions).toJavaSchemaRDD
+
+  /**
+   * Return a new RDD containing only the elements that satisfy a predicate.
+   */
+  def filter(f: JFunction[Row, java.lang.Boolean]): JavaSchemaRDD =
+    baseSchemaRDD.filter(x => f.call(new Row(x)).booleanValue()).toJavaSchemaRDD
+
+  /**
+   * Return the intersection of this RDD and another one. The output will not contain any
+   * duplicate elements, even if the input RDDs did.
+   *
+   * Note that this method performs a shuffle internally.
+   */
+  def intersection(other: JavaSchemaRDD): JavaSchemaRDD =
+    this.baseSchemaRDD.intersection(other.baseSchemaRDD).toJavaSchemaRDD
+
+  /**
+   * Return the intersection of this RDD and another one. The output will not contain any
+   * duplicate elements, even if the input RDDs did.
+   *
+   * Note that this method performs a shuffle internally.
+   *
+   * @param partitioner Partitioner to use for the resulting RDD
+   */
+  def intersection(other: JavaSchemaRDD, partitioner: Partitioner): JavaSchemaRDD =
+    this.baseSchemaRDD.intersection(other.baseSchemaRDD, partitioner).toJavaSchemaRDD
+
+  /**
+   * Return the intersection of this RDD and another one. The output will not contain any
+   * duplicate elements, even if the input RDDs did. Performs a hash partition across the cluster.
+   *
+   * Note that this method performs a shuffle internally.
+   *
+   * @param numPartitions How many partitions to use in the resulting RDD
+   */
+  def intersection(other: JavaSchemaRDD, numPartitions: Int): JavaSchemaRDD =
+    this.baseSchemaRDD.intersection(other.baseSchemaRDD, numPartitions).toJavaSchemaRDD
+
+  /**
+   * Return a new RDD that has exactly `numPartitions` partitions.
+   *
+   * Can increase or decrease the level of parallelism in this RDD. Internally, this uses
+   * a shuffle to redistribute data.
+   *
+   * If you are decreasing the number of partitions in this RDD, consider using `coalesce`,
+   * which can avoid performing a shuffle.
+   */
+  def repartition(numPartitions: Int): JavaSchemaRDD =
+    baseSchemaRDD.repartition(numPartitions).toJavaSchemaRDD
+
+  /**
+   * Return an RDD with the elements from `this` that are not in `other`.
+   *
+   * Uses `this` RDD's partitioner and partition count, because even if `other` is huge,
+   * the resulting RDD will be no larger than this one.
+   */
+  def subtract(other: JavaSchemaRDD): JavaSchemaRDD =
+    this.baseSchemaRDD.subtract(other.baseSchemaRDD).toJavaSchemaRDD
+
+  /**
+   * Return an RDD with the elements from `this` that are not in `other`.
+   */
+  def subtract(other: JavaSchemaRDD, numPartitions: Int): JavaSchemaRDD =
+    this.baseSchemaRDD.subtract(other.baseSchemaRDD, numPartitions).toJavaSchemaRDD
+
+  /**
+   * Return an RDD with the elements from `this` that are not in `other`.
+   */
+  def subtract(other: JavaSchemaRDD, p: Partitioner): JavaSchemaRDD =
+    this.baseSchemaRDD.subtract(other.baseSchemaRDD, p).toJavaSchemaRDD
 }
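
Finally, a hedged sketch of the new Java API surface in use; because every
method returns JavaSchemaRDD, calls chain fluently (shown from Scala for
brevity; the `people` argument is assumed to exist):

    import org.apache.spark.sql.SchemaRDD
    import org.apache.spark.sql.api.java.JavaSchemaRDD
    import org.apache.spark.storage.StorageLevel

    def demo(people: SchemaRDD): JavaSchemaRDD =
      people.toJavaSchemaRDD          // conversion added in this commit
        .setName("people")
        .persist(StorageLevel.MEMORY_ONLY)
        .repartition(4)               // still a JavaSchemaRDD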