You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2020/02/18 01:23:58 UTC
[spark] branch branch-3.0 updated: [SPARK-30791][SQL][PYTHON] Add
'sameSemantics' and 'semanticHash' methods in Dataset
This is an automated email from the ASF dual-hosted git repository.
weichenxu123 pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new ba91415 [SPARK-30791][SQL][PYTHON] Add 'sameSemantics' and 'semanticHash' methods in Dataset
ba91415 is described below
commit ba9141592d0f0ce23c207efb21ae84ac7cc4670a
Author: Liang Zhang <li...@databricks.com>
AuthorDate: Tue Feb 18 09:22:26 2020 +0800
[SPARK-30791][SQL][PYTHON] Add 'sameSemantics' and 'semanticHash' methods in Dataset
### What changes were proposed in this pull request?
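This PR adds two `@DeveloperApi` methods, `sameSemantics` and `semanticHash`, to the `Dataset[T]` class. Both simply expose existing lower-level methods of the analyzed logical plan (`sameResult` and `semanticHash`, respectively) on `Dataset[T]`, with matching wrappers on the Python `DataFrame`.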
### Why are the changes needed?
They are useful for checking whether two DataFrames are semantically the same when implementing DataFrame caching in Python, and for obtaining a hash of the query plan that can serve as an identifier. Wrapping the lower-level APIs makes them easier to use.
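### Does this PR introduce any user-facing change?
Yes. It adds two new methods, `sameSemantics` and `semanticHash`, to `Dataset` (and to the Python `DataFrame`). For example: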
```
scala> val df1 = Seq((1,2),(4,5)).toDF("col1", "col2")
df1: org.apache.spark.sql.DataFrame = [col1: int, col2: int]
scala> val df2 = Seq((1,2),(4,5)).toDF("col1", "col2")
df2: org.apache.spark.sql.DataFrame = [col1: int, col2: int]
scala> val df3 = Seq((0,2),(4,5)).toDF("col1", "col2")
df3: org.apache.spark.sql.DataFrame = [col1: int, col2: int]
scala> val df4 = Seq((0,2),(4,5)).toDF("col0", "col2")
df4: org.apache.spark.sql.DataFrame = [col0: int, col2: int]
scala> df1.semanticHash
res0: Int = 594427822
scala> df2.semanticHash
res1: Int = 594427822
scala> df1.sameSemantics(df2)
res2: Boolean = true
scala> df1.sameSemantics(df3)
res3: Boolean = false
scala> df3.semanticHash
res4: Int = -1592702048
scala> df4.semanticHash
res5: Int = -1592702048
scala> df4.sameSemantics(df3)
res6: Boolean = true
```
### How was this patch tested?
Unit tests in Scala and Python, and doctests in Python.
Note: comments are copied from the corresponding lower-level APIs.
Note: There are some outstanding issues that, once fixed, would reduce the hash collision rate: https://github.com/apache/spark/pull/27565#discussion_r379881028
Closes #27565 from liangz1/df-same-result.
Authored-by: Liang Zhang <li...@databricks.com>
Signed-off-by: WeichenXu <we...@databricks.com>
(cherry picked from commit d8c0599e542976ef70b60bc673e7c9732fce49e5)
Signed-off-by: WeichenXu <we...@databricks.com>
---
python/pyspark/sql/dataframe.py | 46 ++++++++++++++++++++++
python/pyspark/sql/tests/test_dataframe.py | 5 +++
.../main/scala/org/apache/spark/sql/Dataset.scala | 28 +++++++++++++
.../scala/org/apache/spark/sql/DatasetSuite.scala | 15 +++++++
4 files changed, 94 insertions(+)
diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py
index 2432b81..8325b68 100644
--- a/python/pyspark/sql/dataframe.py
+++ b/python/pyspark/sql/dataframe.py
@@ -2153,6 +2153,52 @@ class DataFrame(PandasMapOpsMixin, PandasConversionMixin):
"should have been DataFrame." % type(result)
return result
+ @since(3.1)
+ def sameSemantics(self, other):
+ """
+ Returns `True` when the logical query plans inside both :class:`DataFrame`\\s are equal and
+ therefore return same results.
+
+ .. note:: The equality comparison here is simplified by tolerating the cosmetic differences
+ such as attribute names.
+
+ .. note:: This API can compare both :class:`DataFrame`\\s very fast but can still return
+ `False` on the :class:`DataFrame` that return the same results, for instance, from
+ different plans. Such false negative semantic can be useful when caching as an example.
+
+ .. note:: DeveloperApi
+
+ >>> df1 = spark.range(10)
+ >>> df2 = spark.range(10)
+ >>> df1.withColumn("col1", df1.id * 2).sameSemantics(df2.withColumn("col1", df2.id * 2))
+ True
+ >>> df1.withColumn("col1", df1.id * 2).sameSemantics(df2.withColumn("col1", df2.id + 2))
+ False
+ >>> df1.withColumn("col1", df1.id * 2).sameSemantics(df2.withColumn("col0", df2.id * 2))
+ True
+ """
+ if not isinstance(other, DataFrame):
+ raise ValueError("other parameter should be of DataFrame; however, got %s"
+ % type(other))
+ return self._jdf.sameSemantics(other._jdf)
+
+ @since(3.1)
+ def semanticHash(self):
+ """
+ Returns a hash code of the logical query plan against this :class:`DataFrame`.
+
+ .. note:: Unlike the standard hash code, the hash is calculated against the query plan
+ simplified by tolerating the cosmetic differences such as attribute names.
+
+ .. note:: DeveloperApi
+
+ >>> spark.range(10).selectExpr("id as col0").semanticHash() # doctest: +SKIP
+ 1855039936
+ >>> spark.range(10).selectExpr("id as col1").semanticHash() # doctest: +SKIP
+ 1855039936
+ """
+ return self._jdf.semanticHash()
+
where = copy_func(
filter,
sinceversion=1.3,
diff --git a/python/pyspark/sql/tests/test_dataframe.py b/python/pyspark/sql/tests/test_dataframe.py
index d738449..942cd4b 100644
--- a/python/pyspark/sql/tests/test_dataframe.py
+++ b/python/pyspark/sql/tests/test_dataframe.py
@@ -782,6 +782,11 @@ class DataFrameTests(ReusedSQLTestCase):
break
self.assertEqual(df.take(8), result)
+ def test_same_semantics_error(self):
+ with QuietTest(self.sc):
+ with self.assertRaisesRegexp(ValueError, "should be of DataFrame.*int"):
+ self.spark.range(10).sameSemantics(1)
+
class QueryExecutionListenerTests(unittest.TestCase, SQLTestUtils):
# These tests are separate because it uses 'spark.sql.queryExecutionListeners' which is
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index 42f3535..5cd2583 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -3310,6 +3310,34 @@ class Dataset[T] private[sql](
files.toSet.toArray
}
+ /**
+ * Returns `true` when the logical query plans inside both [[Dataset]]s are equal and
+ * therefore return same results.
+ *
+ * @note The equality comparison here is simplified by tolerating the cosmetic differences
+ * such as attribute names.
+ * @note This API can compare both [[Dataset]]s very fast but can still return `false` on
+ * the [[Dataset]] that return the same results, for instance, from different plans. Such
+ * false negative semantic can be useful when caching as an example.
+ * @since 3.1.0
+ */
+ @DeveloperApi
+ def sameSemantics(other: Dataset[T]): Boolean = {
+ queryExecution.analyzed.sameResult(other.queryExecution.analyzed)
+ }
+
+ /**
+ * Returns a `hashCode` of the logical query plan against this [[Dataset]].
+ *
+ * @note Unlike the standard `hashCode`, the hash is calculated against the query plan
+ * simplified by tolerating the cosmetic differences such as attribute names.
+ * @since 3.1.0
+ */
+ @DeveloperApi
+ def semanticHash(): Int = {
+ queryExecution.analyzed.semanticHash()
+ }
+
////////////////////////////////////////////////////////////////////////////
// For Python API
////////////////////////////////////////////////////////////////////////////
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
index b0bd612..b4ed4ec 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
@@ -1909,6 +1909,21 @@ class DatasetSuite extends QueryTest
assert(active eq SparkSession.getActiveSession.get)
}
+
+ test("SPARK-30791: sameSemantics and semanticHash work") {
+ val df1 = Seq((1, 2), (4, 5)).toDF("col1", "col2")
+ val df2 = Seq((1, 2), (4, 5)).toDF("col1", "col2")
+ val df3 = Seq((0, 2), (4, 5)).toDF("col1", "col2")
+ val df4 = Seq((0, 2), (4, 5)).toDF("col0", "col2")
+
+ assert(df1.sameSemantics(df2) === true)
+ assert(df1.sameSemantics(df3) === false)
+ assert(df3.sameSemantics(df4) === true)
+
+ assert(df1.semanticHash === df2.semanticHash)
+ assert(df1.semanticHash !== df3.semanticHash)
+ assert(df3.semanticHash === df4.semanticHash)
+ }
}
object AssertExecutionId {
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org