Posted to commits@spark.apache.org by rx...@apache.org on 2015/04/26 20:47:01 UTC

spark git commit: [SPARK-7152][SQL] Add a Column expression for partition ID.

Repository: spark
Updated Branches:
  refs/heads/master 9a5bbe05f -> ca55dc95b


[SPARK-7152][SQL] Add a Column expression for partition ID.

Author: Reynold Xin <rx...@databricks.com>

Closes #5705 from rxin/df-pid and squashes the following commits:

401018f [Reynold Xin] [SPARK-7152][SQL] Add a Column expression for partition ID.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ca55dc95
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ca55dc95
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ca55dc95

Branch: refs/heads/master
Commit: ca55dc95b777d96b27d4e4c0457dd25145dcd6e9
Parents: 9a5bbe0
Author: Reynold Xin <rx...@databricks.com>
Authored: Sun Apr 26 11:46:58 2015 -0700
Committer: Reynold Xin <rx...@databricks.com>
Committed: Sun Apr 26 11:46:58 2015 -0700

----------------------------------------------------------------------
 python/pyspark/sql/functions.py                 | 30 ++++++++++-----
 .../expressions/SparkPartitionID.scala          | 39 ++++++++++++++++++++
 .../sql/execution/expressions/package.scala     | 23 ++++++++++++
 .../scala/org/apache/spark/sql/functions.scala  | 29 ++++++++++-----
 .../spark/sql/ColumnExpressionSuite.scala       |  8 ++++
 5 files changed, 110 insertions(+), 19 deletions(-)
----------------------------------------------------------------------
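
For reference, a minimal usage sketch of the new column expression (not part of the commit itself; it assumes a Spark shell with a SQLContext bound to sqlContext and its implicits imported):

    import org.apache.spark.sql.functions._
    import sqlContext.implicits._

    // Build a small two-partition DataFrame, mirroring the new test case.
    val df = sqlContext.sparkContext.parallelize(1 to 4, 2).map(i => (i, i)).toDF("a", "b")

    // Tag each row with the ID of the partition that produced it; the exact
    // values depend on data partitioning and task scheduling.
    df.select(df("a"), sparkPartitionId().as("pid")).show()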


http://git-wip-us.apache.org/repos/asf/spark/blob/ca55dc95/python/pyspark/sql/functions.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py
index bb47923..f48b7b5 100644
--- a/python/pyspark/sql/functions.py
+++ b/python/pyspark/sql/functions.py
@@ -75,6 +75,20 @@ __all__ += _functions.keys()
 __all__.sort()
 
 
+def approxCountDistinct(col, rsd=None):
+    """Returns a new :class:`Column` for approximate distinct count of ``col``.
+
+    >>> df.agg(approxCountDistinct(df.age).alias('c')).collect()
+    [Row(c=2)]
+    """
+    sc = SparkContext._active_spark_context
+    if rsd is None:
+        jc = sc._jvm.functions.approxCountDistinct(_to_java_column(col))
+    else:
+        jc = sc._jvm.functions.approxCountDistinct(_to_java_column(col), rsd)
+    return Column(jc)
+
+
 def countDistinct(col, *cols):
     """Returns a new :class:`Column` for distinct count of ``col`` or ``cols``.
 
@@ -89,18 +103,16 @@ def countDistinct(col, *cols):
     return Column(jc)
 
 
-def approxCountDistinct(col, rsd=None):
-    """Returns a new :class:`Column` for approximate distinct count of ``col``.
+def sparkPartitionId():
+    """Returns a column for partition ID of the Spark task.
 
-    >>> df.agg(approxCountDistinct(df.age).alias('c')).collect()
-    [Row(c=2)]
+    Note that this is non-deterministic because it depends on data partitioning and task scheduling.
+
+    >>> df.repartition(1).select(sparkPartitionId().alias("pid")).collect()
+    [Row(pid=0), Row(pid=0)]
     """
     sc = SparkContext._active_spark_context
-    if rsd is None:
-        jc = sc._jvm.functions.approxCountDistinct(_to_java_column(col))
-    else:
-        jc = sc._jvm.functions.approxCountDistinct(_to_java_column(col), rsd)
-    return Column(jc)
+    return Column(sc._jvm.functions.sparkPartitionId())
 
 
 class UserDefinedFunction(object):

http://git-wip-us.apache.org/repos/asf/spark/blob/ca55dc95/sql/core/src/main/scala/org/apache/spark/sql/execution/expressions/SparkPartitionID.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/expressions/SparkPartitionID.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/expressions/SparkPartitionID.scala
new file mode 100644
index 0000000..fe7607c
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/expressions/SparkPartitionID.scala
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.expressions
+
+import org.apache.spark.TaskContext
+import org.apache.spark.sql.catalyst.expressions.{Row, Expression}
+import org.apache.spark.sql.catalyst.trees
+import org.apache.spark.sql.types.{IntegerType, DataType}
+
+
+/**
+ * Expression that returns the current partition id of the Spark task.
+ */
+case object SparkPartitionID extends Expression with trees.LeafNode[Expression] {
+  self: Product =>
+
+  override type EvaluatedType = Int
+
+  override def nullable: Boolean = false
+
+  override def dataType: DataType = IntegerType
+
+  override def eval(input: Row): Int = TaskContext.get().partitionId()
+}
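
The expression evaluates to the partition ID recorded in the running task's TaskContext. As a rough illustration (not part of the commit; sc stands for an existing SparkContext), the same index is what mapPartitionsWithIndex exposes on the RDD side:

    // Pair every element with the index of the partition that holds it.
    val tagged = sc.parallelize(1 to 4, numSlices = 2)
      .mapPartitionsWithIndex((pid, iter) => iter.map(x => (pid, x)))
      .collect()
    // With two partitions the elements come back tagged 0 or 1, the same
    // values SparkPartitionID.eval returns inside the corresponding tasks.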

http://git-wip-us.apache.org/repos/asf/spark/blob/ca55dc95/sql/core/src/main/scala/org/apache/spark/sql/execution/expressions/package.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/expressions/package.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/expressions/package.scala
new file mode 100644
index 0000000..568b7ac
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/expressions/package.scala
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+/**
+ * Package containing expressions that are specific to the Spark runtime.
+ */
+package object expressions

http://git-wip-us.apache.org/repos/asf/spark/blob/ca55dc95/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
index ff91e1d..9738fd4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
@@ -277,6 +277,13 @@ object functions {
   //////////////////////////////////////////////////////////////////////////////////////////////
 
   /**
+   * Computes the absolute value.
+   *
+   * @group normal_funcs
+   */
+  def abs(e: Column): Column = Abs(e.expr)
+
+  /**
    * Returns the first column that is not null.
    * {{{
    *   df.select(coalesce(df("a"), df("b")))
@@ -288,6 +295,13 @@ object functions {
   def coalesce(e: Column*): Column = Coalesce(e.map(_.expr))
 
   /**
+   * Converts a string expression to lower case.
+   *
+   * @group normal_funcs
+   */
+  def lower(e: Column): Column = Lower(e.expr)
+
+  /**
    * Unary minus, i.e. negate the expression.
    * {{{
    *   // Select the amount column and negates all values.
@@ -317,18 +331,13 @@ object functions {
   def not(e: Column): Column = !e
 
   /**
-   * Converts a string expression to upper case.
+   * Partition ID of the Spark task.
    *
-   * @group normal_funcs
-   */
-  def upper(e: Column): Column = Upper(e.expr)
-
-  /**
-   * Converts a string exprsesion to lower case.
+   * Note that this is non-deterministic because it depends on data partitioning and task scheduling.
    *
    * @group normal_funcs
    */
-  def lower(e: Column): Column = Lower(e.expr)
+  def sparkPartitionId(): Column = execution.expressions.SparkPartitionID
 
   /**
    * Computes the square root of the specified float value.
@@ -338,11 +347,11 @@ object functions {
   def sqrt(e: Column): Column = Sqrt(e.expr)
 
   /**
-   * Computes the absolutle value.
+   * Converts a string expression to upper case.
    *
    * @group normal_funcs
    */
-  def abs(e: Column): Column = Abs(e.expr)
+  def upper(e: Column): Column = Upper(e.expr)
 
   //////////////////////////////////////////////////////////////////////////////////////////////
   //////////////////////////////////////////////////////////////////////////////////////////////
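
A rough illustration of the caveat in the new doc comment (not part of the commit; df stands for any existing DataFrame): the reported IDs track the physical partitioning at evaluation time, so repartitioning changes them.

    import org.apache.spark.sql.functions._

    // Distinct partition IDs before and after an explicit repartition.
    df.select(sparkPartitionId().as("pid")).distinct().show()
    df.repartition(8).select(sparkPartitionId().as("pid")).distinct().show()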

http://git-wip-us.apache.org/repos/asf/spark/blob/ca55dc95/sql/core/src/test/scala/org/apache/spark/sql/ColumnExpressionSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/ColumnExpressionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/ColumnExpressionSuite.scala
index bc8fae1..904073b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/ColumnExpressionSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/ColumnExpressionSuite.scala
@@ -310,6 +310,14 @@ class ColumnExpressionSuite extends QueryTest {
     )
   }
 
+  test("sparkPartitionId") {
+    val df = TestSQLContext.sparkContext.parallelize(1 to 1, 1).map(i => (i, i)).toDF("a", "b")
+    checkAnswer(
+      df.select(sparkPartitionId()),
+      Row(0)
+    )
+  }
+
   test("lift alias out of cast") {
     compareExpressions(
       col("1234").as("name").cast("int").expr,

