Posted to commits@spark.apache.org by me...@apache.org on 2016/02/25 08:15:39 UTC

spark git commit: [SPARK-13479][SQL][PYTHON] Added Python API for approxQuantile

Repository: spark
Updated Branches:
  refs/heads/master 2b042577f -> 13ce10e95


[SPARK-13479][SQL][PYTHON] Added Python API for approxQuantile

## What changes were proposed in this pull request?

* Scala DataFrameStatFunctions: Added a version of approxQuantile that takes a List instead of an Array, for Python compatibility
* Python DataFrame and DataFrameStatFunctions: Added approxQuantile (see the usage sketch below)

## How was this patch tested?

* unit test in sql/tests.py

The documentation was copied verbatim from the existing Scala approxQuantile.
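
For reference, a minimal usage sketch of the new API (illustrative only; the data, the column name `a`, and the pre-existing `sqlContext` are not part of this patch):

```python
# Build a one-column DataFrame and request approximate quartiles with a
# 5% relative error target. sqlContext is an existing SQLContext.
# df.stat.approxQuantile delegates to the same implementation, so either
# entry point can be used.
df = sqlContext.createDataFrame([(float(i),) for i in range(100)], ["a"])
quartiles = df.approxQuantile("a", [0.25, 0.5, 0.75], 0.05)
```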

Author: Joseph K. Bradley <jo...@databricks.com>

Closes #11356 from jkbradley/approx-quantile-python.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/13ce10e9
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/13ce10e9
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/13ce10e9

Branch: refs/heads/master
Commit: 13ce10e95401b21fa40ca0bb27ebf9a0bfffe70f
Parents: 2b04257
Author: Joseph K. Bradley <jo...@databricks.com>
Authored: Wed Feb 24 23:15:36 2016 -0800
Committer: Xiangrui Meng <me...@databricks.com>
Committed: Wed Feb 24 23:15:36 2016 -0800

----------------------------------------------------------------------
 python/pyspark/sql/dataframe.py                 | 54 ++++++++++++++++++++
 python/pyspark/sql/tests.py                     |  7 +++
 .../spark/sql/DataFrameStatFunctions.scala      | 10 ++++
 3 files changed, 71 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/13ce10e9/python/pyspark/sql/dataframe.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py
index 7275e69..76fbb0c 100644
--- a/python/pyspark/sql/dataframe.py
+++ b/python/pyspark/sql/dataframe.py
@@ -1178,6 +1178,55 @@ class DataFrame(object):
         return DataFrame(
             self._jdf.na().replace(self._jseq(subset), self._jmap(rep_dict)), self.sql_ctx)
 
+    @since(2.0)
+    def approxQuantile(self, col, probabilities, relativeError):
+        """
+        Calculates the approximate quantiles of a numerical column of a
+        DataFrame.
+
+        The result of this algorithm has the following deterministic bound:
+        If the DataFrame has N elements and if we request the quantile at
+        probability `p` up to error `err`, then the algorithm will return
+        a sample `x` from the DataFrame so that the *exact* rank of `x` is
+        close to (p * N). More precisely,
+
+          floor((p - err) * N) <= rank(x) <= ceil((p + err) * N).
+
+        This method implements a variation of the Greenwald-Khanna
+        algorithm (with some speed optimizations). The algorithm was first
+        presented in [[http://dx.doi.org/10.1145/375663.375670
+        Space-efficient Online Computation of Quantile Summaries]]
+        by Greenwald and Khanna.
+
+        :param col: the name of the numerical column
+        :param probabilities: a list of quantile probabilities
+          Each number must belong to [0, 1].
+          For example 0 is the minimum, 0.5 is the median, 1 is the maximum.
+        :param relativeError:  The relative target precision to achieve
+          (>= 0). If set to zero, the exact quantiles are computed, which
+          could be very expensive. Note that values greater than 1 are
+          accepted but give the same result as 1.
+        :return:  the approximate quantiles at the given probabilities
+        """
+        if not isinstance(col, str):
+            raise ValueError("col should be a string.")
+
+        if not isinstance(probabilities, (list, tuple)):
+            raise ValueError("probabilities should be a list or tuple")
+        if isinstance(probabilities, tuple):
+            probabilities = list(probabilities)
+        for p in probabilities:
+            if not isinstance(p, (float, int, long)) or p < 0 or p > 1:
+                raise ValueError("probabilities should be numerical (float, int, long) in [0,1].")
+        probabilities = _to_list(self._sc, probabilities)
+
+        if not isinstance(relativeError, (float, int, long)) or relativeError < 0:
+            raise ValueError("relativeError should be numerical (float, int, long) >= 0.")
+        relativeError = float(relativeError)
+
+        jaq = self._jdf.stat().approxQuantile(col, probabilities, relativeError)
+        return list(jaq)
+
     @since(1.4)
     def corr(self, col1, col2, method=None):
         """
@@ -1396,6 +1445,11 @@ class DataFrameStatFunctions(object):
     def __init__(self, df):
         self.df = df
 
+    def approxQuantile(self, col, probabilities, relativeError):
+        return self.df.approxQuantile(col, probabilities, relativeError)
+
+    approxQuantile.__doc__ = DataFrame.approxQuantile.__doc__
+
     def corr(self, col1, col2, method=None):
         return self.df.corr(col1, col2, method)
 

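A hedged sketch of the deterministic rank bound described in the docstring above (the SparkContext/SQLContext setup and column name are illustrative, not part of the commit):

```python
import math
from pyspark import SparkContext
from pyspark.sql import SQLContext, Row

sc = SparkContext("local", "approxQuantile-bound-check")
sqlContext = SQLContext(sc)  # also enables RDD.toDF()

# For N elements, probability p, and error err, the returned sample x
# must satisfy floor((p - err) * N) <= rank(x) <= ceil((p + err) * N).
N, p, err = 1000, 0.5, 0.05
df = sc.parallelize([Row(a=float(i)) for i in range(N)]).toDF()
[x] = df.approxQuantile("a", [p], err)

rank = df.rdd.filter(lambda row: row.a <= x).count()  # rank(x) = #{v : v <= x}
assert math.floor((p - err) * N) <= rank <= math.ceil((p + err) * N)
```
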
http://git-wip-us.apache.org/repos/asf/spark/blob/13ce10e9/python/pyspark/sql/tests.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py
index cc11c0f..90fd769 100644
--- a/python/pyspark/sql/tests.py
+++ b/python/pyspark/sql/tests.py
@@ -669,6 +669,13 @@ class SQLTests(ReusedPySparkTestCase):
                          functions.last(df2.id, True).alias('d'))
         self.assertEqual([Row(a=None, b=1, c=None, d=98)], df3.collect())
 
+    def test_approxQuantile(self):
+        df = self.sc.parallelize([Row(a=i) for i in range(10)]).toDF()
+        aq = df.stat.approxQuantile("a", [0.1, 0.5, 0.9], 0.1)
+        self.assertTrue(isinstance(aq, list))
+        self.assertEqual(len(aq), 3)
+        self.assertTrue(all(isinstance(q, float) for q in aq))
+
     def test_corr(self):
         import math
         df = self.sc.parallelize([Row(a=i, b=math.sqrt(i)) for i in range(10)]).toDF()
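
As a complement to the test above, a hedged sketch exercising the extremes of the relativeError parameter (reusing `sc` and `Row` from the sketch earlier; not part of the patch):

```python
from pyspark.sql import Row

# Per the docstring: relativeError=0.0 computes exact quantiles (which can
# be expensive), and any value greater than 1 behaves the same as 1.
df = sc.parallelize([Row(a=float(i)) for i in range(10)]).toDF()
exact = df.stat.approxQuantile("a", [0.5], 0.0)  # tightest bound: exact rank
loose = df.stat.approxQuantile("a", [0.5], 1.0)  # loosest bound: any element qualifies
```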

http://git-wip-us.apache.org/repos/asf/spark/blob/13ce10e9/sql/core/src/main/scala/org/apache/spark/sql/DataFrameStatFunctions.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameStatFunctions.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameStatFunctions.scala
index 39a31ab..3eb1f0f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameStatFunctions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameStatFunctions.scala
@@ -71,6 +71,16 @@ final class DataFrameStatFunctions private[sql](df: DataFrame) {
   }
 
   /**
+   * Python-friendly version of [[approxQuantile()]]
+   */
+  private[spark] def approxQuantile(
+      col: String,
+      probabilities: List[Double],
+      relativeError: Double): java.util.List[Double] = {
+    approxQuantile(col, probabilities.toArray, relativeError).toList.asJava
+  }
+
+  /**
    * Calculate the sample covariance of two numerical columns of a DataFrame.
    * @param col1 the name of the first column
    * @param col2 the name of the second column

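For context on why this List-based overload exists: Py4J hands a Python list to the JVM as a list collection rather than a Scala Array, so the Python wrapper in dataframe.py bridges through it. A rough sketch of that round trip, mirroring the wrapper above (treat the provenance of the `_to_list` helper, pyspark.sql.column, as an assumption; `df` is any DataFrame with a numerical column "a"):

```python
from pyspark.sql.column import _to_list

# Python list of floats -> JVM list accepted by the overload above.
jprobs = _to_list(df._sc, [0.25, 0.5, 0.75])
# Invoke the private[spark] overload; it returns a java.util.List[Double].
jaq = df._jdf.stat().approxQuantile("a", jprobs, 0.05)
result = list(jaq)  # copy back into a plain Python list
```
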

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org