You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by li...@apache.org on 2018/05/10 16:44:53 UTC

spark git commit: [SPARK-24171] Adding a note for non-deterministic functions

Repository: spark
Updated Branches:
  refs/heads/master 94d671448 -> f4fed0512


[SPARK-24171] Adding a note for non-deterministic functions

## What changes were proposed in this pull request?

I propose to add a clear statement for functions like `collect_list()` about non-deterministic behavior of such functions. The behavior must be taken into account by user while creating and running queries.

Author: Maxim Gekk <ma...@databricks.com>

Closes #21228 from MaxGekk/deterministic-comments.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f4fed051
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f4fed051
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f4fed051

Branch: refs/heads/master
Commit: f4fed0512101a67d9dae50ace11d3940b910e05e
Parents: 94d6714
Author: Maxim Gekk <ma...@databricks.com>
Authored: Thu May 10 09:44:49 2018 -0700
Committer: gatorsmile <ga...@gmail.com>
Committed: Thu May 10 09:44:49 2018 -0700

----------------------------------------------------------------------
 R/pkg/R/functions.R                             | 11 +++++
 python/pyspark/sql/functions.py                 | 18 ++++++++
 .../expressions/MonotonicallyIncreasingID.scala |  1 +
 .../spark/sql/catalyst/expressions/misc.scala   |  5 ++-
 .../expressions/randomExpressions.scala         |  8 ++--
 .../scala/org/apache/spark/sql/functions.scala  | 46 ++++++++++++++++++--
 6 files changed, 81 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/f4fed051/R/pkg/R/functions.R
----------------------------------------------------------------------
diff --git a/R/pkg/R/functions.R b/R/pkg/R/functions.R
index 0ec99d1..04d0e46 100644
--- a/R/pkg/R/functions.R
+++ b/R/pkg/R/functions.R
@@ -805,6 +805,8 @@ setMethod("factorial",
 #'
 #' The function by default returns the first values it sees. It will return the first non-missing
 #' value it sees when na.rm is set to true. If all values are missing, then NA is returned.
+#' Note: the function is non-deterministic because its results depends on order of rows which
+#' may be non-deterministic after a shuffle.
 #'
 #' @param na.rm a logical value indicating whether NA values should be stripped
 #'        before the computation proceeds.
@@ -948,6 +950,8 @@ setMethod("kurtosis",
 #'
 #' The function by default returns the last values it sees. It will return the last non-missing
 #' value it sees when na.rm is set to true. If all values are missing, then NA is returned.
+#' Note: the function is non-deterministic because its results depends on order of rows which
+#' may be non-deterministic after a shuffle.
 #'
 #' @param x column to compute on.
 #' @param na.rm a logical value indicating whether NA values should be stripped
@@ -1201,6 +1205,7 @@ setMethod("minute",
 #' 0, 1, 2, 8589934592 (1L << 33), 8589934593, 8589934594.
 #' This is equivalent to the MONOTONICALLY_INCREASING_ID function in SQL.
 #' The method should be used with no argument.
+#' Note: the function is non-deterministic because its result depends on partition IDs.
 #'
 #' @rdname column_nonaggregate_functions
 #' @aliases monotonically_increasing_id monotonically_increasing_id,missing-method
@@ -2584,6 +2589,7 @@ setMethod("lpad", signature(x = "Column", len = "numeric", pad = "character"),
 #' @details
 #' \code{rand}: Generates a random column with independent and identically distributed (i.i.d.)
 #' samples from U[0.0, 1.0].
+#' Note: the function is non-deterministic in general case.
 #'
 #' @rdname column_nonaggregate_functions
 #' @param seed a random seed. Can be missing.
@@ -2612,6 +2618,7 @@ setMethod("rand", signature(seed = "numeric"),
 #' @details
 #' \code{randn}: Generates a column with independent and identically distributed (i.i.d.) samples
 #' from the standard normal distribution.
+#' Note: the function is non-deterministic in general case.
 #'
 #' @rdname column_nonaggregate_functions
 #' @aliases randn randn,missing-method
@@ -3188,6 +3195,8 @@ setMethod("create_map",
 
 #' @details
 #' \code{collect_list}: Creates a list of objects with duplicates.
+#' Note: the function is non-deterministic because the order of collected results depends
+#' on order of rows which may be non-deterministic after a shuffle.
 #'
 #' @rdname column_aggregate_functions
 #' @aliases collect_list collect_list,Column-method
@@ -3207,6 +3216,8 @@ setMethod("collect_list",
 
 #' @details
 #' \code{collect_set}: Creates a list of objects with duplicate elements eliminated.
+#' Note: the function is non-deterministic because the order of collected results depends
+#' on order of rows which may be non-deterministic after a shuffle.
 #'
 #' @rdname column_aggregate_functions
 #' @aliases collect_set collect_set,Column-method

http://git-wip-us.apache.org/repos/asf/spark/blob/f4fed051/python/pyspark/sql/functions.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py
index ac3c797..f5a5841 100644
--- a/python/pyspark/sql/functions.py
+++ b/python/pyspark/sql/functions.py
@@ -152,6 +152,9 @@ _functions_2_4 = {
 _collect_list_doc = """
     Aggregate function: returns a list of objects with duplicates.
 
+    .. note:: The function is non-deterministic because the order of collected results depends
+        on order of rows which may be non-deterministic after a shuffle.
+
     >>> df2 = spark.createDataFrame([(2,), (5,), (5,)], ('age',))
     >>> df2.agg(collect_list('age')).collect()
     [Row(collect_list(age)=[2, 5, 5])]
@@ -159,6 +162,9 @@ _collect_list_doc = """
 _collect_set_doc = """
     Aggregate function: returns a set of objects with duplicate elements eliminated.
 
+    .. note:: The function is non-deterministic because the order of collected results depends
+        on order of rows which may be non-deterministic after a shuffle.
+
     >>> df2 = spark.createDataFrame([(2,), (5,), (5,)], ('age',))
     >>> df2.agg(collect_set('age')).collect()
     [Row(collect_set(age)=[5, 2])]
@@ -401,6 +407,9 @@ def first(col, ignorenulls=False):
 
     The function by default returns the first values it sees. It will return the first non-null
     value it sees when ignoreNulls is set to true. If all values are null, then null is returned.
+
+    .. note:: The function is non-deterministic because its results depends on order of rows which
+        may be non-deterministic after a shuffle.
     """
     sc = SparkContext._active_spark_context
     jc = sc._jvm.functions.first(_to_java_column(col), ignorenulls)
@@ -489,6 +498,9 @@ def last(col, ignorenulls=False):
 
     The function by default returns the last values it sees. It will return the last non-null
     value it sees when ignoreNulls is set to true. If all values are null, then null is returned.
+
+    .. note:: The function is non-deterministic because its results depends on order of rows
+        which may be non-deterministic after a shuffle.
     """
     sc = SparkContext._active_spark_context
     jc = sc._jvm.functions.last(_to_java_column(col), ignorenulls)
@@ -504,6 +516,8 @@ def monotonically_increasing_id():
     within each partition in the lower 33 bits. The assumption is that the data frame has
     less than 1 billion partitions, and each partition has less than 8 billion records.
 
+    .. note:: The function is non-deterministic because its result depends on partition IDs.
+
     As an example, consider a :class:`DataFrame` with two partitions, each with 3 records.
     This expression would return the following IDs:
     0, 1, 2, 8589934592 (1L << 33), 8589934593, 8589934594.
@@ -536,6 +550,8 @@ def rand(seed=None):
     """Generates a random column with independent and identically distributed (i.i.d.) samples
     from U[0.0, 1.0].
 
+    .. note:: The function is non-deterministic in general case.
+
     >>> df.withColumn('rand', rand(seed=42) * 3).collect()
     [Row(age=2, name=u'Alice', rand=1.1568609015300986),
      Row(age=5, name=u'Bob', rand=1.403379671529166)]
@@ -554,6 +570,8 @@ def randn(seed=None):
     """Generates a column with independent and identically distributed (i.i.d.) samples from
     the standard normal distribution.
 
+    .. note:: The function is non-deterministic in general case.
+
     >>> df.withColumn('randn', randn(seed=42)).collect()
     [Row(age=2, name=u'Alice', randn=-0.7556247885860078),
     Row(age=5, name=u'Bob', randn=-0.0861619008451133)]

http://git-wip-us.apache.org/repos/asf/spark/blob/f4fed051/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/MonotonicallyIncreasingID.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/MonotonicallyIncreasingID.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/MonotonicallyIncreasingID.scala
index ad1e7bd..9f07796 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/MonotonicallyIncreasingID.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/MonotonicallyIncreasingID.scala
@@ -38,6 +38,7 @@ import org.apache.spark.sql.types.{DataType, LongType}
       puts the partition ID in the upper 31 bits, and the lower 33 bits represent the record number
       within each partition. The assumption is that the data frame has less than 1 billion
       partitions, and each partition has less than 8 billion records.
+      The function is non-deterministic because its result depends on partition IDs.
   """)
 case class MonotonicallyIncreasingID() extends LeafExpression with Stateful {
 

http://git-wip-us.apache.org/repos/asf/spark/blob/f4fed051/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/misc.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/misc.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/misc.scala
index 7eda65a..b783469 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/misc.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/misc.scala
@@ -117,12 +117,13 @@ case class CurrentDatabase() extends LeafExpression with Unevaluable {
 
 // scalastyle:off line.size.limit
 @ExpressionDescription(
-  usage = "_FUNC_() - Returns an universally unique identifier (UUID) string. The value is returned as a canonical UUID 36-character string.",
+  usage = """_FUNC_() - Returns an universally unique identifier (UUID) string. The value is returned as a canonical UUID 36-character string.""",
   examples = """
     Examples:
       > SELECT _FUNC_();
        46707d92-02f4-4817-8116-a4c3b23e6266
-  """)
+  """,
+  note = "The function is non-deterministic.")
 // scalastyle:on line.size.limit
 case class Uuid(randomSeed: Option[Long] = None) extends LeafExpression with Stateful {
 

http://git-wip-us.apache.org/repos/asf/spark/blob/f4fed051/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/randomExpressions.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/randomExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/randomExpressions.scala
index 7018605..2653b28 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/randomExpressions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/randomExpressions.scala
@@ -68,7 +68,8 @@ abstract class RDG extends UnaryExpression with ExpectsInputTypes with Stateful
        0.8446490682263027
       > SELECT _FUNC_(null);
        0.8446490682263027
-  """)
+  """,
+  note = "The function is non-deterministic in general case.")
 // scalastyle:on line.size.limit
 case class Rand(child: Expression) extends RDG {
 
@@ -96,7 +97,7 @@ object Rand {
 /** Generate a random column with i.i.d. values drawn from the standard normal distribution. */
 // scalastyle:off line.size.limit
 @ExpressionDescription(
-  usage = "_FUNC_([seed]) - Returns a random value with independent and identically distributed (i.i.d.) values drawn from the standard normal distribution.",
+  usage = """_FUNC_([seed]) - Returns a random value with independent and identically distributed (i.i.d.) values drawn from the standard normal distribution.""",
   examples = """
     Examples:
       > SELECT _FUNC_();
@@ -105,7 +106,8 @@ object Rand {
        1.1164209726833079
       > SELECT _FUNC_(null);
        1.1164209726833079
-  """)
+  """,
+  note = "The function is non-deterministic in general case.")
 // scalastyle:on line.size.limit
 case class Randn(child: Expression) extends RDG {
 

http://git-wip-us.apache.org/repos/asf/spark/blob/f4fed051/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
index 28cf705..225de00 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
@@ -283,6 +283,9 @@ object functions {
   /**
    * Aggregate function: returns a list of objects with duplicates.
    *
+   * @note The function is non-deterministic because the order of collected results depends
+   * on order of rows which may be non-deterministic after a shuffle.
+   *
    * @group agg_funcs
    * @since 1.6.0
    */
@@ -291,6 +294,9 @@ object functions {
   /**
    * Aggregate function: returns a list of objects with duplicates.
    *
+   * @note The function is non-deterministic because the order of collected results depends
+   * on order of rows which may be non-deterministic after a shuffle.
+   *
    * @group agg_funcs
    * @since 1.6.0
    */
@@ -299,6 +305,9 @@ object functions {
   /**
    * Aggregate function: returns a set of objects with duplicate elements eliminated.
    *
+   * @note The function is non-deterministic because the order of collected results depends
+   * on order of rows which may be non-deterministic after a shuffle.
+   *
    * @group agg_funcs
    * @since 1.6.0
    */
@@ -307,6 +316,9 @@ object functions {
   /**
    * Aggregate function: returns a set of objects with duplicate elements eliminated.
    *
+   * @note The function is non-deterministic because the order of collected results depends
+   * on order of rows which may be non-deterministic after a shuffle.
+   *
    * @group agg_funcs
    * @since 1.6.0
    */
@@ -422,6 +434,9 @@ object functions {
    * The function by default returns the first values it sees. It will return the first non-null
    * value it sees when ignoreNulls is set to true. If all values are null, then null is returned.
    *
+   * @note The function is non-deterministic because its results depends on order of rows which
+   * may be non-deterministic after a shuffle.
+   *
    * @group agg_funcs
    * @since 2.0.0
    */
@@ -435,6 +450,9 @@ object functions {
    * The function by default returns the first values it sees. It will return the first non-null
    * value it sees when ignoreNulls is set to true. If all values are null, then null is returned.
    *
+   * @note The function is non-deterministic because its results depends on order of rows which
+   * may be non-deterministic after a shuffle.
+   *
    * @group agg_funcs
    * @since 2.0.0
    */
@@ -448,6 +466,9 @@ object functions {
    * The function by default returns the first values it sees. It will return the first non-null
    * value it sees when ignoreNulls is set to true. If all values are null, then null is returned.
    *
+   * @note The function is non-deterministic because its results depends on order of rows which
+   * may be non-deterministic after a shuffle.
+   *
    * @group agg_funcs
    * @since 1.3.0
    */
@@ -459,6 +480,9 @@ object functions {
    * The function by default returns the first values it sees. It will return the first non-null
    * value it sees when ignoreNulls is set to true. If all values are null, then null is returned.
    *
+   * @note The function is non-deterministic because its results depends on order of rows which
+   * may be non-deterministic after a shuffle.
+   *
    * @group agg_funcs
    * @since 1.3.0
    */
@@ -535,6 +559,9 @@ object functions {
    * The function by default returns the last values it sees. It will return the last non-null
    * value it sees when ignoreNulls is set to true. If all values are null, then null is returned.
    *
+   * @note The function is non-deterministic because its results depends on order of rows which
+   * may be non-deterministic after a shuffle.
+   *
    * @group agg_funcs
    * @since 2.0.0
    */
@@ -548,6 +575,9 @@ object functions {
    * The function by default returns the last values it sees. It will return the last non-null
    * value it sees when ignoreNulls is set to true. If all values are null, then null is returned.
    *
+   * @note The function is non-deterministic because its results depends on order of rows which
+   * may be non-deterministic after a shuffle.
+   *
    * @group agg_funcs
    * @since 2.0.0
    */
@@ -561,6 +591,9 @@ object functions {
    * The function by default returns the last values it sees. It will return the last non-null
    * value it sees when ignoreNulls is set to true. If all values are null, then null is returned.
    *
+   * @note The function is non-deterministic because its results depends on order of rows which
+   * may be non-deterministic after a shuffle.
+   *
    * @group agg_funcs
    * @since 1.3.0
    */
@@ -572,6 +605,9 @@ object functions {
    * The function by default returns the last values it sees. It will return the last non-null
    * value it sees when ignoreNulls is set to true. If all values are null, then null is returned.
    *
+   * @note The function is non-deterministic because its results depends on order of rows which
+   * may be non-deterministic after a shuffle.
+   *
    * @group agg_funcs
    * @since 1.3.0
    */
@@ -1344,7 +1380,7 @@ object functions {
    * Generate a random column with independent and identically distributed (i.i.d.) samples
    * from U[0.0, 1.0].
    *
-   * @note This is indeterministic when data partitions are not fixed.
+   * @note The function is non-deterministic in general case.
    *
    * @group normal_funcs
    * @since 1.4.0
@@ -1355,6 +1391,8 @@ object functions {
    * Generate a random column with independent and identically distributed (i.i.d.) samples
    * from U[0.0, 1.0].
    *
+   * @note The function is non-deterministic in general case.
+   *
    * @group normal_funcs
    * @since 1.4.0
    */
@@ -1364,7 +1402,7 @@ object functions {
    * Generate a column with independent and identically distributed (i.i.d.) samples from
    * the standard normal distribution.
    *
-   * @note This is indeterministic when data partitions are not fixed.
+   * @note The function is non-deterministic in general case.
    *
    * @group normal_funcs
    * @since 1.4.0
@@ -1375,6 +1413,8 @@ object functions {
    * Generate a column with independent and identically distributed (i.i.d.) samples from
    * the standard normal distribution.
    *
+   * @note The function is non-deterministic in general case.
+   *
    * @group normal_funcs
    * @since 1.4.0
    */
@@ -1383,7 +1423,7 @@ object functions {
   /**
    * Partition ID.
    *
-   * @note This is indeterministic because it depends on data partitioning and task scheduling.
+   * @note This is non-deterministic because it depends on data partitioning and task scheduling.
    *
    * @group normal_funcs
    * @since 1.6.0


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org