Posted to commits@spark.apache.org by gu...@apache.org on 2018/12/14 02:45:39 UTC

[spark] branch master updated: [SPARK-26364][PYTHON][TESTING] Clean up imports in test_pandas_udf*

This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 160e583  [SPARK-26364][PYTHON][TESTING] Clean up imports in test_pandas_udf*
160e583 is described below

commit 160e583a17235318c06b95992941a772ff782fae
Author: Li Jin <ic...@gmail.com>
AuthorDate: Fri Dec 14 10:45:24 2018 +0800

    [SPARK-26364][PYTHON][TESTING] Clean up imports in test_pandas_udf*
    
    ## What changes were proposed in this pull request?
    
    Clean up unconditional import statements and move them to the top.
    
    Conditional imports (pandas, numpy, pyarrow) are left as-is.
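
    For illustration, a minimal sketch of the resulting layout (the test class
    and test method here are hypothetical; `ReusedSQLTestCase`, `have_pandas`,
    and the requirement messages are the helpers from
    `pyspark.testing.sqlutils` already used throughout these files):

        import unittest

        # Unconditional imports: pure pyspark modules, safe at module top.
        from pyspark.sql.functions import pandas_udf, PandasUDFType
        from pyspark.testing.sqlutils import ReusedSQLTestCase, have_pandas, have_pyarrow, \
            pandas_requirement_message, pyarrow_requirement_message


        @unittest.skipIf(
            not have_pandas or not have_pyarrow,
            pandas_requirement_message or pyarrow_requirement_message)
        class ExamplePandasUDFTests(ReusedSQLTestCase):

            def test_plus_two(self):
                # Conditional import: pandas is an optional dependency, so it
                # stays inside the test body and the module still imports
                # cleanly when pandas is absent.
                import pandas as pd

                @pandas_udf('double', PandasUDFType.SCALAR)
                def plus_two(v):
                    return pd.Series(v + 2.0)

                df = self.spark.range(3).toDF('v')
                result = df.select(plus_two(df.v).alias('v2')).collect()
                self.assertEqual([r.v2 for r in result], [2.0, 3.0, 4.0])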
    
    ## How was this patch tested?
    
    Existing tests.
    
    Closes #23314 from icexelloss/clean-up-test-imports.
    
    Authored-by: Li Jin <ic...@gmail.com>
    Signed-off-by: Hyukjin Kwon <gu...@apache.org>
---
 python/pyspark/sql/tests/test_pandas_udf.py        | 16 ++---
 .../sql/tests/test_pandas_udf_grouped_agg.py       | 39 +----------
 .../sql/tests/test_pandas_udf_grouped_map.py       | 40 +++---------
 python/pyspark/sql/tests/test_pandas_udf_scalar.py | 75 ++++++----------------
 python/pyspark/sql/tests/test_pandas_udf_window.py | 29 +--------
 5 files changed, 36 insertions(+), 163 deletions(-)

diff --git a/python/pyspark/sql/tests/test_pandas_udf.py b/python/pyspark/sql/tests/test_pandas_udf.py
index c4b5478..d4d9679 100644
--- a/python/pyspark/sql/tests/test_pandas_udf.py
+++ b/python/pyspark/sql/tests/test_pandas_udf.py
@@ -17,12 +17,16 @@
 
 import unittest
 
+from pyspark.sql.functions import udf, pandas_udf, PandasUDFType
 from pyspark.sql.types import *
 from pyspark.sql.utils import ParseException
+from pyspark.rdd import PythonEvalType
 from pyspark.testing.sqlutils import ReusedSQLTestCase, have_pandas, have_pyarrow, \
     pandas_requirement_message, pyarrow_requirement_message
 from pyspark.testing.utils import QuietTest
 
+from py4j.protocol import Py4JJavaError
+
 
 @unittest.skipIf(
     not have_pandas or not have_pyarrow,
@@ -30,9 +34,6 @@ from pyspark.testing.utils import QuietTest
 class PandasUDFTests(ReusedSQLTestCase):
 
     def test_pandas_udf_basic(self):
-        from pyspark.rdd import PythonEvalType
-        from pyspark.sql.functions import pandas_udf, PandasUDFType
-
         udf = pandas_udf(lambda x: x, DoubleType())
         self.assertEqual(udf.returnType, DoubleType())
         self.assertEqual(udf.evalType, PythonEvalType.SQL_SCALAR_PANDAS_UDF)
@@ -65,10 +66,6 @@ class PandasUDFTests(ReusedSQLTestCase):
         self.assertEqual(udf.evalType, PythonEvalType.SQL_GROUPED_MAP_PANDAS_UDF)
 
     def test_pandas_udf_decorator(self):
-        from pyspark.rdd import PythonEvalType
-        from pyspark.sql.functions import pandas_udf, PandasUDFType
-        from pyspark.sql.types import StructType, StructField, DoubleType
-
         @pandas_udf(DoubleType())
         def foo(x):
             return x
@@ -114,8 +111,6 @@ class PandasUDFTests(ReusedSQLTestCase):
         self.assertEqual(foo.evalType, PythonEvalType.SQL_GROUPED_MAP_PANDAS_UDF)
 
     def test_udf_wrong_arg(self):
-        from pyspark.sql.functions import pandas_udf, PandasUDFType
-
         with QuietTest(self.sc):
             with self.assertRaises(ParseException):
                 @pandas_udf('blah')
@@ -151,9 +146,6 @@ class PandasUDFTests(ReusedSQLTestCase):
                     return k
 
     def test_stopiteration_in_udf(self):
-        from pyspark.sql.functions import udf, pandas_udf, PandasUDFType
-        from py4j.protocol import Py4JJavaError
-
         def foo(x):
             raise StopIteration()
 
diff --git a/python/pyspark/sql/tests/test_pandas_udf_grouped_agg.py b/python/pyspark/sql/tests/test_pandas_udf_grouped_agg.py
index 5383704..18264ea 100644
--- a/python/pyspark/sql/tests/test_pandas_udf_grouped_agg.py
+++ b/python/pyspark/sql/tests/test_pandas_udf_grouped_agg.py
@@ -17,6 +17,9 @@
 
 import unittest
 
+from pyspark.rdd import PythonEvalType
+from pyspark.sql.functions import array, explode, col, lit, mean, sum, \
+    udf, pandas_udf, PandasUDFType
 from pyspark.sql.types import *
 from pyspark.sql.utils import AnalysisException
 from pyspark.testing.sqlutils import ReusedSQLTestCase, have_pandas, have_pyarrow, \
@@ -31,7 +34,6 @@ class GroupedAggPandasUDFTests(ReusedSQLTestCase):
 
     @property
     def data(self):
-        from pyspark.sql.functions import array, explode, col, lit
         return self.spark.range(10).toDF('id') \
             .withColumn("vs", array([lit(i * 1.0) + col('id') for i in range(20, 30)])) \
             .withColumn("v", explode(col('vs'))) \
@@ -40,8 +42,6 @@ class GroupedAggPandasUDFTests(ReusedSQLTestCase):
 
     @property
     def python_plus_one(self):
-        from pyspark.sql.functions import udf
-
         @udf('double')
         def plus_one(v):
             assert isinstance(v, (int, float))
@@ -51,7 +51,6 @@ class GroupedAggPandasUDFTests(ReusedSQLTestCase):
     @property
     def pandas_scalar_plus_two(self):
         import pandas as pd
-        from pyspark.sql.functions import pandas_udf, PandasUDFType
 
         @pandas_udf('double', PandasUDFType.SCALAR)
         def plus_two(v):
@@ -61,8 +60,6 @@ class GroupedAggPandasUDFTests(ReusedSQLTestCase):
 
     @property
     def pandas_agg_mean_udf(self):
-        from pyspark.sql.functions import pandas_udf, PandasUDFType
-
         @pandas_udf('double', PandasUDFType.GROUPED_AGG)
         def avg(v):
             return v.mean()
@@ -70,8 +67,6 @@ class GroupedAggPandasUDFTests(ReusedSQLTestCase):
 
     @property
     def pandas_agg_sum_udf(self):
-        from pyspark.sql.functions import pandas_udf, PandasUDFType
-
         @pandas_udf('double', PandasUDFType.GROUPED_AGG)
         def sum(v):
             return v.sum()
@@ -80,7 +75,6 @@ class GroupedAggPandasUDFTests(ReusedSQLTestCase):
     @property
     def pandas_agg_weighted_mean_udf(self):
         import numpy as np
-        from pyspark.sql.functions import pandas_udf, PandasUDFType
 
         @pandas_udf('double', PandasUDFType.GROUPED_AGG)
         def weighted_mean(v, w):
@@ -88,8 +82,6 @@ class GroupedAggPandasUDFTests(ReusedSQLTestCase):
         return weighted_mean
 
     def test_manual(self):
-        from pyspark.sql.functions import pandas_udf, array
-
         df = self.data
         sum_udf = self.pandas_agg_sum_udf
         mean_udf = self.pandas_agg_mean_udf
@@ -118,8 +110,6 @@ class GroupedAggPandasUDFTests(ReusedSQLTestCase):
         self.assertPandasEqual(expected1.toPandas(), result1.toPandas())
 
     def test_basic(self):
-        from pyspark.sql.functions import col, lit, mean
-
         df = self.data
         weighted_mean_udf = self.pandas_agg_weighted_mean_udf
 
@@ -150,9 +140,6 @@ class GroupedAggPandasUDFTests(ReusedSQLTestCase):
         self.assertPandasEqual(expected4.toPandas(), result4.toPandas())
 
     def test_unsupported_types(self):
-        from pyspark.sql.types import DoubleType, MapType
-        from pyspark.sql.functions import pandas_udf, PandasUDFType
-
         with QuietTest(self.sc):
             with self.assertRaisesRegexp(NotImplementedError, 'not supported'):
                 pandas_udf(
@@ -173,8 +160,6 @@ class GroupedAggPandasUDFTests(ReusedSQLTestCase):
                     return {v.mean(): v.std()}
 
     def test_alias(self):
-        from pyspark.sql.functions import mean
-
         df = self.data
         mean_udf = self.pandas_agg_mean_udf
 
@@ -187,8 +172,6 @@ class GroupedAggPandasUDFTests(ReusedSQLTestCase):
         """
         Test mixing group aggregate pandas UDF with sql expression.
         """
-        from pyspark.sql.functions import sum
-
         df = self.data
         sum_udf = self.pandas_agg_sum_udf
 
@@ -225,8 +208,6 @@ class GroupedAggPandasUDFTests(ReusedSQLTestCase):
         """
         Test mixing group aggregate pandas UDF with python UDF and scalar pandas UDF.
         """
-        from pyspark.sql.functions import sum
-
         df = self.data
         plus_one = self.python_plus_one
         plus_two = self.pandas_scalar_plus_two
@@ -292,8 +273,6 @@ class GroupedAggPandasUDFTests(ReusedSQLTestCase):
         """
         Test multiple group aggregate pandas UDFs in one agg function.
         """
-        from pyspark.sql.functions import sum, mean
-
         df = self.data
         mean_udf = self.pandas_agg_mean_udf
         sum_udf = self.pandas_agg_sum_udf
@@ -315,8 +294,6 @@ class GroupedAggPandasUDFTests(ReusedSQLTestCase):
         self.assertPandasEqual(expected1, result1)
 
     def test_complex_groupby(self):
-        from pyspark.sql.functions import sum
-
         df = self.data
         sum_udf = self.pandas_agg_sum_udf
         plus_one = self.python_plus_one
@@ -359,8 +336,6 @@ class GroupedAggPandasUDFTests(ReusedSQLTestCase):
         self.assertPandasEqual(expected7.toPandas(), result7.toPandas())
 
     def test_complex_expressions(self):
-        from pyspark.sql.functions import col, sum
-
         df = self.data
         plus_one = self.python_plus_one
         plus_two = self.pandas_scalar_plus_two
@@ -434,7 +409,6 @@ class GroupedAggPandasUDFTests(ReusedSQLTestCase):
         self.assertPandasEqual(expected3, result3)
 
     def test_retain_group_columns(self):
-        from pyspark.sql.functions import sum
         with self.sql_conf({"spark.sql.retainGroupColumns": False}):
             df = self.data
             sum_udf = self.pandas_agg_sum_udf
@@ -444,8 +418,6 @@ class GroupedAggPandasUDFTests(ReusedSQLTestCase):
             self.assertPandasEqual(expected1.toPandas(), result1.toPandas())
 
     def test_array_type(self):
-        from pyspark.sql.functions import pandas_udf, PandasUDFType
-
         df = self.data
 
         array_udf = pandas_udf(lambda x: [1.0, 2.0], 'array<double>', PandasUDFType.GROUPED_AGG)
@@ -453,8 +425,6 @@ class GroupedAggPandasUDFTests(ReusedSQLTestCase):
         self.assertEquals(result1.first()['v2'], [1.0, 2.0])
 
     def test_invalid_args(self):
-        from pyspark.sql.functions import mean
-
         df = self.data
         plus_one = self.python_plus_one
         mean_udf = self.pandas_agg_mean_udf
@@ -478,9 +448,6 @@ class GroupedAggPandasUDFTests(ReusedSQLTestCase):
                 df.groupby(df.id).agg(mean_udf(df.v), mean(df.v)).collect()
 
     def test_register_vectorized_udf_basic(self):
-        from pyspark.sql.functions import pandas_udf
-        from pyspark.rdd import PythonEvalType
-
         sum_pandas_udf = pandas_udf(
             lambda v: v.sum(), "integer", PythonEvalType.SQL_GROUPED_AGG_PANDAS_UDF)
 
diff --git a/python/pyspark/sql/tests/test_pandas_udf_grouped_map.py b/python/pyspark/sql/tests/test_pandas_udf_grouped_map.py
index a12c608..80e7034 100644
--- a/python/pyspark/sql/tests/test_pandas_udf_grouped_map.py
+++ b/python/pyspark/sql/tests/test_pandas_udf_grouped_map.py
@@ -18,7 +18,12 @@
 import datetime
 import unittest
 
+from collections import OrderedDict
+from decimal import Decimal
+from distutils.version import LooseVersion
+
 from pyspark.sql import Row
+from pyspark.sql.functions import array, explode, col, lit, udf, sum, pandas_udf, PandasUDFType
 from pyspark.sql.types import *
 from pyspark.testing.sqlutils import ReusedSQLTestCase, have_pandas, have_pyarrow, \
     pandas_requirement_message, pyarrow_requirement_message
@@ -32,16 +37,12 @@ class GroupedMapPandasUDFTests(ReusedSQLTestCase):
 
     @property
     def data(self):
-        from pyspark.sql.functions import array, explode, col, lit
         return self.spark.range(10).toDF('id') \
             .withColumn("vs", array([lit(i) for i in range(20, 30)])) \
             .withColumn("v", explode(col('vs'))).drop('vs')
 
     def test_supported_types(self):
-        from decimal import Decimal
-        from distutils.version import LooseVersion
         import pyarrow as pa
-        from pyspark.sql.functions import pandas_udf, PandasUDFType
 
         values = [
             1, 2, 3,
@@ -131,8 +132,6 @@ class GroupedMapPandasUDFTests(ReusedSQLTestCase):
         self.assertPandasEqual(expected3, result3)
 
     def test_array_type_correct(self):
-        from pyspark.sql.functions import pandas_udf, PandasUDFType, array, col
-
         df = self.data.withColumn("arr", array(col("id"))).repartition(1, "id")
 
         output_schema = StructType(
@@ -151,8 +150,6 @@ class GroupedMapPandasUDFTests(ReusedSQLTestCase):
         self.assertPandasEqual(expected, result)
 
     def test_register_grouped_map_udf(self):
-        from pyspark.sql.functions import pandas_udf, PandasUDFType
-
         foo_udf = pandas_udf(lambda x: x, "id long", PandasUDFType.GROUPED_MAP)
         with QuietTest(self.sc):
             with self.assertRaisesRegexp(
@@ -161,7 +158,6 @@ class GroupedMapPandasUDFTests(ReusedSQLTestCase):
                 self.spark.catalog.registerFunction("foo_udf", foo_udf)
 
     def test_decorator(self):
-        from pyspark.sql.functions import pandas_udf, PandasUDFType
         df = self.data
 
         @pandas_udf(
@@ -176,7 +172,6 @@ class GroupedMapPandasUDFTests(ReusedSQLTestCase):
         self.assertPandasEqual(expected, result)
 
     def test_coerce(self):
-        from pyspark.sql.functions import pandas_udf, PandasUDFType
         df = self.data
 
         foo = pandas_udf(
@@ -191,7 +186,6 @@ class GroupedMapPandasUDFTests(ReusedSQLTestCase):
         self.assertPandasEqual(expected, result)
 
     def test_complex_groupby(self):
-        from pyspark.sql.functions import pandas_udf, col, PandasUDFType
         df = self.data
 
         @pandas_udf(
@@ -210,7 +204,6 @@ class GroupedMapPandasUDFTests(ReusedSQLTestCase):
         self.assertPandasEqual(expected, result)
 
     def test_empty_groupby(self):
-        from pyspark.sql.functions import pandas_udf, PandasUDFType
         df = self.data
 
         @pandas_udf(
@@ -229,7 +222,6 @@ class GroupedMapPandasUDFTests(ReusedSQLTestCase):
         self.assertPandasEqual(expected, result)
 
     def test_datatype_string(self):
-        from pyspark.sql.functions import pandas_udf, PandasUDFType
         df = self.data
 
         foo_udf = pandas_udf(
@@ -243,8 +235,6 @@ class GroupedMapPandasUDFTests(ReusedSQLTestCase):
         self.assertPandasEqual(expected, result)
 
     def test_wrong_return_type(self):
-        from pyspark.sql.functions import pandas_udf, PandasUDFType
-
         with QuietTest(self.sc):
             with self.assertRaisesRegexp(
                     NotImplementedError,
@@ -255,7 +245,6 @@ class GroupedMapPandasUDFTests(ReusedSQLTestCase):
                     PandasUDFType.GROUPED_MAP)
 
     def test_wrong_args(self):
-        from pyspark.sql.functions import udf, pandas_udf, sum, PandasUDFType
         df = self.data
 
         with QuietTest(self.sc):
@@ -277,9 +266,7 @@ class GroupedMapPandasUDFTests(ReusedSQLTestCase):
                     pandas_udf(lambda x, y: x, DoubleType(), PandasUDFType.SCALAR))
 
     def test_unsupported_types(self):
-        from distutils.version import LooseVersion
         import pyarrow as pa
-        from pyspark.sql.functions import pandas_udf, PandasUDFType
 
         common_err_msg = 'Invalid returnType.*grouped map Pandas UDF.*'
         unsupported_types = [
@@ -300,7 +287,6 @@ class GroupedMapPandasUDFTests(ReusedSQLTestCase):
 
     # Regression test for SPARK-23314
     def test_timestamp_dst(self):
-        from pyspark.sql.functions import pandas_udf, PandasUDFType
         # Daylight saving time for Los Angeles for 2015 is Sun, Nov 1 at 2:00 am
         dt = [datetime.datetime(2015, 11, 1, 0, 30),
               datetime.datetime(2015, 11, 1, 1, 30),
@@ -311,12 +297,12 @@ class GroupedMapPandasUDFTests(ReusedSQLTestCase):
         self.assertPandasEqual(df.toPandas(), result.toPandas())
 
     def test_udf_with_key(self):
-        from pyspark.sql.functions import pandas_udf, PandasUDFType
+        import numpy as np
+
         df = self.data
         pdf = df.toPandas()
 
         def foo1(key, pdf):
-            import numpy as np
             assert type(key) == tuple
             assert type(key[0]) == np.int64
 
@@ -326,7 +312,6 @@ class GroupedMapPandasUDFTests(ReusedSQLTestCase):
                               v4=pdf.v * pdf.id.mean())
 
         def foo2(key, pdf):
-            import numpy as np
             assert type(key) == tuple
             assert type(key[0]) == np.int64
             assert type(key[1]) == np.int32
@@ -385,9 +370,7 @@ class GroupedMapPandasUDFTests(ReusedSQLTestCase):
         self.assertPandasEqual(expected4, result4)
 
     def test_column_order(self):
-        from collections import OrderedDict
         import pandas as pd
-        from pyspark.sql.functions import pandas_udf, PandasUDFType
 
         # Helper function to set column names from a list
         def rename_pdf(pdf, names):
@@ -468,7 +451,6 @@ class GroupedMapPandasUDFTests(ReusedSQLTestCase):
         with QuietTest(self.sc):
             with self.assertRaisesRegexp(Exception, "KeyError: 'id'"):
                 grouped_df.apply(column_name_typo).collect()
-            from distutils.version import LooseVersion
             import pyarrow as pa
             if LooseVersion(pa.__version__) < LooseVersion("0.11.0"):
                 # TODO: see ARROW-1949. Remove when the minimum PyArrow version becomes 0.11.0.
@@ -480,7 +462,6 @@ class GroupedMapPandasUDFTests(ReusedSQLTestCase):
 
     def test_positional_assignment_conf(self):
         import pandas as pd
-        from pyspark.sql.functions import pandas_udf, PandasUDFType
 
         with self.sql_conf({
                 "spark.sql.legacy.execution.pandas.groupedMap.assignColumnsByName": False}):
@@ -496,9 +477,7 @@ class GroupedMapPandasUDFTests(ReusedSQLTestCase):
                 self.assertEqual(r.b, 1)
 
     def test_self_join_with_pandas(self):
-        import pyspark.sql.functions as F
-
-        @F.pandas_udf('key long, col string', F.PandasUDFType.GROUPED_MAP)
+        @pandas_udf('key long, col string', PandasUDFType.GROUPED_MAP)
         def dummy_pandas_udf(df):
             return df[['key', 'col']]
 
@@ -508,12 +487,11 @@ class GroupedMapPandasUDFTests(ReusedSQLTestCase):
 
         # this was throwing an AnalysisException before SPARK-24208
         res = df_with_pandas.alias('temp0').join(df_with_pandas.alias('temp1'),
-                                                 F.col('temp0.key') == F.col('temp1.key'))
+                                                 col('temp0.key') == col('temp1.key'))
         self.assertEquals(res.count(), 5)
 
     def test_mixed_scalar_udfs_followed_by_grouby_apply(self):
         import pandas as pd
-        from pyspark.sql.functions import udf, pandas_udf, PandasUDFType
 
         df = self.spark.range(0, 10).toDF('v1')
         df = df.withColumn('v2', udf(lambda x: x + 1, 'int')(df['v1'])) \
diff --git a/python/pyspark/sql/tests/test_pandas_udf_scalar.py b/python/pyspark/sql/tests/test_pandas_udf_scalar.py
index 2f585a3..6a6865a 100644
--- a/python/pyspark/sql/tests/test_pandas_udf_scalar.py
+++ b/python/pyspark/sql/tests/test_pandas_udf_scalar.py
@@ -16,12 +16,20 @@
 #
 import datetime
 import os
+import random
 import shutil
 import sys
 import tempfile
 import time
 import unittest
 
+from datetime import date, datetime
+from decimal import Decimal
+from distutils.version import LooseVersion
+
+from pyspark.rdd import PythonEvalType
+from pyspark.sql import Column
+from pyspark.sql.functions import array, col, expr, lit, sum, udf, pandas_udf
 from pyspark.sql.types import Row
 from pyspark.sql.types import *
 from pyspark.sql.utils import AnalysisException
@@ -59,18 +67,16 @@ class ScalarPandasUDFTests(ReusedSQLTestCase):
 
     @property
     def nondeterministic_vectorized_udf(self):
-        from pyspark.sql.functions import pandas_udf
+        import pandas as pd
+        import numpy as np
 
         @pandas_udf('double')
         def random_udf(v):
-            import pandas as pd
-            import numpy as np
             return pd.Series(np.random.random(len(v)))
         random_udf = random_udf.asNondeterministic()
         return random_udf
 
     def test_pandas_udf_tokenize(self):
-        from pyspark.sql.functions import pandas_udf
         tokenize = pandas_udf(lambda s: s.apply(lambda str: str.split(' ')),
                               ArrayType(StringType()))
         self.assertEqual(tokenize.returnType, ArrayType(StringType()))
@@ -79,7 +85,6 @@ class ScalarPandasUDFTests(ReusedSQLTestCase):
         self.assertEqual([Row(hi=[u'hi', u'boo']), Row(hi=[u'bye', u'boo'])], result.collect())
 
     def test_pandas_udf_nested_arrays(self):
-        from pyspark.sql.functions import pandas_udf
         tokenize = pandas_udf(lambda s: s.apply(lambda str: [str.split(' ')]),
                               ArrayType(ArrayType(StringType())))
         self.assertEqual(tokenize.returnType, ArrayType(ArrayType(StringType())))
@@ -88,7 +93,6 @@ class ScalarPandasUDFTests(ReusedSQLTestCase):
         self.assertEqual([Row(hi=[[u'hi', u'boo']]), Row(hi=[[u'bye', u'boo']])], result.collect())
 
     def test_vectorized_udf_basic(self):
-        from pyspark.sql.functions import pandas_udf, col, array
         df = self.spark.range(10).select(
             col('id').cast('string').alias('str'),
             col('id').cast('int').alias('int'),
@@ -114,9 +118,6 @@ class ScalarPandasUDFTests(ReusedSQLTestCase):
         self.assertEquals(df.collect(), res.collect())
 
     def test_register_nondeterministic_vectorized_udf_basic(self):
-        from pyspark.sql.functions import pandas_udf
-        from pyspark.rdd import PythonEvalType
-        import random
         random_pandas_udf = pandas_udf(
             lambda x: random.randint(6, 6) + x, IntegerType()).asNondeterministic()
         self.assertEqual(random_pandas_udf.deterministic, False)
@@ -129,7 +130,6 @@ class ScalarPandasUDFTests(ReusedSQLTestCase):
         self.assertEqual(row[0], 7)
 
     def test_vectorized_udf_null_boolean(self):
-        from pyspark.sql.functions import pandas_udf, col
         data = [(True,), (True,), (None,), (False,)]
         schema = StructType().add("bool", BooleanType())
         df = self.spark.createDataFrame(data, schema)
@@ -138,7 +138,6 @@ class ScalarPandasUDFTests(ReusedSQLTestCase):
         self.assertEquals(df.collect(), res.collect())
 
     def test_vectorized_udf_null_byte(self):
-        from pyspark.sql.functions import pandas_udf, col
         data = [(None,), (2,), (3,), (4,)]
         schema = StructType().add("byte", ByteType())
         df = self.spark.createDataFrame(data, schema)
@@ -147,7 +146,6 @@ class ScalarPandasUDFTests(ReusedSQLTestCase):
         self.assertEquals(df.collect(), res.collect())
 
     def test_vectorized_udf_null_short(self):
-        from pyspark.sql.functions import pandas_udf, col
         data = [(None,), (2,), (3,), (4,)]
         schema = StructType().add("short", ShortType())
         df = self.spark.createDataFrame(data, schema)
@@ -156,7 +154,6 @@ class ScalarPandasUDFTests(ReusedSQLTestCase):
         self.assertEquals(df.collect(), res.collect())
 
     def test_vectorized_udf_null_int(self):
-        from pyspark.sql.functions import pandas_udf, col
         data = [(None,), (2,), (3,), (4,)]
         schema = StructType().add("int", IntegerType())
         df = self.spark.createDataFrame(data, schema)
@@ -165,7 +162,6 @@ class ScalarPandasUDFTests(ReusedSQLTestCase):
         self.assertEquals(df.collect(), res.collect())
 
     def test_vectorized_udf_null_long(self):
-        from pyspark.sql.functions import pandas_udf, col
         data = [(None,), (2,), (3,), (4,)]
         schema = StructType().add("long", LongType())
         df = self.spark.createDataFrame(data, schema)
@@ -174,7 +170,6 @@ class ScalarPandasUDFTests(ReusedSQLTestCase):
         self.assertEquals(df.collect(), res.collect())
 
     def test_vectorized_udf_null_float(self):
-        from pyspark.sql.functions import pandas_udf, col
         data = [(3.0,), (5.0,), (-1.0,), (None,)]
         schema = StructType().add("float", FloatType())
         df = self.spark.createDataFrame(data, schema)
@@ -183,7 +178,6 @@ class ScalarPandasUDFTests(ReusedSQLTestCase):
         self.assertEquals(df.collect(), res.collect())
 
     def test_vectorized_udf_null_double(self):
-        from pyspark.sql.functions import pandas_udf, col
         data = [(3.0,), (5.0,), (-1.0,), (None,)]
         schema = StructType().add("double", DoubleType())
         df = self.spark.createDataFrame(data, schema)
@@ -192,8 +186,6 @@ class ScalarPandasUDFTests(ReusedSQLTestCase):
         self.assertEquals(df.collect(), res.collect())
 
     def test_vectorized_udf_null_decimal(self):
-        from decimal import Decimal
-        from pyspark.sql.functions import pandas_udf, col
         data = [(Decimal(3.0),), (Decimal(5.0),), (Decimal(-1.0),), (None,)]
         schema = StructType().add("decimal", DecimalType(38, 18))
         df = self.spark.createDataFrame(data, schema)
@@ -202,7 +194,6 @@ class ScalarPandasUDFTests(ReusedSQLTestCase):
         self.assertEquals(df.collect(), res.collect())
 
     def test_vectorized_udf_null_string(self):
-        from pyspark.sql.functions import pandas_udf, col
         data = [("foo",), (None,), ("bar",), ("bar",)]
         schema = StructType().add("str", StringType())
         df = self.spark.createDataFrame(data, schema)
@@ -211,7 +202,6 @@ class ScalarPandasUDFTests(ReusedSQLTestCase):
         self.assertEquals(df.collect(), res.collect())
 
     def test_vectorized_udf_string_in_udf(self):
-        from pyspark.sql.functions import pandas_udf, col
         import pandas as pd
         df = self.spark.range(10)
         str_f = pandas_udf(lambda x: pd.Series(map(str, x)), StringType())
@@ -220,7 +210,6 @@ class ScalarPandasUDFTests(ReusedSQLTestCase):
         self.assertEquals(expected.collect(), actual.collect())
 
     def test_vectorized_udf_datatype_string(self):
-        from pyspark.sql.functions import pandas_udf, col
         df = self.spark.range(10).select(
             col('id').cast('string').alias('str'),
             col('id').cast('int').alias('int'),
@@ -244,9 +233,8 @@ class ScalarPandasUDFTests(ReusedSQLTestCase):
         self.assertEquals(df.collect(), res.collect())
 
     def test_vectorized_udf_null_binary(self):
-        from distutils.version import LooseVersion
         import pyarrow as pa
-        from pyspark.sql.functions import pandas_udf, col
+
         if LooseVersion(pa.__version__) < LooseVersion("0.10.0"):
             with QuietTest(self.sc):
                 with self.assertRaisesRegexp(
@@ -262,7 +250,6 @@ class ScalarPandasUDFTests(ReusedSQLTestCase):
             self.assertEquals(df.collect(), res.collect())
 
     def test_vectorized_udf_array_type(self):
-        from pyspark.sql.functions import pandas_udf, col
         data = [([1, 2],), ([3, 4],)]
         array_schema = StructType([StructField("array", ArrayType(IntegerType()))])
         df = self.spark.createDataFrame(data, schema=array_schema)
@@ -271,7 +258,6 @@ class ScalarPandasUDFTests(ReusedSQLTestCase):
         self.assertEquals(df.collect(), result.collect())
 
     def test_vectorized_udf_null_array(self):
-        from pyspark.sql.functions import pandas_udf, col
         data = [([1, 2],), (None,), (None,), ([3, 4],), (None,)]
         array_schema = StructType([StructField("array", ArrayType(IntegerType()))])
         df = self.spark.createDataFrame(data, schema=array_schema)
@@ -280,7 +266,6 @@ class ScalarPandasUDFTests(ReusedSQLTestCase):
         self.assertEquals(df.collect(), result.collect())
 
     def test_vectorized_udf_complex(self):
-        from pyspark.sql.functions import pandas_udf, col, expr
         df = self.spark.range(10).select(
             col('id').cast('int').alias('a'),
             col('id').cast('int').alias('b'),
@@ -293,7 +278,6 @@ class ScalarPandasUDFTests(ReusedSQLTestCase):
         self.assertEquals(expected.collect(), res.collect())
 
     def test_vectorized_udf_exception(self):
-        from pyspark.sql.functions import pandas_udf, col
         df = self.spark.range(10)
         raise_exception = pandas_udf(lambda x: x * (1 / 0), LongType())
         with QuietTest(self.sc):
@@ -301,8 +285,8 @@ class ScalarPandasUDFTests(ReusedSQLTestCase):
                 df.select(raise_exception(col('id'))).collect()
 
     def test_vectorized_udf_invalid_length(self):
-        from pyspark.sql.functions import pandas_udf, col
         import pandas as pd
+
         df = self.spark.range(10)
         raise_exception = pandas_udf(lambda _: pd.Series(1), LongType())
         with QuietTest(self.sc):
@@ -312,7 +296,6 @@ class ScalarPandasUDFTests(ReusedSQLTestCase):
                 df.select(raise_exception(col('id'))).collect()
 
     def test_vectorized_udf_chained(self):
-        from pyspark.sql.functions import pandas_udf, col
         df = self.spark.range(10)
         f = pandas_udf(lambda x: x + 1, LongType())
         g = pandas_udf(lambda x: x - 1, LongType())
@@ -320,7 +303,6 @@ class ScalarPandasUDFTests(ReusedSQLTestCase):
         self.assertEquals(df.collect(), res.collect())
 
     def test_vectorized_udf_wrong_return_type(self):
-        from pyspark.sql.functions import pandas_udf
         with QuietTest(self.sc):
             with self.assertRaisesRegexp(
                     NotImplementedError,
@@ -328,7 +310,6 @@ class ScalarPandasUDFTests(ReusedSQLTestCase):
                 pandas_udf(lambda x: x * 1.0, MapType(LongType(), LongType()))
 
     def test_vectorized_udf_return_scalar(self):
-        from pyspark.sql.functions import pandas_udf, col
         df = self.spark.range(10)
         f = pandas_udf(lambda x: 1.0, DoubleType())
         with QuietTest(self.sc):
@@ -336,7 +317,6 @@ class ScalarPandasUDFTests(ReusedSQLTestCase):
                 df.select(f(col('id'))).collect()
 
     def test_vectorized_udf_decorator(self):
-        from pyspark.sql.functions import pandas_udf, col
         df = self.spark.range(10)
 
         @pandas_udf(returnType=LongType())
@@ -346,21 +326,18 @@ class ScalarPandasUDFTests(ReusedSQLTestCase):
         self.assertEquals(df.collect(), res.collect())
 
     def test_vectorized_udf_empty_partition(self):
-        from pyspark.sql.functions import pandas_udf, col
         df = self.spark.createDataFrame(self.sc.parallelize([Row(id=1)], 2))
         f = pandas_udf(lambda x: x, LongType())
         res = df.select(f(col('id')))
         self.assertEquals(df.collect(), res.collect())
 
     def test_vectorized_udf_varargs(self):
-        from pyspark.sql.functions import pandas_udf, col
         df = self.spark.createDataFrame(self.sc.parallelize([Row(id=1)], 2))
         f = pandas_udf(lambda *v: v[0], LongType())
         res = df.select(f(col('id')))
         self.assertEquals(df.collect(), res.collect())
 
     def test_vectorized_udf_unsupported_types(self):
-        from pyspark.sql.functions import pandas_udf
         with QuietTest(self.sc):
             with self.assertRaisesRegexp(
                     NotImplementedError,
@@ -368,8 +345,6 @@ class ScalarPandasUDFTests(ReusedSQLTestCase):
                 pandas_udf(lambda x: x, MapType(StringType(), IntegerType()))
 
     def test_vectorized_udf_dates(self):
-        from pyspark.sql.functions import pandas_udf, col
-        from datetime import date
         schema = StructType().add("idx", LongType()).add("date", DateType())
         data = [(0, date(1969, 1, 1),),
                 (1, date(2012, 2, 2),),
@@ -405,8 +380,6 @@ class ScalarPandasUDFTests(ReusedSQLTestCase):
             self.assertIsNone(result[i][3])  # "check_data" col
 
     def test_vectorized_udf_timestamps(self):
-        from pyspark.sql.functions import pandas_udf, col
-        from datetime import datetime
         schema = StructType([
             StructField("idx", LongType(), True),
             StructField("timestamp", TimestampType(), True)])
@@ -447,8 +420,8 @@ class ScalarPandasUDFTests(ReusedSQLTestCase):
             self.assertIsNone(result[i][3])  # "check_data" col
 
     def test_vectorized_udf_return_timestamp_tz(self):
-        from pyspark.sql.functions import pandas_udf, col
         import pandas as pd
+
         df = self.spark.range(10)
 
         @pandas_udf(returnType=TimestampType())
@@ -465,8 +438,8 @@ class ScalarPandasUDFTests(ReusedSQLTestCase):
             self.assertEquals(expected, ts)
 
     def test_vectorized_udf_check_config(self):
-        from pyspark.sql.functions import pandas_udf, col
         import pandas as pd
+
         with self.sql_conf({"spark.sql.execution.arrow.maxRecordsPerBatch": 3}):
             df = self.spark.range(10, numPartitions=1)
 
@@ -479,9 +452,8 @@ class ScalarPandasUDFTests(ReusedSQLTestCase):
                 self.assertTrue(r <= 3)
 
     def test_vectorized_udf_timestamps_respect_session_timezone(self):
-        from pyspark.sql.functions import pandas_udf, col
-        from datetime import datetime
         import pandas as pd
+
         schema = StructType([
             StructField("idx", LongType(), True),
             StructField("timestamp", TimestampType(), True)])
@@ -519,8 +491,6 @@ class ScalarPandasUDFTests(ReusedSQLTestCase):
 
     def test_nondeterministic_vectorized_udf(self):
         # Test that nondeterministic UDFs are evaluated only once in chained UDF evaluations
-        from pyspark.sql.functions import pandas_udf, col
-
         @pandas_udf('double')
         def plus_ten(v):
             return v + 10
@@ -533,8 +503,6 @@ class ScalarPandasUDFTests(ReusedSQLTestCase):
         self.assertTrue(result1['plus_ten(rand)'].equals(result1['rand'] + 10))
 
     def test_nondeterministic_vectorized_udf_in_aggregate(self):
-        from pyspark.sql.functions import sum
-
         df = self.spark.range(10)
         random_udf = self.nondeterministic_vectorized_udf
 
@@ -545,8 +513,6 @@ class ScalarPandasUDFTests(ReusedSQLTestCase):
                 df.agg(sum(random_udf(df.id))).collect()
 
     def test_register_vectorized_udf_basic(self):
-        from pyspark.rdd import PythonEvalType
-        from pyspark.sql.functions import pandas_udf, col, expr
         df = self.spark.range(10).select(
             col('id').cast('int').alias('a'),
             col('id').cast('int').alias('b'))
@@ -563,11 +529,10 @@ class ScalarPandasUDFTests(ReusedSQLTestCase):
 
     # Regression test for SPARK-23314
     def test_timestamp_dst(self):
-        from pyspark.sql.functions import pandas_udf
         # Daylight saving time for Los Angeles for 2015 is Sun, Nov 1 at 2:00 am
-        dt = [datetime.datetime(2015, 11, 1, 0, 30),
-              datetime.datetime(2015, 11, 1, 1, 30),
-              datetime.datetime(2015, 11, 1, 2, 30)]
+        dt = [datetime(2015, 11, 1, 0, 30),
+              datetime(2015, 11, 1, 1, 30),
+              datetime(2015, 11, 1, 2, 30)]
         df = self.spark.createDataFrame(dt, 'timestamp').toDF('time')
         foo_udf = pandas_udf(lambda x: x, 'timestamp')
         result = df.withColumn('time', foo_udf(df.time))
@@ -593,7 +558,6 @@ class ScalarPandasUDFTests(ReusedSQLTestCase):
 
     def test_mixed_udf(self):
         import pandas as pd
-        from pyspark.sql.functions import col, udf, pandas_udf
 
         df = self.spark.range(0, 1).toDF('v')
 
@@ -696,8 +660,6 @@ class ScalarPandasUDFTests(ReusedSQLTestCase):
 
     def test_mixed_udf_and_sql(self):
         import pandas as pd
-        from pyspark.sql import Column
-        from pyspark.sql.functions import udf, pandas_udf
 
         df = self.spark.range(0, 1).toDF('v')
 
@@ -758,7 +720,6 @@ class ScalarPandasUDFTests(ReusedSQLTestCase):
         # This needs to a separate test because Arrow dependency is optional
         import pandas as pd
         import numpy as np
-        from pyspark.sql.functions import pandas_udf, lit, col
 
         path = tempfile.mkdtemp()
         shutil.rmtree(path)
diff --git a/python/pyspark/sql/tests/test_pandas_udf_window.py b/python/pyspark/sql/tests/test_pandas_udf_window.py
index f0e6d26..0a7a19c 100644
--- a/python/pyspark/sql/tests/test_pandas_udf_window.py
+++ b/python/pyspark/sql/tests/test_pandas_udf_window.py
@@ -18,6 +18,8 @@
 import unittest
 
 from pyspark.sql.utils import AnalysisException
+from pyspark.sql.functions import array, explode, col, lit, mean, min, max, rank, \
+    udf, pandas_udf, PandasUDFType
 from pyspark.sql.window import Window
 from pyspark.testing.sqlutils import ReusedSQLTestCase, have_pandas, have_pyarrow, \
     pandas_requirement_message, pyarrow_requirement_message
@@ -30,7 +32,6 @@ from pyspark.testing.utils import QuietTest
 class WindowPandasUDFTests(ReusedSQLTestCase):
     @property
     def data(self):
-        from pyspark.sql.functions import array, explode, col, lit
         return self.spark.range(10).toDF('id') \
             .withColumn("vs", array([lit(i * 1.0) + col('id') for i in range(20, 30)])) \
             .withColumn("v", explode(col('vs'))) \
@@ -39,18 +40,14 @@ class WindowPandasUDFTests(ReusedSQLTestCase):
 
     @property
     def python_plus_one(self):
-        from pyspark.sql.functions import udf
         return udf(lambda v: v + 1, 'double')
 
     @property
     def pandas_scalar_time_two(self):
-        from pyspark.sql.functions import pandas_udf
         return pandas_udf(lambda v: v * 2, 'double')
 
     @property
     def pandas_agg_mean_udf(self):
-        from pyspark.sql.functions import pandas_udf, PandasUDFType
-
         @pandas_udf('double', PandasUDFType.GROUPED_AGG)
         def avg(v):
             return v.mean()
@@ -58,8 +55,6 @@ class WindowPandasUDFTests(ReusedSQLTestCase):
 
     @property
     def pandas_agg_max_udf(self):
-        from pyspark.sql.functions import pandas_udf, PandasUDFType
-
         @pandas_udf('double', PandasUDFType.GROUPED_AGG)
         def max(v):
             return v.max()
@@ -67,8 +62,6 @@ class WindowPandasUDFTests(ReusedSQLTestCase):
 
     @property
     def pandas_agg_min_udf(self):
-        from pyspark.sql.functions import pandas_udf, PandasUDFType
-
         @pandas_udf('double', PandasUDFType.GROUPED_AGG)
         def min(v):
             return v.min()
@@ -88,8 +81,6 @@ class WindowPandasUDFTests(ReusedSQLTestCase):
         return Window.partitionBy()
 
     def test_simple(self):
-        from pyspark.sql.functions import mean
-
         df = self.data
         w = self.unbounded_window
 
@@ -105,8 +96,6 @@ class WindowPandasUDFTests(ReusedSQLTestCase):
         self.assertPandasEqual(expected2.toPandas(), result2.toPandas())
 
     def test_multiple_udfs(self):
-        from pyspark.sql.functions import max, min, mean
-
         df = self.data
         w = self.unbounded_window
 
@@ -121,8 +110,6 @@ class WindowPandasUDFTests(ReusedSQLTestCase):
         self.assertPandasEqual(expected1.toPandas(), result1.toPandas())
 
     def test_replace_existing(self):
-        from pyspark.sql.functions import mean
-
         df = self.data
         w = self.unbounded_window
 
@@ -132,8 +119,6 @@ class WindowPandasUDFTests(ReusedSQLTestCase):
         self.assertPandasEqual(expected1.toPandas(), result1.toPandas())
 
     def test_mixed_sql(self):
-        from pyspark.sql.functions import mean
-
         df = self.data
         w = self.unbounded_window
         mean_udf = self.pandas_agg_mean_udf
@@ -144,8 +129,6 @@ class WindowPandasUDFTests(ReusedSQLTestCase):
         self.assertPandasEqual(expected1.toPandas(), result1.toPandas())
 
     def test_mixed_udf(self):
-        from pyspark.sql.functions import mean
-
         df = self.data
         w = self.unbounded_window
 
@@ -171,8 +154,6 @@ class WindowPandasUDFTests(ReusedSQLTestCase):
         self.assertPandasEqual(expected2.toPandas(), result2.toPandas())
 
     def test_without_partitionBy(self):
-        from pyspark.sql.functions import mean
-
         df = self.data
         w = self.unpartitioned_window
         mean_udf = self.pandas_agg_mean_udf
@@ -187,8 +168,6 @@ class WindowPandasUDFTests(ReusedSQLTestCase):
         self.assertPandasEqual(expected2.toPandas(), result2.toPandas())
 
     def test_mixed_sql_and_udf(self):
-        from pyspark.sql.functions import max, min, rank, col
-
         df = self.data
         w = self.unbounded_window
         ow = self.ordered_window
@@ -221,8 +200,6 @@ class WindowPandasUDFTests(ReusedSQLTestCase):
         self.assertPandasEqual(expected4.toPandas(), result4.toPandas())
 
     def test_array_type(self):
-        from pyspark.sql.functions import pandas_udf, PandasUDFType
-
         df = self.data
         w = self.unbounded_window
 
@@ -231,8 +208,6 @@ class WindowPandasUDFTests(ReusedSQLTestCase):
         self.assertEquals(result1.first()['v2'], [1.0, 2.0])
 
     def test_invalid_args(self):
-        from pyspark.sql.functions import pandas_udf, PandasUDFType
-
         df = self.data
         w = self.unbounded_window
         ow = self.ordered_window

