You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2020/02/18 01:23:58 UTC

[spark] branch branch-3.0 updated: [SPARK-30791][SQL][PYTHON] Add 'sameSemantics' and 'semanticHash' methods in Dataset

This is an automated email from the ASF dual-hosted git repository.

weichenxu123 pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new ba91415  [SPARK-30791][SQL][PYTHON] Add 'sameSemantics' and 'semanticHash' methods in Dataset
ba91415 is described below

commit ba9141592d0f0ce23c207efb21ae84ac7cc4670a
Author: Liang Zhang <li...@databricks.com>
AuthorDate: Tue Feb 18 09:22:26 2020 +0800

    [SPARK-30791][SQL][PYTHON] Add 'sameSemantics' and 'semanticHash' methods in Dataset
    
    ### What changes were proposed in this pull request?
    This PR added two DeveloperApis to the Dataset[T] class. Both methods are just exposing lower-level methods to the Dataset[T] class.
    
    ### Why are the changes needed?
    They are useful for checking whether two DataFrames are the same when implementing DataFrame caching in Python, and also for getting a unique ID. It's easier to use if we wrap the lower-level APIs.
    
    ### Does this PR introduce any user-facing change?
    ```
    scala> val df1 = Seq((1,2),(4,5)).toDF("col1", "col2")
    df1: org.apache.spark.sql.DataFrame = [col1: int, col2: int]
    
    scala> val df2 = Seq((1,2),(4,5)).toDF("col1", "col2")
    df2: org.apache.spark.sql.DataFrame = [col1: int, col2: int]
    
    scala> val df3 = Seq((0,2),(4,5)).toDF("col1", "col2")
    df3: org.apache.spark.sql.DataFrame = [col1: int, col2: int]
    
    scala> val df4 = Seq((0,2),(4,5)).toDF("col0", "col2")
    df4: org.apache.spark.sql.DataFrame = [col0: int, col2: int]
    
    scala> df1.semanticHash
    res0: Int = 594427822
    
    scala> df2.semanticHash
    res1: Int = 594427822
    
    scala> df1.sameSemantics(df2)
    res2: Boolean = true
    
    scala> df1.sameSemantics(df3)
    res3: Boolean = false
    
    scala> df3.semanticHash
    res4: Int = -1592702048
    
    scala> df4.semanticHash
    res5: Int = -1592702048
    
    scala> df4.sameSemantics(df3)
    res6: Boolean = true
    ```
    
    ### How was this patch tested?
    Unit test in scala and doctest in python.
    
    Note: comments are copied from the corresponding lower-level APIs.
    Note: There are some issues to be fixed that would reduce the hash collision rate: https://github.com/apache/spark/pull/27565#discussion_r379881028
    
    Closes #27565 from liangz1/df-same-result.
    
    Authored-by: Liang Zhang <li...@databricks.com>
    Signed-off-by: WeichenXu <we...@databricks.com>
    (cherry picked from commit d8c0599e542976ef70b60bc673e7c9732fce49e5)
    Signed-off-by: WeichenXu <we...@databricks.com>
---
 python/pyspark/sql/dataframe.py                    | 46 ++++++++++++++++++++++
 python/pyspark/sql/tests/test_dataframe.py         |  5 +++
 .../main/scala/org/apache/spark/sql/Dataset.scala  | 28 +++++++++++++
 .../scala/org/apache/spark/sql/DatasetSuite.scala  | 15 +++++++
 4 files changed, 94 insertions(+)

diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py
index 2432b81..8325b68 100644
--- a/python/pyspark/sql/dataframe.py
+++ b/python/pyspark/sql/dataframe.py
@@ -2153,6 +2153,52 @@ class DataFrame(PandasMapOpsMixin, PandasConversionMixin):
                                               "should have been DataFrame." % type(result)
         return result
 
+    @since(3.1)
+    def sameSemantics(self, other):
+        """
+        Returns `True` when the logical query plans inside both :class:`DataFrame`\\s are equal and
+        therefore return same results.
+
+        .. note:: The equality comparison here is simplified by tolerating the cosmetic differences
+            such as attribute names.
+
+        .. note:: This API can compare both :class:`DataFrame`\\s very fast but can still return
+            `False` on the :class:`DataFrame` that return the same results, for instance, from
+            different plans. Such false negative semantic can be useful when caching as an example.
+
+        .. note:: DeveloperApi
+
+        >>> df1 = spark.range(10)
+        >>> df2 = spark.range(10)
+        >>> df1.withColumn("col1", df1.id * 2).sameSemantics(df2.withColumn("col1", df2.id * 2))
+        True
+        >>> df1.withColumn("col1", df1.id * 2).sameSemantics(df2.withColumn("col1", df2.id + 2))
+        False
+        >>> df1.withColumn("col1", df1.id * 2).sameSemantics(df2.withColumn("col0", df2.id * 2))
+        True
+        """
+        if not isinstance(other, DataFrame):
+            raise ValueError("other parameter should be of DataFrame; however, got %s"
+                             % type(other))
+        return self._jdf.sameSemantics(other._jdf)
+
+    @since(3.1)
+    def semanticHash(self):
+        """
+        Returns a hash code of the logical query plan against this :class:`DataFrame`.
+
+        .. note:: Unlike the standard hash code, the hash is calculated against the query plan
+            simplified by tolerating the cosmetic differences such as attribute names.
+
+        .. note:: DeveloperApi
+
+        >>> spark.range(10).selectExpr("id as col0").semanticHash()  # doctest: +SKIP
+        1855039936
+        >>> spark.range(10).selectExpr("id as col1").semanticHash()  # doctest: +SKIP
+        1855039936
+        """
+        return self._jdf.semanticHash()
+
     where = copy_func(
         filter,
         sinceversion=1.3,
diff --git a/python/pyspark/sql/tests/test_dataframe.py b/python/pyspark/sql/tests/test_dataframe.py
index d738449..942cd4b 100644
--- a/python/pyspark/sql/tests/test_dataframe.py
+++ b/python/pyspark/sql/tests/test_dataframe.py
@@ -782,6 +782,11 @@ class DataFrameTests(ReusedSQLTestCase):
                     break
             self.assertEqual(df.take(8), result)
 
+    def test_same_semantics_error(self):
+        with QuietTest(self.sc):
+            with self.assertRaisesRegexp(ValueError, "should be of DataFrame.*int"):
+                self.spark.range(10).sameSemantics(1)
+
 
 class QueryExecutionListenerTests(unittest.TestCase, SQLTestUtils):
     # These tests are separate because it uses 'spark.sql.queryExecutionListeners' which is
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index 42f3535..5cd2583 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -3310,6 +3310,34 @@ class Dataset[T] private[sql](
     files.toSet.toArray
   }
 
+  /**
+   * Returns `true` when the logical query plans inside both [[Dataset]]s are equal and
+   * therefore return same results.
+   *
+   * @note The equality comparison here is simplified by tolerating the cosmetic differences
+   *       such as attribute names.
+   * @note This API can compare both [[Dataset]]s very fast but can still return `false` on
+   *       the [[Dataset]] that return the same results, for instance, from different plans. Such
+   *       false negative semantic can be useful when caching as an example.
+   * @since 3.1.0
+   */
+  @DeveloperApi
+  def sameSemantics(other: Dataset[T]): Boolean = {
+    queryExecution.analyzed.sameResult(other.queryExecution.analyzed)
+  }
+
+  /**
+   * Returns a `hashCode` of the logical query plan against this [[Dataset]].
+   *
+   * @note Unlike the standard `hashCode`, the hash is calculated against the query plan
+   *       simplified by tolerating the cosmetic differences such as attribute names.
+   * @since 3.1.0
+   */
+  @DeveloperApi
+  def semanticHash(): Int = {
+    queryExecution.analyzed.semanticHash()
+  }
+
   ////////////////////////////////////////////////////////////////////////////
   // For Python API
   ////////////////////////////////////////////////////////////////////////////
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
index b0bd612..b4ed4ec 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
@@ -1909,6 +1909,21 @@ class DatasetSuite extends QueryTest
 
     assert(active eq SparkSession.getActiveSession.get)
   }
+
+  test("SPARK-30791: sameSemantics and semanticHash work") {
+    val df1 = Seq((1, 2), (4, 5)).toDF("col1", "col2")
+    val df2 = Seq((1, 2), (4, 5)).toDF("col1", "col2")
+    val df3 = Seq((0, 2), (4, 5)).toDF("col1", "col2")
+    val df4 = Seq((0, 2), (4, 5)).toDF("col0", "col2")
+
+    assert(df1.sameSemantics(df2) === true)
+    assert(df1.sameSemantics(df3) === false)
+    assert(df3.sameSemantics(df4) === true)
+
+    assert(df1.semanticHash === df2.semanticHash)
+    assert(df1.semanticHash !== df3.semanticHash)
+    assert(df3.semanticHash === df4.semanticHash)
+  }
 }
 
 object AssertExecutionId {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org