You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by gu...@apache.org on 2018/12/14 02:45:39 UTC
[spark] branch master updated: [SPARK-26364][PYTHON][TESTING] Clean up imports in test_pandas_udf*
This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 160e583 [SPARK-26364][PYTHON][TESTING] Clean up imports in test_pandas_udf*
160e583 is described below
commit 160e583a17235318c06b95992941a772ff782fae
Author: Li Jin <ic...@gmail.com>
AuthorDate: Fri Dec 14 10:45:24 2018 +0800
[SPARK-26364][PYTHON][TESTING] Clean up imports in test_pandas_udf*
## What changes were proposed in this pull request?
Clean up unconditional import statements and move them to the top.
Conditional imports (pandas, numpy, pyarrow) are left as-is.
## How was this patch tested?
Existing tests.
Closes #23314 from icexelloss/clean-up-test-imports.
Authored-by: Li Jin <ic...@gmail.com>
Signed-off-by: Hyukjin Kwon <gu...@apache.org>
---
python/pyspark/sql/tests/test_pandas_udf.py | 16 ++---
.../sql/tests/test_pandas_udf_grouped_agg.py | 39 +----------
.../sql/tests/test_pandas_udf_grouped_map.py | 40 +++---------
python/pyspark/sql/tests/test_pandas_udf_scalar.py | 75 ++++++----------------
python/pyspark/sql/tests/test_pandas_udf_window.py | 29 +--------
5 files changed, 36 insertions(+), 163 deletions(-)
diff --git a/python/pyspark/sql/tests/test_pandas_udf.py b/python/pyspark/sql/tests/test_pandas_udf.py
index c4b5478..d4d9679 100644
--- a/python/pyspark/sql/tests/test_pandas_udf.py
+++ b/python/pyspark/sql/tests/test_pandas_udf.py
@@ -17,12 +17,16 @@
import unittest
+from pyspark.sql.functions import udf, pandas_udf, PandasUDFType
from pyspark.sql.types import *
from pyspark.sql.utils import ParseException
+from pyspark.rdd import PythonEvalType
from pyspark.testing.sqlutils import ReusedSQLTestCase, have_pandas, have_pyarrow, \
pandas_requirement_message, pyarrow_requirement_message
from pyspark.testing.utils import QuietTest
+from py4j.protocol import Py4JJavaError
+
@unittest.skipIf(
not have_pandas or not have_pyarrow,
@@ -30,9 +34,6 @@ from pyspark.testing.utils import QuietTest
class PandasUDFTests(ReusedSQLTestCase):
def test_pandas_udf_basic(self):
- from pyspark.rdd import PythonEvalType
- from pyspark.sql.functions import pandas_udf, PandasUDFType
-
udf = pandas_udf(lambda x: x, DoubleType())
self.assertEqual(udf.returnType, DoubleType())
self.assertEqual(udf.evalType, PythonEvalType.SQL_SCALAR_PANDAS_UDF)
@@ -65,10 +66,6 @@ class PandasUDFTests(ReusedSQLTestCase):
self.assertEqual(udf.evalType, PythonEvalType.SQL_GROUPED_MAP_PANDAS_UDF)
def test_pandas_udf_decorator(self):
- from pyspark.rdd import PythonEvalType
- from pyspark.sql.functions import pandas_udf, PandasUDFType
- from pyspark.sql.types import StructType, StructField, DoubleType
-
@pandas_udf(DoubleType())
def foo(x):
return x
@@ -114,8 +111,6 @@ class PandasUDFTests(ReusedSQLTestCase):
self.assertEqual(foo.evalType, PythonEvalType.SQL_GROUPED_MAP_PANDAS_UDF)
def test_udf_wrong_arg(self):
- from pyspark.sql.functions import pandas_udf, PandasUDFType
-
with QuietTest(self.sc):
with self.assertRaises(ParseException):
@pandas_udf('blah')
@@ -151,9 +146,6 @@ class PandasUDFTests(ReusedSQLTestCase):
return k
def test_stopiteration_in_udf(self):
- from pyspark.sql.functions import udf, pandas_udf, PandasUDFType
- from py4j.protocol import Py4JJavaError
-
def foo(x):
raise StopIteration()
diff --git a/python/pyspark/sql/tests/test_pandas_udf_grouped_agg.py b/python/pyspark/sql/tests/test_pandas_udf_grouped_agg.py
index 5383704..18264ea 100644
--- a/python/pyspark/sql/tests/test_pandas_udf_grouped_agg.py
+++ b/python/pyspark/sql/tests/test_pandas_udf_grouped_agg.py
@@ -17,6 +17,9 @@
import unittest
+from pyspark.rdd import PythonEvalType
+from pyspark.sql.functions import array, explode, col, lit, mean, sum, \
+ udf, pandas_udf, PandasUDFType
from pyspark.sql.types import *
from pyspark.sql.utils import AnalysisException
from pyspark.testing.sqlutils import ReusedSQLTestCase, have_pandas, have_pyarrow, \
@@ -31,7 +34,6 @@ class GroupedAggPandasUDFTests(ReusedSQLTestCase):
@property
def data(self):
- from pyspark.sql.functions import array, explode, col, lit
return self.spark.range(10).toDF('id') \
.withColumn("vs", array([lit(i * 1.0) + col('id') for i in range(20, 30)])) \
.withColumn("v", explode(col('vs'))) \
@@ -40,8 +42,6 @@ class GroupedAggPandasUDFTests(ReusedSQLTestCase):
@property
def python_plus_one(self):
- from pyspark.sql.functions import udf
-
@udf('double')
def plus_one(v):
assert isinstance(v, (int, float))
@@ -51,7 +51,6 @@ class GroupedAggPandasUDFTests(ReusedSQLTestCase):
@property
def pandas_scalar_plus_two(self):
import pandas as pd
- from pyspark.sql.functions import pandas_udf, PandasUDFType
@pandas_udf('double', PandasUDFType.SCALAR)
def plus_two(v):
@@ -61,8 +60,6 @@ class GroupedAggPandasUDFTests(ReusedSQLTestCase):
@property
def pandas_agg_mean_udf(self):
- from pyspark.sql.functions import pandas_udf, PandasUDFType
-
@pandas_udf('double', PandasUDFType.GROUPED_AGG)
def avg(v):
return v.mean()
@@ -70,8 +67,6 @@ class GroupedAggPandasUDFTests(ReusedSQLTestCase):
@property
def pandas_agg_sum_udf(self):
- from pyspark.sql.functions import pandas_udf, PandasUDFType
-
@pandas_udf('double', PandasUDFType.GROUPED_AGG)
def sum(v):
return v.sum()
@@ -80,7 +75,6 @@ class GroupedAggPandasUDFTests(ReusedSQLTestCase):
@property
def pandas_agg_weighted_mean_udf(self):
import numpy as np
- from pyspark.sql.functions import pandas_udf, PandasUDFType
@pandas_udf('double', PandasUDFType.GROUPED_AGG)
def weighted_mean(v, w):
@@ -88,8 +82,6 @@ class GroupedAggPandasUDFTests(ReusedSQLTestCase):
return weighted_mean
def test_manual(self):
- from pyspark.sql.functions import pandas_udf, array
-
df = self.data
sum_udf = self.pandas_agg_sum_udf
mean_udf = self.pandas_agg_mean_udf
@@ -118,8 +110,6 @@ class GroupedAggPandasUDFTests(ReusedSQLTestCase):
self.assertPandasEqual(expected1.toPandas(), result1.toPandas())
def test_basic(self):
- from pyspark.sql.functions import col, lit, mean
-
df = self.data
weighted_mean_udf = self.pandas_agg_weighted_mean_udf
@@ -150,9 +140,6 @@ class GroupedAggPandasUDFTests(ReusedSQLTestCase):
self.assertPandasEqual(expected4.toPandas(), result4.toPandas())
def test_unsupported_types(self):
- from pyspark.sql.types import DoubleType, MapType
- from pyspark.sql.functions import pandas_udf, PandasUDFType
-
with QuietTest(self.sc):
with self.assertRaisesRegexp(NotImplementedError, 'not supported'):
pandas_udf(
@@ -173,8 +160,6 @@ class GroupedAggPandasUDFTests(ReusedSQLTestCase):
return {v.mean(): v.std()}
def test_alias(self):
- from pyspark.sql.functions import mean
-
df = self.data
mean_udf = self.pandas_agg_mean_udf
@@ -187,8 +172,6 @@ class GroupedAggPandasUDFTests(ReusedSQLTestCase):
"""
Test mixing group aggregate pandas UDF with sql expression.
"""
- from pyspark.sql.functions import sum
-
df = self.data
sum_udf = self.pandas_agg_sum_udf
@@ -225,8 +208,6 @@ class GroupedAggPandasUDFTests(ReusedSQLTestCase):
"""
Test mixing group aggregate pandas UDF with python UDF and scalar pandas UDF.
"""
- from pyspark.sql.functions import sum
-
df = self.data
plus_one = self.python_plus_one
plus_two = self.pandas_scalar_plus_two
@@ -292,8 +273,6 @@ class GroupedAggPandasUDFTests(ReusedSQLTestCase):
"""
Test multiple group aggregate pandas UDFs in one agg function.
"""
- from pyspark.sql.functions import sum, mean
-
df = self.data
mean_udf = self.pandas_agg_mean_udf
sum_udf = self.pandas_agg_sum_udf
@@ -315,8 +294,6 @@ class GroupedAggPandasUDFTests(ReusedSQLTestCase):
self.assertPandasEqual(expected1, result1)
def test_complex_groupby(self):
- from pyspark.sql.functions import sum
-
df = self.data
sum_udf = self.pandas_agg_sum_udf
plus_one = self.python_plus_one
@@ -359,8 +336,6 @@ class GroupedAggPandasUDFTests(ReusedSQLTestCase):
self.assertPandasEqual(expected7.toPandas(), result7.toPandas())
def test_complex_expressions(self):
- from pyspark.sql.functions import col, sum
-
df = self.data
plus_one = self.python_plus_one
plus_two = self.pandas_scalar_plus_two
@@ -434,7 +409,6 @@ class GroupedAggPandasUDFTests(ReusedSQLTestCase):
self.assertPandasEqual(expected3, result3)
def test_retain_group_columns(self):
- from pyspark.sql.functions import sum
with self.sql_conf({"spark.sql.retainGroupColumns": False}):
df = self.data
sum_udf = self.pandas_agg_sum_udf
@@ -444,8 +418,6 @@ class GroupedAggPandasUDFTests(ReusedSQLTestCase):
self.assertPandasEqual(expected1.toPandas(), result1.toPandas())
def test_array_type(self):
- from pyspark.sql.functions import pandas_udf, PandasUDFType
-
df = self.data
array_udf = pandas_udf(lambda x: [1.0, 2.0], 'array<double>', PandasUDFType.GROUPED_AGG)
@@ -453,8 +425,6 @@ class GroupedAggPandasUDFTests(ReusedSQLTestCase):
self.assertEquals(result1.first()['v2'], [1.0, 2.0])
def test_invalid_args(self):
- from pyspark.sql.functions import mean
-
df = self.data
plus_one = self.python_plus_one
mean_udf = self.pandas_agg_mean_udf
@@ -478,9 +448,6 @@ class GroupedAggPandasUDFTests(ReusedSQLTestCase):
df.groupby(df.id).agg(mean_udf(df.v), mean(df.v)).collect()
def test_register_vectorized_udf_basic(self):
- from pyspark.sql.functions import pandas_udf
- from pyspark.rdd import PythonEvalType
-
sum_pandas_udf = pandas_udf(
lambda v: v.sum(), "integer", PythonEvalType.SQL_GROUPED_AGG_PANDAS_UDF)
diff --git a/python/pyspark/sql/tests/test_pandas_udf_grouped_map.py b/python/pyspark/sql/tests/test_pandas_udf_grouped_map.py
index a12c608..80e7034 100644
--- a/python/pyspark/sql/tests/test_pandas_udf_grouped_map.py
+++ b/python/pyspark/sql/tests/test_pandas_udf_grouped_map.py
@@ -18,7 +18,12 @@
import datetime
import unittest
+from collections import OrderedDict
+from decimal import Decimal
+from distutils.version import LooseVersion
+
from pyspark.sql import Row
+from pyspark.sql.functions import array, explode, col, lit, udf, sum, pandas_udf, PandasUDFType
from pyspark.sql.types import *
from pyspark.testing.sqlutils import ReusedSQLTestCase, have_pandas, have_pyarrow, \
pandas_requirement_message, pyarrow_requirement_message
@@ -32,16 +37,12 @@ class GroupedMapPandasUDFTests(ReusedSQLTestCase):
@property
def data(self):
- from pyspark.sql.functions import array, explode, col, lit
return self.spark.range(10).toDF('id') \
.withColumn("vs", array([lit(i) for i in range(20, 30)])) \
.withColumn("v", explode(col('vs'))).drop('vs')
def test_supported_types(self):
- from decimal import Decimal
- from distutils.version import LooseVersion
import pyarrow as pa
- from pyspark.sql.functions import pandas_udf, PandasUDFType
values = [
1, 2, 3,
@@ -131,8 +132,6 @@ class GroupedMapPandasUDFTests(ReusedSQLTestCase):
self.assertPandasEqual(expected3, result3)
def test_array_type_correct(self):
- from pyspark.sql.functions import pandas_udf, PandasUDFType, array, col
-
df = self.data.withColumn("arr", array(col("id"))).repartition(1, "id")
output_schema = StructType(
@@ -151,8 +150,6 @@ class GroupedMapPandasUDFTests(ReusedSQLTestCase):
self.assertPandasEqual(expected, result)
def test_register_grouped_map_udf(self):
- from pyspark.sql.functions import pandas_udf, PandasUDFType
-
foo_udf = pandas_udf(lambda x: x, "id long", PandasUDFType.GROUPED_MAP)
with QuietTest(self.sc):
with self.assertRaisesRegexp(
@@ -161,7 +158,6 @@ class GroupedMapPandasUDFTests(ReusedSQLTestCase):
self.spark.catalog.registerFunction("foo_udf", foo_udf)
def test_decorator(self):
- from pyspark.sql.functions import pandas_udf, PandasUDFType
df = self.data
@pandas_udf(
@@ -176,7 +172,6 @@ class GroupedMapPandasUDFTests(ReusedSQLTestCase):
self.assertPandasEqual(expected, result)
def test_coerce(self):
- from pyspark.sql.functions import pandas_udf, PandasUDFType
df = self.data
foo = pandas_udf(
@@ -191,7 +186,6 @@ class GroupedMapPandasUDFTests(ReusedSQLTestCase):
self.assertPandasEqual(expected, result)
def test_complex_groupby(self):
- from pyspark.sql.functions import pandas_udf, col, PandasUDFType
df = self.data
@pandas_udf(
@@ -210,7 +204,6 @@ class GroupedMapPandasUDFTests(ReusedSQLTestCase):
self.assertPandasEqual(expected, result)
def test_empty_groupby(self):
- from pyspark.sql.functions import pandas_udf, PandasUDFType
df = self.data
@pandas_udf(
@@ -229,7 +222,6 @@ class GroupedMapPandasUDFTests(ReusedSQLTestCase):
self.assertPandasEqual(expected, result)
def test_datatype_string(self):
- from pyspark.sql.functions import pandas_udf, PandasUDFType
df = self.data
foo_udf = pandas_udf(
@@ -243,8 +235,6 @@ class GroupedMapPandasUDFTests(ReusedSQLTestCase):
self.assertPandasEqual(expected, result)
def test_wrong_return_type(self):
- from pyspark.sql.functions import pandas_udf, PandasUDFType
-
with QuietTest(self.sc):
with self.assertRaisesRegexp(
NotImplementedError,
@@ -255,7 +245,6 @@ class GroupedMapPandasUDFTests(ReusedSQLTestCase):
PandasUDFType.GROUPED_MAP)
def test_wrong_args(self):
- from pyspark.sql.functions import udf, pandas_udf, sum, PandasUDFType
df = self.data
with QuietTest(self.sc):
@@ -277,9 +266,7 @@ class GroupedMapPandasUDFTests(ReusedSQLTestCase):
pandas_udf(lambda x, y: x, DoubleType(), PandasUDFType.SCALAR))
def test_unsupported_types(self):
- from distutils.version import LooseVersion
import pyarrow as pa
- from pyspark.sql.functions import pandas_udf, PandasUDFType
common_err_msg = 'Invalid returnType.*grouped map Pandas UDF.*'
unsupported_types = [
@@ -300,7 +287,6 @@ class GroupedMapPandasUDFTests(ReusedSQLTestCase):
# Regression test for SPARK-23314
def test_timestamp_dst(self):
- from pyspark.sql.functions import pandas_udf, PandasUDFType
# Daylight saving time for Los Angeles for 2015 is Sun, Nov 1 at 2:00 am
dt = [datetime.datetime(2015, 11, 1, 0, 30),
datetime.datetime(2015, 11, 1, 1, 30),
@@ -311,12 +297,12 @@ class GroupedMapPandasUDFTests(ReusedSQLTestCase):
self.assertPandasEqual(df.toPandas(), result.toPandas())
def test_udf_with_key(self):
- from pyspark.sql.functions import pandas_udf, PandasUDFType
+ import numpy as np
+
df = self.data
pdf = df.toPandas()
def foo1(key, pdf):
- import numpy as np
assert type(key) == tuple
assert type(key[0]) == np.int64
@@ -326,7 +312,6 @@ class GroupedMapPandasUDFTests(ReusedSQLTestCase):
v4=pdf.v * pdf.id.mean())
def foo2(key, pdf):
- import numpy as np
assert type(key) == tuple
assert type(key[0]) == np.int64
assert type(key[1]) == np.int32
@@ -385,9 +370,7 @@ class GroupedMapPandasUDFTests(ReusedSQLTestCase):
self.assertPandasEqual(expected4, result4)
def test_column_order(self):
- from collections import OrderedDict
import pandas as pd
- from pyspark.sql.functions import pandas_udf, PandasUDFType
# Helper function to set column names from a list
def rename_pdf(pdf, names):
@@ -468,7 +451,6 @@ class GroupedMapPandasUDFTests(ReusedSQLTestCase):
with QuietTest(self.sc):
with self.assertRaisesRegexp(Exception, "KeyError: 'id'"):
grouped_df.apply(column_name_typo).collect()
- from distutils.version import LooseVersion
import pyarrow as pa
if LooseVersion(pa.__version__) < LooseVersion("0.11.0"):
# TODO: see ARROW-1949. Remove when the minimum PyArrow version becomes 0.11.0.
@@ -480,7 +462,6 @@ class GroupedMapPandasUDFTests(ReusedSQLTestCase):
def test_positional_assignment_conf(self):
import pandas as pd
- from pyspark.sql.functions import pandas_udf, PandasUDFType
with self.sql_conf({
"spark.sql.legacy.execution.pandas.groupedMap.assignColumnsByName": False}):
@@ -496,9 +477,7 @@ class GroupedMapPandasUDFTests(ReusedSQLTestCase):
self.assertEqual(r.b, 1)
def test_self_join_with_pandas(self):
- import pyspark.sql.functions as F
-
- @F.pandas_udf('key long, col string', F.PandasUDFType.GROUPED_MAP)
+ @pandas_udf('key long, col string', PandasUDFType.GROUPED_MAP)
def dummy_pandas_udf(df):
return df[['key', 'col']]
@@ -508,12 +487,11 @@ class GroupedMapPandasUDFTests(ReusedSQLTestCase):
# this was throwing an AnalysisException before SPARK-24208
res = df_with_pandas.alias('temp0').join(df_with_pandas.alias('temp1'),
- F.col('temp0.key') == F.col('temp1.key'))
+ col('temp0.key') == col('temp1.key'))
self.assertEquals(res.count(), 5)
def test_mixed_scalar_udfs_followed_by_grouby_apply(self):
import pandas as pd
- from pyspark.sql.functions import udf, pandas_udf, PandasUDFType
df = self.spark.range(0, 10).toDF('v1')
df = df.withColumn('v2', udf(lambda x: x + 1, 'int')(df['v1'])) \
diff --git a/python/pyspark/sql/tests/test_pandas_udf_scalar.py b/python/pyspark/sql/tests/test_pandas_udf_scalar.py
index 2f585a3..6a6865a 100644
--- a/python/pyspark/sql/tests/test_pandas_udf_scalar.py
+++ b/python/pyspark/sql/tests/test_pandas_udf_scalar.py
@@ -16,12 +16,20 @@
#
import datetime
import os
+import random
import shutil
import sys
import tempfile
import time
import unittest
+from datetime import date, datetime
+from decimal import Decimal
+from distutils.version import LooseVersion
+
+from pyspark.rdd import PythonEvalType
+from pyspark.sql import Column
+from pyspark.sql.functions import array, col, expr, lit, sum, udf, pandas_udf
from pyspark.sql.types import Row
from pyspark.sql.types import *
from pyspark.sql.utils import AnalysisException
@@ -59,18 +67,16 @@ class ScalarPandasUDFTests(ReusedSQLTestCase):
@property
def nondeterministic_vectorized_udf(self):
- from pyspark.sql.functions import pandas_udf
+ import pandas as pd
+ import numpy as np
@pandas_udf('double')
def random_udf(v):
- import pandas as pd
- import numpy as np
return pd.Series(np.random.random(len(v)))
random_udf = random_udf.asNondeterministic()
return random_udf
def test_pandas_udf_tokenize(self):
- from pyspark.sql.functions import pandas_udf
tokenize = pandas_udf(lambda s: s.apply(lambda str: str.split(' ')),
ArrayType(StringType()))
self.assertEqual(tokenize.returnType, ArrayType(StringType()))
@@ -79,7 +85,6 @@ class ScalarPandasUDFTests(ReusedSQLTestCase):
self.assertEqual([Row(hi=[u'hi', u'boo']), Row(hi=[u'bye', u'boo'])], result.collect())
def test_pandas_udf_nested_arrays(self):
- from pyspark.sql.functions import pandas_udf
tokenize = pandas_udf(lambda s: s.apply(lambda str: [str.split(' ')]),
ArrayType(ArrayType(StringType())))
self.assertEqual(tokenize.returnType, ArrayType(ArrayType(StringType())))
@@ -88,7 +93,6 @@ class ScalarPandasUDFTests(ReusedSQLTestCase):
self.assertEqual([Row(hi=[[u'hi', u'boo']]), Row(hi=[[u'bye', u'boo']])], result.collect())
def test_vectorized_udf_basic(self):
- from pyspark.sql.functions import pandas_udf, col, array
df = self.spark.range(10).select(
col('id').cast('string').alias('str'),
col('id').cast('int').alias('int'),
@@ -114,9 +118,6 @@ class ScalarPandasUDFTests(ReusedSQLTestCase):
self.assertEquals(df.collect(), res.collect())
def test_register_nondeterministic_vectorized_udf_basic(self):
- from pyspark.sql.functions import pandas_udf
- from pyspark.rdd import PythonEvalType
- import random
random_pandas_udf = pandas_udf(
lambda x: random.randint(6, 6) + x, IntegerType()).asNondeterministic()
self.assertEqual(random_pandas_udf.deterministic, False)
@@ -129,7 +130,6 @@ class ScalarPandasUDFTests(ReusedSQLTestCase):
self.assertEqual(row[0], 7)
def test_vectorized_udf_null_boolean(self):
- from pyspark.sql.functions import pandas_udf, col
data = [(True,), (True,), (None,), (False,)]
schema = StructType().add("bool", BooleanType())
df = self.spark.createDataFrame(data, schema)
@@ -138,7 +138,6 @@ class ScalarPandasUDFTests(ReusedSQLTestCase):
self.assertEquals(df.collect(), res.collect())
def test_vectorized_udf_null_byte(self):
- from pyspark.sql.functions import pandas_udf, col
data = [(None,), (2,), (3,), (4,)]
schema = StructType().add("byte", ByteType())
df = self.spark.createDataFrame(data, schema)
@@ -147,7 +146,6 @@ class ScalarPandasUDFTests(ReusedSQLTestCase):
self.assertEquals(df.collect(), res.collect())
def test_vectorized_udf_null_short(self):
- from pyspark.sql.functions import pandas_udf, col
data = [(None,), (2,), (3,), (4,)]
schema = StructType().add("short", ShortType())
df = self.spark.createDataFrame(data, schema)
@@ -156,7 +154,6 @@ class ScalarPandasUDFTests(ReusedSQLTestCase):
self.assertEquals(df.collect(), res.collect())
def test_vectorized_udf_null_int(self):
- from pyspark.sql.functions import pandas_udf, col
data = [(None,), (2,), (3,), (4,)]
schema = StructType().add("int", IntegerType())
df = self.spark.createDataFrame(data, schema)
@@ -165,7 +162,6 @@ class ScalarPandasUDFTests(ReusedSQLTestCase):
self.assertEquals(df.collect(), res.collect())
def test_vectorized_udf_null_long(self):
- from pyspark.sql.functions import pandas_udf, col
data = [(None,), (2,), (3,), (4,)]
schema = StructType().add("long", LongType())
df = self.spark.createDataFrame(data, schema)
@@ -174,7 +170,6 @@ class ScalarPandasUDFTests(ReusedSQLTestCase):
self.assertEquals(df.collect(), res.collect())
def test_vectorized_udf_null_float(self):
- from pyspark.sql.functions import pandas_udf, col
data = [(3.0,), (5.0,), (-1.0,), (None,)]
schema = StructType().add("float", FloatType())
df = self.spark.createDataFrame(data, schema)
@@ -183,7 +178,6 @@ class ScalarPandasUDFTests(ReusedSQLTestCase):
self.assertEquals(df.collect(), res.collect())
def test_vectorized_udf_null_double(self):
- from pyspark.sql.functions import pandas_udf, col
data = [(3.0,), (5.0,), (-1.0,), (None,)]
schema = StructType().add("double", DoubleType())
df = self.spark.createDataFrame(data, schema)
@@ -192,8 +186,6 @@ class ScalarPandasUDFTests(ReusedSQLTestCase):
self.assertEquals(df.collect(), res.collect())
def test_vectorized_udf_null_decimal(self):
- from decimal import Decimal
- from pyspark.sql.functions import pandas_udf, col
data = [(Decimal(3.0),), (Decimal(5.0),), (Decimal(-1.0),), (None,)]
schema = StructType().add("decimal", DecimalType(38, 18))
df = self.spark.createDataFrame(data, schema)
@@ -202,7 +194,6 @@ class ScalarPandasUDFTests(ReusedSQLTestCase):
self.assertEquals(df.collect(), res.collect())
def test_vectorized_udf_null_string(self):
- from pyspark.sql.functions import pandas_udf, col
data = [("foo",), (None,), ("bar",), ("bar",)]
schema = StructType().add("str", StringType())
df = self.spark.createDataFrame(data, schema)
@@ -211,7 +202,6 @@ class ScalarPandasUDFTests(ReusedSQLTestCase):
self.assertEquals(df.collect(), res.collect())
def test_vectorized_udf_string_in_udf(self):
- from pyspark.sql.functions import pandas_udf, col
import pandas as pd
df = self.spark.range(10)
str_f = pandas_udf(lambda x: pd.Series(map(str, x)), StringType())
@@ -220,7 +210,6 @@ class ScalarPandasUDFTests(ReusedSQLTestCase):
self.assertEquals(expected.collect(), actual.collect())
def test_vectorized_udf_datatype_string(self):
- from pyspark.sql.functions import pandas_udf, col
df = self.spark.range(10).select(
col('id').cast('string').alias('str'),
col('id').cast('int').alias('int'),
@@ -244,9 +233,8 @@ class ScalarPandasUDFTests(ReusedSQLTestCase):
self.assertEquals(df.collect(), res.collect())
def test_vectorized_udf_null_binary(self):
- from distutils.version import LooseVersion
import pyarrow as pa
- from pyspark.sql.functions import pandas_udf, col
+
if LooseVersion(pa.__version__) < LooseVersion("0.10.0"):
with QuietTest(self.sc):
with self.assertRaisesRegexp(
@@ -262,7 +250,6 @@ class ScalarPandasUDFTests(ReusedSQLTestCase):
self.assertEquals(df.collect(), res.collect())
def test_vectorized_udf_array_type(self):
- from pyspark.sql.functions import pandas_udf, col
data = [([1, 2],), ([3, 4],)]
array_schema = StructType([StructField("array", ArrayType(IntegerType()))])
df = self.spark.createDataFrame(data, schema=array_schema)
@@ -271,7 +258,6 @@ class ScalarPandasUDFTests(ReusedSQLTestCase):
self.assertEquals(df.collect(), result.collect())
def test_vectorized_udf_null_array(self):
- from pyspark.sql.functions import pandas_udf, col
data = [([1, 2],), (None,), (None,), ([3, 4],), (None,)]
array_schema = StructType([StructField("array", ArrayType(IntegerType()))])
df = self.spark.createDataFrame(data, schema=array_schema)
@@ -280,7 +266,6 @@ class ScalarPandasUDFTests(ReusedSQLTestCase):
self.assertEquals(df.collect(), result.collect())
def test_vectorized_udf_complex(self):
- from pyspark.sql.functions import pandas_udf, col, expr
df = self.spark.range(10).select(
col('id').cast('int').alias('a'),
col('id').cast('int').alias('b'),
@@ -293,7 +278,6 @@ class ScalarPandasUDFTests(ReusedSQLTestCase):
self.assertEquals(expected.collect(), res.collect())
def test_vectorized_udf_exception(self):
- from pyspark.sql.functions import pandas_udf, col
df = self.spark.range(10)
raise_exception = pandas_udf(lambda x: x * (1 / 0), LongType())
with QuietTest(self.sc):
@@ -301,8 +285,8 @@ class ScalarPandasUDFTests(ReusedSQLTestCase):
df.select(raise_exception(col('id'))).collect()
def test_vectorized_udf_invalid_length(self):
- from pyspark.sql.functions import pandas_udf, col
import pandas as pd
+
df = self.spark.range(10)
raise_exception = pandas_udf(lambda _: pd.Series(1), LongType())
with QuietTest(self.sc):
@@ -312,7 +296,6 @@ class ScalarPandasUDFTests(ReusedSQLTestCase):
df.select(raise_exception(col('id'))).collect()
def test_vectorized_udf_chained(self):
- from pyspark.sql.functions import pandas_udf, col
df = self.spark.range(10)
f = pandas_udf(lambda x: x + 1, LongType())
g = pandas_udf(lambda x: x - 1, LongType())
@@ -320,7 +303,6 @@ class ScalarPandasUDFTests(ReusedSQLTestCase):
self.assertEquals(df.collect(), res.collect())
def test_vectorized_udf_wrong_return_type(self):
- from pyspark.sql.functions import pandas_udf
with QuietTest(self.sc):
with self.assertRaisesRegexp(
NotImplementedError,
@@ -328,7 +310,6 @@ class ScalarPandasUDFTests(ReusedSQLTestCase):
pandas_udf(lambda x: x * 1.0, MapType(LongType(), LongType()))
def test_vectorized_udf_return_scalar(self):
- from pyspark.sql.functions import pandas_udf, col
df = self.spark.range(10)
f = pandas_udf(lambda x: 1.0, DoubleType())
with QuietTest(self.sc):
@@ -336,7 +317,6 @@ class ScalarPandasUDFTests(ReusedSQLTestCase):
df.select(f(col('id'))).collect()
def test_vectorized_udf_decorator(self):
- from pyspark.sql.functions import pandas_udf, col
df = self.spark.range(10)
@pandas_udf(returnType=LongType())
@@ -346,21 +326,18 @@ class ScalarPandasUDFTests(ReusedSQLTestCase):
self.assertEquals(df.collect(), res.collect())
def test_vectorized_udf_empty_partition(self):
- from pyspark.sql.functions import pandas_udf, col
df = self.spark.createDataFrame(self.sc.parallelize([Row(id=1)], 2))
f = pandas_udf(lambda x: x, LongType())
res = df.select(f(col('id')))
self.assertEquals(df.collect(), res.collect())
def test_vectorized_udf_varargs(self):
- from pyspark.sql.functions import pandas_udf, col
df = self.spark.createDataFrame(self.sc.parallelize([Row(id=1)], 2))
f = pandas_udf(lambda *v: v[0], LongType())
res = df.select(f(col('id')))
self.assertEquals(df.collect(), res.collect())
def test_vectorized_udf_unsupported_types(self):
- from pyspark.sql.functions import pandas_udf
with QuietTest(self.sc):
with self.assertRaisesRegexp(
NotImplementedError,
@@ -368,8 +345,6 @@ class ScalarPandasUDFTests(ReusedSQLTestCase):
pandas_udf(lambda x: x, MapType(StringType(), IntegerType()))
def test_vectorized_udf_dates(self):
- from pyspark.sql.functions import pandas_udf, col
- from datetime import date
schema = StructType().add("idx", LongType()).add("date", DateType())
data = [(0, date(1969, 1, 1),),
(1, date(2012, 2, 2),),
@@ -405,8 +380,6 @@ class ScalarPandasUDFTests(ReusedSQLTestCase):
self.assertIsNone(result[i][3]) # "check_data" col
def test_vectorized_udf_timestamps(self):
- from pyspark.sql.functions import pandas_udf, col
- from datetime import datetime
schema = StructType([
StructField("idx", LongType(), True),
StructField("timestamp", TimestampType(), True)])
@@ -447,8 +420,8 @@ class ScalarPandasUDFTests(ReusedSQLTestCase):
self.assertIsNone(result[i][3]) # "check_data" col
def test_vectorized_udf_return_timestamp_tz(self):
- from pyspark.sql.functions import pandas_udf, col
import pandas as pd
+
df = self.spark.range(10)
@pandas_udf(returnType=TimestampType())
@@ -465,8 +438,8 @@ class ScalarPandasUDFTests(ReusedSQLTestCase):
self.assertEquals(expected, ts)
def test_vectorized_udf_check_config(self):
- from pyspark.sql.functions import pandas_udf, col
import pandas as pd
+
with self.sql_conf({"spark.sql.execution.arrow.maxRecordsPerBatch": 3}):
df = self.spark.range(10, numPartitions=1)
@@ -479,9 +452,8 @@ class ScalarPandasUDFTests(ReusedSQLTestCase):
self.assertTrue(r <= 3)
def test_vectorized_udf_timestamps_respect_session_timezone(self):
- from pyspark.sql.functions import pandas_udf, col
- from datetime import datetime
import pandas as pd
+
schema = StructType([
StructField("idx", LongType(), True),
StructField("timestamp", TimestampType(), True)])
@@ -519,8 +491,6 @@ class ScalarPandasUDFTests(ReusedSQLTestCase):
def test_nondeterministic_vectorized_udf(self):
# Test that nondeterministic UDFs are evaluated only once in chained UDF evaluations
- from pyspark.sql.functions import pandas_udf, col
-
@pandas_udf('double')
def plus_ten(v):
return v + 10
@@ -533,8 +503,6 @@ class ScalarPandasUDFTests(ReusedSQLTestCase):
self.assertTrue(result1['plus_ten(rand)'].equals(result1['rand'] + 10))
def test_nondeterministic_vectorized_udf_in_aggregate(self):
- from pyspark.sql.functions import sum
-
df = self.spark.range(10)
random_udf = self.nondeterministic_vectorized_udf
@@ -545,8 +513,6 @@ class ScalarPandasUDFTests(ReusedSQLTestCase):
df.agg(sum(random_udf(df.id))).collect()
def test_register_vectorized_udf_basic(self):
- from pyspark.rdd import PythonEvalType
- from pyspark.sql.functions import pandas_udf, col, expr
df = self.spark.range(10).select(
col('id').cast('int').alias('a'),
col('id').cast('int').alias('b'))
@@ -563,11 +529,10 @@ class ScalarPandasUDFTests(ReusedSQLTestCase):
# Regression test for SPARK-23314
def test_timestamp_dst(self):
- from pyspark.sql.functions import pandas_udf
# Daylight saving time for Los Angeles for 2015 is Sun, Nov 1 at 2:00 am
- dt = [datetime.datetime(2015, 11, 1, 0, 30),
- datetime.datetime(2015, 11, 1, 1, 30),
- datetime.datetime(2015, 11, 1, 2, 30)]
+ dt = [datetime(2015, 11, 1, 0, 30),
+ datetime(2015, 11, 1, 1, 30),
+ datetime(2015, 11, 1, 2, 30)]
df = self.spark.createDataFrame(dt, 'timestamp').toDF('time')
foo_udf = pandas_udf(lambda x: x, 'timestamp')
result = df.withColumn('time', foo_udf(df.time))
@@ -593,7 +558,6 @@ class ScalarPandasUDFTests(ReusedSQLTestCase):
def test_mixed_udf(self):
import pandas as pd
- from pyspark.sql.functions import col, udf, pandas_udf
df = self.spark.range(0, 1).toDF('v')
@@ -696,8 +660,6 @@ class ScalarPandasUDFTests(ReusedSQLTestCase):
def test_mixed_udf_and_sql(self):
import pandas as pd
- from pyspark.sql import Column
- from pyspark.sql.functions import udf, pandas_udf
df = self.spark.range(0, 1).toDF('v')
@@ -758,7 +720,6 @@ class ScalarPandasUDFTests(ReusedSQLTestCase):
# This needs to a separate test because Arrow dependency is optional
import pandas as pd
import numpy as np
- from pyspark.sql.functions import pandas_udf, lit, col
path = tempfile.mkdtemp()
shutil.rmtree(path)
diff --git a/python/pyspark/sql/tests/test_pandas_udf_window.py b/python/pyspark/sql/tests/test_pandas_udf_window.py
index f0e6d26..0a7a19c 100644
--- a/python/pyspark/sql/tests/test_pandas_udf_window.py
+++ b/python/pyspark/sql/tests/test_pandas_udf_window.py
@@ -18,6 +18,8 @@
import unittest
from pyspark.sql.utils import AnalysisException
+from pyspark.sql.functions import array, explode, col, lit, mean, min, max, rank, \
+ udf, pandas_udf, PandasUDFType
from pyspark.sql.window import Window
from pyspark.testing.sqlutils import ReusedSQLTestCase, have_pandas, have_pyarrow, \
pandas_requirement_message, pyarrow_requirement_message
@@ -30,7 +32,6 @@ from pyspark.testing.utils import QuietTest
class WindowPandasUDFTests(ReusedSQLTestCase):
@property
def data(self):
- from pyspark.sql.functions import array, explode, col, lit
return self.spark.range(10).toDF('id') \
.withColumn("vs", array([lit(i * 1.0) + col('id') for i in range(20, 30)])) \
.withColumn("v", explode(col('vs'))) \
@@ -39,18 +40,14 @@ class WindowPandasUDFTests(ReusedSQLTestCase):
@property
def python_plus_one(self):
- from pyspark.sql.functions import udf
return udf(lambda v: v + 1, 'double')
@property
def pandas_scalar_time_two(self):
- from pyspark.sql.functions import pandas_udf
return pandas_udf(lambda v: v * 2, 'double')
@property
def pandas_agg_mean_udf(self):
- from pyspark.sql.functions import pandas_udf, PandasUDFType
-
@pandas_udf('double', PandasUDFType.GROUPED_AGG)
def avg(v):
return v.mean()
@@ -58,8 +55,6 @@ class WindowPandasUDFTests(ReusedSQLTestCase):
@property
def pandas_agg_max_udf(self):
- from pyspark.sql.functions import pandas_udf, PandasUDFType
-
@pandas_udf('double', PandasUDFType.GROUPED_AGG)
def max(v):
return v.max()
@@ -67,8 +62,6 @@ class WindowPandasUDFTests(ReusedSQLTestCase):
@property
def pandas_agg_min_udf(self):
- from pyspark.sql.functions import pandas_udf, PandasUDFType
-
@pandas_udf('double', PandasUDFType.GROUPED_AGG)
def min(v):
return v.min()
@@ -88,8 +81,6 @@ class WindowPandasUDFTests(ReusedSQLTestCase):
return Window.partitionBy()
def test_simple(self):
- from pyspark.sql.functions import mean
-
df = self.data
w = self.unbounded_window
@@ -105,8 +96,6 @@ class WindowPandasUDFTests(ReusedSQLTestCase):
self.assertPandasEqual(expected2.toPandas(), result2.toPandas())
def test_multiple_udfs(self):
- from pyspark.sql.functions import max, min, mean
-
df = self.data
w = self.unbounded_window
@@ -121,8 +110,6 @@ class WindowPandasUDFTests(ReusedSQLTestCase):
self.assertPandasEqual(expected1.toPandas(), result1.toPandas())
def test_replace_existing(self):
- from pyspark.sql.functions import mean
-
df = self.data
w = self.unbounded_window
@@ -132,8 +119,6 @@ class WindowPandasUDFTests(ReusedSQLTestCase):
self.assertPandasEqual(expected1.toPandas(), result1.toPandas())
def test_mixed_sql(self):
- from pyspark.sql.functions import mean
-
df = self.data
w = self.unbounded_window
mean_udf = self.pandas_agg_mean_udf
@@ -144,8 +129,6 @@ class WindowPandasUDFTests(ReusedSQLTestCase):
self.assertPandasEqual(expected1.toPandas(), result1.toPandas())
def test_mixed_udf(self):
- from pyspark.sql.functions import mean
-
df = self.data
w = self.unbounded_window
@@ -171,8 +154,6 @@ class WindowPandasUDFTests(ReusedSQLTestCase):
self.assertPandasEqual(expected2.toPandas(), result2.toPandas())
def test_without_partitionBy(self):
- from pyspark.sql.functions import mean
-
df = self.data
w = self.unpartitioned_window
mean_udf = self.pandas_agg_mean_udf
@@ -187,8 +168,6 @@ class WindowPandasUDFTests(ReusedSQLTestCase):
self.assertPandasEqual(expected2.toPandas(), result2.toPandas())
def test_mixed_sql_and_udf(self):
- from pyspark.sql.functions import max, min, rank, col
-
df = self.data
w = self.unbounded_window
ow = self.ordered_window
@@ -221,8 +200,6 @@ class WindowPandasUDFTests(ReusedSQLTestCase):
self.assertPandasEqual(expected4.toPandas(), result4.toPandas())
def test_array_type(self):
- from pyspark.sql.functions import pandas_udf, PandasUDFType
-
df = self.data
w = self.unbounded_window
@@ -231,8 +208,6 @@ class WindowPandasUDFTests(ReusedSQLTestCase):
self.assertEquals(result1.first()['v2'], [1.0, 2.0])
def test_invalid_args(self):
- from pyspark.sql.functions import pandas_udf, PandasUDFType
-
df = self.data
w = self.unbounded_window
ow = self.ordered_window
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org