You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by li...@apache.org on 2018/07/12 22:13:30 UTC
spark git commit: [SPARK-24208][SQL][FOLLOWUP] Move test cases to proper locations
Repository: spark
Updated Branches:
refs/heads/master 07704c971 -> 11384893b
[SPARK-24208][SQL][FOLLOWUP] Move test cases to proper locations
## What changes were proposed in this pull request?
This PR is a follow-up that moves the test cases introduced by the original PR to their proper locations.
## How was this patch tested?
Tested by the moved unit tests (UTs).
Author: Marco Gaido <ma...@gmail.com>
Closes #21751 from mgaido91/SPARK-24208_followup.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/11384893
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/11384893
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/11384893
Branch: refs/heads/master
Commit: 11384893b6ad09c0c8bc6a350bb9540d0d704bb4
Parents: 07704c9
Author: Marco Gaido <ma...@gmail.com>
Authored: Thu Jul 12 15:13:26 2018 -0700
Committer: Xiao Li <ga...@gmail.com>
Committed: Thu Jul 12 15:13:26 2018 -0700
----------------------------------------------------------------------
python/pyspark/sql/tests.py | 32 ++++++++++----------
.../sql/catalyst/analysis/AnalysisSuite.scala | 18 +++++++++++
.../apache/spark/sql/GroupedDatasetSuite.scala | 12 --------
3 files changed, 34 insertions(+), 28 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/11384893/python/pyspark/sql/tests.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py
index 4404dbe..565654e 100644
--- a/python/pyspark/sql/tests.py
+++ b/python/pyspark/sql/tests.py
@@ -5471,6 +5471,22 @@ class GroupedMapPandasUDFTests(ReusedSQLTestCase):
self.assertEqual(r.a, 'hi')
self.assertEqual(r.b, 1)
+ def test_self_join_with_pandas(self):
+ import pyspark.sql.functions as F
+
+ @F.pandas_udf('key long, col string', F.PandasUDFType.GROUPED_MAP)
+ def dummy_pandas_udf(df):
+ return df[['key', 'col']]
+
+ df = self.spark.createDataFrame([Row(key=1, col='A'), Row(key=1, col='B'),
+ Row(key=2, col='C')])
+ df_with_pandas = df.groupBy('key').apply(dummy_pandas_udf)
+
+ # this was throwing an AnalysisException before SPARK-24208
+ res = df_with_pandas.alias('temp0').join(df_with_pandas.alias('temp1'),
+ F.col('temp0.key') == F.col('temp1.key'))
+ self.assertEquals(res.count(), 5)
+
@unittest.skipIf(
not _have_pandas or not _have_pyarrow,
@@ -5925,22 +5941,6 @@ class GroupedAggPandasUDFTests(ReusedSQLTestCase):
'mixture.*aggregate function.*group aggregate pandas UDF'):
df.groupby(df.id).agg(mean_udf(df.v), mean(df.v)).collect()
- def test_self_join_with_pandas(self):
- import pyspark.sql.functions as F
-
- @F.pandas_udf('key long, col string', F.PandasUDFType.GROUPED_MAP)
- def dummy_pandas_udf(df):
- return df[['key', 'col']]
-
- df = self.spark.createDataFrame([Row(key=1, col='A'), Row(key=1, col='B'),
- Row(key=2, col='C')])
- dfWithPandas = df.groupBy('key').apply(dummy_pandas_udf)
-
- # this was throwing an AnalysisException before SPARK-24208
- res = dfWithPandas.alias('temp0').join(dfWithPandas.alias('temp1'),
- F.col('temp0.key') == F.col('temp1.key'))
- self.assertEquals(res.count(), 5)
-
@unittest.skipIf(
not _have_pandas or not _have_pyarrow,
http://git-wip-us.apache.org/repos/asf/spark/blob/11384893/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
index cd85795..bbcdf6c 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
@@ -21,6 +21,7 @@ import java.util.TimeZone
import org.scalatest.Matchers
+import org.apache.spark.api.python.PythonEvalType
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.dsl.plans._
@@ -557,4 +558,21 @@ class AnalysisSuite extends AnalysisTest with Matchers {
SubqueryAlias("tbl", testRelation)))
assertAnalysisError(barrier, Seq("cannot resolve '`tbl.b`'"))
}
+
+ test("SPARK-24208: analysis fails on self-join with FlatMapGroupsInPandas") {
+ val pythonUdf = PythonUDF("pyUDF", null,
+ StructType(Seq(StructField("a", LongType))),
+ Seq.empty,
+ PythonEvalType.SQL_GROUPED_MAP_PANDAS_UDF,
+ true)
+ val output = pythonUdf.dataType.asInstanceOf[StructType].toAttributes
+ val project = Project(Seq(UnresolvedAttribute("a")), testRelation)
+ val flatMapGroupsInPandas = FlatMapGroupsInPandas(
+ Seq(UnresolvedAttribute("a")), pythonUdf, output, project)
+ val left = SubqueryAlias("temp0", flatMapGroupsInPandas)
+ val right = SubqueryAlias("temp1", flatMapGroupsInPandas)
+ val join = Join(left, right, Inner, None)
+ assertAnalysisSuccess(
+ Project(Seq(UnresolvedAttribute("temp0.a"), UnresolvedAttribute("temp1.a")), join))
+ }
}
http://git-wip-us.apache.org/repos/asf/spark/blob/11384893/sql/core/src/test/scala/org/apache/spark/sql/GroupedDatasetSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/GroupedDatasetSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/GroupedDatasetSuite.scala
index bd54ea4..147c0b6 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/GroupedDatasetSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/GroupedDatasetSuite.scala
@@ -93,16 +93,4 @@ class GroupedDatasetSuite extends QueryTest with SharedSQLContext {
}
datasetWithUDF.unpersist(true)
}
-
- test("SPARK-24208: analysis fails on self-join with FlatMapGroupsInPandas") {
- val df = datasetWithUDF.groupBy("s").flatMapGroupsInPandas(PythonUDF(
- "pyUDF",
- null,
- StructType(Seq(StructField("s", LongType))),
- Seq.empty,
- PythonEvalType.SQL_GROUPED_MAP_PANDAS_UDF,
- true))
- val df1 = df.alias("temp0").join(df.alias("temp1"), $"temp0.s" === $"temp1.s")
- df1.queryExecution.assertAnalyzed()
- }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org